In the CAF source code, I noticed that both system.spawn('actor') and anon_send('actor', 'msg') invoke policy_.central_enqueue(). For example, the following snippet is the implementation for the work_stealing policy (from work_stealing.hpp):

template <class Coordinator>
void central_enqueue(Coordinator* self, resumable* job) {
  auto w = self->worker_by_id(d(self).next_worker++ % self->num_workers());
  w->external_enqueue(job);
}

I suspect that the implementation above could cause thread-safety problems when we use system.spawn() and anon_send() for the same stateful actor, because the actor would be enqueued into two different worker queues and the workers might later execute it simultaneously.
However, CAF worked correctly when I ran a program that would be thread-unsafe under that assumption; see my example code below.
This confuses me: I don't know how CAF stays thread-safe in this case. Any pointers are appreciated. Thanks in advance.

//! My example code
#include <cassert>
#include <iostream>
#include <string>

#include "caf/all.hpp"

using std::cout;
using std::endl;
using std::string;
using namespace caf;

struct cell_state {
  int value = 0;
};

behavior unchecked_cell(stateful_actor<cell_state>* self) {
  return {
    [=](put_atom, int val) {
      self->state.value = val;
      // Busy loop intended to widen the window for a concurrent handler.
      int i = 0;
      while (i++ < 10000000) {}
      // This assertion could fail if two workers ran the actor concurrently.
      assert(self->state.value == val);
    },
    [=](get_atom) {
      aout(self) << "Get state value: " << self->state.value << std::endl;
      return self->state.value;
    }
  };
}

void caf_main(actor_system& system) {
  auto cell1 = system.spawn(unchecked_cell);
  for (int i = 0; i < 1000; ++i) {
    anon_send(cell1, put_atom::value, i);
  }
  anon_send(cell1, get_atom::value);
}

CAF_MAIN()

Actors never get enqueued to worker queues twice. Actors in CAF are essentially lightweight state machines. Only when an actor (atomically) transitions from waiting for messages to ready does it get scheduled for execution. When it is already ready and receives a message, nothing happens (other than putting the message into the mailbox, of course).
If you're interested in diving deeper into the implementation details, I can recommend scheduled_actor::enqueue as a starting point.
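
To make the "schedule only on the waiting-to-ready transition" idea concrete, here is a minimal sketch in plain C++. It is a toy model and not CAF's actual code: toy_actor, enqueue, resume, run_demo and demo_result are hypothetical names, the mailbox is a mutex-protected deque chosen for simplicity, and a sender that wins the transition runs the actor itself, standing in for a worker that would pick it up from its queue.

```cpp
#include <atomic>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Toy model only; every name here is hypothetical, not CAF API.
class toy_actor {
public:
  // Always stores the message. Returns true only for the single caller that
  // flipped the actor from "waiting" to "ready"; only that caller schedules it.
  bool enqueue(int msg) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      mailbox_.push_back(msg);
    }
    return !ready_.exchange(true);
  }

  // Drains the mailbox. Only the winner of the transition may call this.
  void resume() {
    for (;;) {
      note_running(++running_);
      int msg;
      while (try_dequeue(msg))
        ++processed_; // actor state: touched by one thread at a time
      --running_;
      ready_.store(false); // back to "waiting"
      // A message may have arrived after the last dequeue. Reclaim the
      // actor unless a sender has already won the transition again.
      if (mailbox_empty() || ready_.exchange(true))
        return;
    }
  }

  long processed() const { return processed_; }
  int max_concurrency() const { return max_running_.load(); }

private:
  bool try_dequeue(int& out) {
    std::lock_guard<std::mutex> guard(mtx_);
    if (mailbox_.empty())
      return false;
    out = mailbox_.front();
    mailbox_.pop_front();
    return true;
  }

  bool mailbox_empty() {
    std::lock_guard<std::mutex> guard(mtx_);
    return mailbox_.empty();
  }

  // Records the highest number of threads seen inside resume() at once.
  void note_running(int now) {
    int seen = max_running_.load();
    while (seen < now && !max_running_.compare_exchange_weak(seen, now)) {
    }
  }

  std::mutex mtx_;
  std::deque<int> mailbox_;
  std::atomic<bool> ready_{false};
  std::atomic<int> running_{0};
  std::atomic<int> max_running_{0};
  long processed_ = 0;
};

struct demo_result {
  long processed;
  int max_concurrency;
};

// Several threads send to the same actor; whoever wins the transition runs it.
demo_result run_demo(int producers, int msgs_per_producer) {
  toy_actor actor;
  std::vector<std::thread> threads;
  for (int p = 0; p < producers; ++p) {
    threads.emplace_back([&actor, msgs_per_producer] {
      for (int i = 0; i < msgs_per_producer; ++i)
        if (actor.enqueue(i))
          actor.resume();
    });
  }
  for (auto& t : threads)
    t.join();
  return {actor.processed(), actor.max_concurrency()};
}
```

Calling run_demo(4, 1000) from a main function should report 4000 processed messages and a maximum concurrency of 1: many threads enqueue, but at any moment at most one of them holds the "ready" flag and runs the handler. In this simplified model, that is why the assertion in the example program above never fires.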
Yes! That’s pretty much “the point” of actors. 🙂