How to stop an Actor net waiting for all of them to be idle [Akka 2]

246 views
Skip to first unread message

Pepe

unread,
Mar 31, 2012, 9:42:06 AM3/31/12
to Akka User List
Premises:
Web of interconnected actors that interchange messages.
The system is designed in a way that the actors by themselves don't
generate enough message traffic to keep them busy forever.
They receive messages from external sources that keep them
continuously busy.
Once the external input stops feeding messages the actors keep
exchanging messages for a while until they finish processing all of
them.
It is now, when all the actors are idle, that they can be feeded with
a PoisonPill or instead simply stopped.

is there any way of knowing if the system has arrived to this state?

I have arrived to this but it is too ugly!
boolean isSystemFrozen() {
Dispatcher dispatcher = (Dispatcher)
system.dispatchers().lookup("dispatcher-id");
ExecutorService executor =
dispatcher.executorService().get().executor();
if(executor instanceof ThreadPoolExecutor) {
return ((ThreadPoolExecutor) executor).getActiveCount() == 0;
} if(executor instanceof ForkJoinPool) {
return ((ForkJoinPool) executor).isQuiescent();
} else {
??????
}
}

Cheers

Viktor Klang

unread,
Apr 1, 2012, 4:44:47 PM4/1/12
to akka...@googlegroups.com
Don't try the above.

Have the Actor that knows when external input is done send a PoisonPill to all workers using an ActorSelection.

Cheers,


Cheers

--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.


Pepe

unread,
Apr 1, 2012, 7:32:39 PM4/1/12
to Akka User List
But after the external input has ceased the actors can still send
messages between them.
Sending a PoisonPill to all the actors at that moment could imply a
few dead letters.
Ex:
Actor web is: "A" and "B".
External input has finished.
A PoisonPill is sent to both actors "A" and "B".
But "A" is still active processing external (or internal) messages.
As "A" is still running it could send a message to "B" but, as "B" has
already received a PoisonPill, it will never process that message.

Am I right ?

Maybe a compromise solution would be just wait a few millis before
sending the PoisonPill allowing the system to "cool down".
I guess a few seconds should be enough.
Not an accurate solution but maybe the best that can be achieved with
the current Akka API avoiding monsters like my previous code.

Cheers

Viktor Klang

unread,
Apr 2, 2012, 4:20:24 AM4/2/12
to akka...@googlegroups.com
On Mon, Apr 2, 2012 at 1:32 AM, Pepe <ppr...@googlemail.com> wrote:
But after the external input has ceased the actors can still send
messages between them.
Sending a PoisonPill to all the actors at that moment could imply a
few dead letters.
Ex:
Actor web is: "A" and "B".
External input has finished.
A PoisonPill is sent to both actors "A" and "B".
But "A" is still active processing external (or internal) messages.
As "A" is still running it could send a message to "B" but, as "B" has
already received a PoisonPill, it will never process that message.

Am I right ?

Yup.
Try to switch to broadcasting a EOF message to all workers, then they can become() a new behavior that terminates when they are done with their work.

Cheers,
 

Maybe a compromise solution would be just wait a few millis before
sending the PoisonPill allowing the system to "cool down".
I guess a few seconds should be enough.
Not an accurate solution but maybe the best that can be achieved with
the current Akka API avoiding monsters like my previous code.

Cheers

Pepe

unread,
Apr 2, 2012, 6:15:47 AM4/2/12
to Akka User List
I don't think that would help much.
How can an actor know that its work is really done?
I mean, the only way an actor could be sure it is not going to receive
more incoming messages from its peers is knowing if its peers (all of
them) are idle or stopped.
Did I miss anything?

Cheers

Viktor Klang

unread,
Apr 2, 2012, 6:22:22 AM4/2/12
to akka...@googlegroups.com
On Mon, Apr 2, 2012 at 12:15 PM, Pepe <ppr...@googlemail.com> wrote:
I don't think that would help much.
How can an actor know that its work is really done?

If nothing can know when everything's done, then you can never solve your problem anyway.

Cheers,
 
I mean, the only way an actor could be sure it is not going to receive
more incoming messages from its peers is knowing if its peers (all of
them) are idle or stopped.
Did I miss anything?

Cheers

Pepe

unread,
Apr 2, 2012, 10:58:07 AM4/2/12
to Akka User List
"everything's done":
When all the actors are idle (I think it is equivalent to all
dispatcher threads are idle and its queue is empty)

I think that's a common requirement on a "simulation problem"
scenario.
There are an external incoming messages: a clock tick and possible
other external inputs.
And the system keeps evolving.
Once the clock is stopped then we wait for the system to "cool down"
before making a measure;
and before saving the current state of the system with the purpose of
restarting it later resuming it with that state.

In my case the system is not anything spectacular, no smart physical
simulations, just an MMO game.
And there is no special measure at the end, because the actors
themselves are continuously sending their state changes to a persister
queue.
The purpose is to be able to stop the system and have it saved in a
proper state.

The key here is that the actors cannot know when they aren't going to
get any new messages.
But the dispatchers (at least internally) can know when there won't be
more messages.
However they don't expose that funcionality.

The code in my first message (with a few errors) try to overcome that
lack of functionality.
I think the purpose is fulfilled but the code is, at least, fragile.
I just wanted to know if Akka offers that functionality in a way I
hadn't noticed.
Or if Akka does not offer it, maybe it could be a sensible addition if
a reasonable number of people have the same problem than me.
It seems however that I'm the only one with that requirement.

Cheers

Viktor Klang

unread,
Apr 2, 2012, 11:04:57 AM4/2/12
to akka...@googlegroups.com
On Mon, Apr 2, 2012 at 4:58 PM, Pepe <ppr...@googlemail.com> wrote:
"everything's done":
When all the actors are idle (I think it is equivalent to all
dispatcher threads are idle and its queue is empty)

Forget that, it's inherently racy. I think you need to organize your problem so that you can propagate the knowledge of EOF.
Since children will be terminated when the parent dies, you can exploit this.

Cheers,
Show me a solution and I'll consider adding it!
 
It seems however that I'm the only one with that requirement.


Cheers,

Pepe

unread,
Apr 2, 2012, 12:07:29 PM4/2/12
to Akka User List
I am a complete Scala illiterate.
I have just browsed the Akka code and I have tried to make some un-
educated guesses about how it works.

Every time a new mesage is sent:
- Dispatcher's executeTask is called
- a TaskInvocation is created with a run method that wraps the code
to run. These code will be run later.
- later in time code wrapped by TaskInvocation is run

Is this more or less close to the truth ?

Proposed solution:

if there were a field in the Dispatcher class:
AtomicInteger messagesCount = new AtomicInteger(0);

then executeTask could have as its first line:
messagesCount.incrementAndGet();

and it could be added inside the finally block in the run method in
TaskInvocation:
messagesCount.decrementAndGet();

Then a method like this in Dispatcher:
boolean isIdle() {
return messagesCount.get() == 0;
}

Does all this crap makes sense ??

Viktor Klang

unread,
Apr 2, 2012, 12:15:05 PM4/2/12
to akka...@googlegroups.com
Introduces a hotspot for cache-traffic, so would be very costly, and since you have multiple dispatchers it still wouldn't work.

Cheers,

Pepe

unread,
Apr 2, 2012, 12:30:43 PM4/2/12
to Akka User List
My Dispatchers have some kind of "ordering".
At the moment I have only two: the one for the Model and the one for
the Persister.
That means I should wait for the Model's to stop ans the wait for the
Persister's.

Is there any way I could use my own Dispatcher with Akka ?

Cheers

Viktor Klang

unread,
Apr 2, 2012, 3:58:13 PM4/2/12
to akka...@googlegroups.com
Absolutely, create your own DispatcherConfigurator and specify its FQCN in your Akka configuration:

  1. # Must be one of the following
  2. # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of
  3. # the same type), PinnedDispatcher, or a FQCN to a class inheriting
  4. # MessageDispatcherConfigurator with a constructor with
  5. # com.typesafe.config.Config parameter and akka.dispatch.DispatcherPrerequisites
  6. # parameters.
  7. # PinnedDispatcher must be used together with executor=thread-pool-executor.
  8. type = "Dispatcher"
 

Cheers,


Cheers

Pepe

unread,
Apr 3, 2012, 1:55:28 PM4/3/12
to Akka User List
LOL
the first time I saw FQCN in the docs I thought it was related to some
advanced Akka concept or to some Scala thing

Pepe

unread,
Apr 3, 2012, 1:56:19 PM4/3/12
to Akka User List
√ictor, It seems you were concerned by performance using an
AtomicInteger.
Could an Actor be a better solution?

something like:

in the Dispatcher class:
MessageCounter messagesCount;

first line in executeTask:
messagesCount.tell(MessageCounter.INC);

finally block in the run method in TaskInvocation:
messagesCount.tell(MessageCounter.DEC);

a method like this in Dispatcher:
boolean isIdle() {
return messagesCount.ask(null) == 0;
}
----------------------------------------------------
public class MessageCounter extends UntypedActor {
static public class Count {
public final int count;
public Count(int count) {
this.count = count;
}
}

static final public Count INC = new Count(1);
static final public Count DEC = new Count(-1);

private int total;

@Override
public void onReceive(Object msg) throws Exception {
if(msg instanceof Count) {
total += ((Count) msg).count;
} else if(msg == null) {
getSender().tell(total);
} else {
unhandled(msg);
}
}
}

Akka Team

unread,
Apr 3, 2012, 4:24:20 PM4/3/12
to akka...@googlegroups.com
On Tue, Apr 3, 2012 at 7:56 PM, Pepe <ppr...@googlemail.com> wrote:
√ictor, It seems you were concerned by performance using an
AtomicInteger.

In what way would that solve the issue I outlined?

Cheers,
 
               }
       }
}

--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.




--
Akka Team
Typesafe - The software stack for applications that scale
Blog: letitcrash.com
Twitter: @akkateam

√iktor Ҡlang

unread,
Apr 3, 2012, 4:26:30 PM4/3/12
to akka...@googlegroups.com
Lol!
 

--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.




--
Viktor Klang

Akka Tech Lead
Typesafe - The software stack for applications that scale

Twitter: @viktorklang

Pepe

unread,
Apr 3, 2012, 6:00:02 PM4/3/12
to Akka User List
Not really sure, that's what I'm asking :-D

I didn't really unterstand "Introduces a hotspot for cache-traffic, so
would be very costly"
I thought the problem was with the AtomicInteger because of the use of
locks.
That's the reason I asked if a solution using "fire & forget" actors
would be better.

By the way, I'd use a TypedActor implementation cause, as you have
noticed, I'm rubbish with the untyped ones.

√iktor Ҡlang

unread,
Apr 3, 2012, 6:11:12 PM4/3/12
to akka...@googlegroups.com
Hi Pepe,

On Wed, Apr 4, 2012 at 12:00 AM, Pepe <ppr...@googlemail.com> wrote:
Not really sure, that's what I'm asking :-D

I didn't really unterstand "Introduces a hotspot for cache-traffic, so
would be very costly"
I thought the problem was with the AtomicInteger because of the use of
locks.

AtomicInteger does not use locks. But if you look at how incrementAndGet/decrementAndGet is implemented, you see that it on HotSpot is implemented as a read+CAS-loop, and since you effectively have a single memory slot that needs to be cache-synchronized over all Threads in the dispatcher, you've effectively created a cache-traffic hotspot. What's even worse is that since an AtomicInteger isn't cache-line padded to avoid false-sharing, you risk having neighboring objects interfere as well.
 
That's the reason I asked if a solution using "fire & forget" actors
would be better.

By the way, I'd use a TypedActor implementation cause, as you have
noticed,  I'm rubbish with the untyped ones.

You need to have something that keeps track of what work has been handed out, and when it gets an EOF it waits to get all completed tasks reported in before it shuts things down.

Cheers,
 

--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.




--
Viktor Klang

Akka Tech Lead
Typesafe - The software stack for applications that scale

Twitter: @viktorklang

Pepe

unread,
Apr 4, 2012, 9:41:53 AM4/4/12
to Akka User List
Sorry, My mind had mixed ideas, I guess the 'lock' came from
ThreadPoolExecutor.
As you have noticed I'm not very good with concurrency.
That's one of the reason I chose Akka, because it promises "Simple
Concurrency".
And I don't know much about JVM ops either.

> "something that keeps track of what work has been handed out"

- AtomicInteger counter fulfils that, but doesn't perform well.
- Counter actor also fulfills it. Does it perform well?

I cannot think of any other way. (just "unabstracting" the counter
actor into a queue (blocking queue ?))
===========================================================================================
The kind of systems I'm referring to can be categorized as
"Simulations".
In an ideal world a simulation never stops, it is an "infinite game".
The requirement to stop is a technical requirement: to be able to
restart the computer etc...

Let me state one simplified example of the kind of systems I am
dealing with,
they are MMO games but I think the same apply to other kind of
simulations.

Spaceships game.
Actor types: Player, Ship, Quadrant.
-----------------------------------------------------------------------------
USR_FIRE:
an external message from a player to the Player actor.
If it is correct sends MSG_FIRE to the Players's Ship.
MSG_FIRE:
Ship fires its guns to an enemy Ship.
It sends MSG_SHOOTED to the target Ship.
MSG_SHOOTED:
Ship that receives the shoot calculates the damage received.
If it is destroyed sends: MSG_DESTROYED to the Quadrant,
MSG_VICTORY to the firing Ship
and MSG_DEFEAT to the destroyed Ship.
If only damaged informs the current Quadrant of damages with
MSG_DAMAGES
MSG_DAMAGES:
Updates quadrant state.
It will update the players in that quadrant when the next clock tick
is received.
MSG_DESTROYED:
Updates quadrant state.
It will update the players in that quadrant when the next clock tick
is received.
MSG_VICTORY(received by a Ship):
Re-sends it to the owner Player.
MSG_VICTORY(received by a Player):
Updates the player stats (victories++).
MSG_DEFEAT(received by a Ship):
Updates Ship position to home, updates Ship's hull characteristics.
Sends MSG_ENTER to the home quadrant.
re-sends it to the owner Player
MSG_DEFEAT(received by a Player):
Updates the player stats (defeats++)
MSG_ENTER:
Updates quadrant state.
It will update the players in that quadrant when the next clock tick
is received.
-----------------------------------------------------------------------------
If all the messages are fire&forget style, how can they be tracked?

√iktor Ҡlang

unread,
Apr 4, 2012, 9:47:42 AM4/4/12
to akka...@googlegroups.com
pseudo:

Have incoming work be sent to an actor that keeps track of unfinished work (A), have that actor delegate the work to a worker (Bx),
when the worker is done with the work it reports back to A, when EOF happens for incoming work, A will now shut things down when it's list of outstanding work reaches 0.

Cheers,

--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.

Josh Marcus

unread,
Apr 4, 2012, 10:10:43 AM4/4/12
to akka...@googlegroups.com
We use a similar sort of pattern in our project, if you want to see some code.  Each calculation delegates
sub-calculations to other workers, and keeps track of the overall picture, similar to Viktor's example:



--
Josh Marcus
Senior GIS Software Architect

Azavea |  T: 215.701.7505
www.azavea.com

Pepe

unread,
Apr 4, 2012, 2:12:52 PM4/4/12
to Akka User List
Is this really different from using a Counter actor in the
Dispatcher ?
> Typesafe <http://www.typesafe.com/> - The software stack for applications
> that scale
>
> Twitter: @viktorklang

Viktor Klang

unread,
Apr 4, 2012, 5:24:42 PM4/4/12
to akka...@googlegroups.com
On Wed, Apr 4, 2012 at 8:12 PM, Pepe <ppr...@googlemail.com> wrote:
Is this really different from using a Counter actor in the
Dispatcher ?

Because it doesn't use shared mutable state concurrency control. Now all your actors can live on different physical machines and use different dispatchers and doesn't require you to program at different layers of abstraction. By modeling the protocol and the flow of messages within the same abstraction your domain code your code becomes much easier to reason about and easier to evolve.

Cheers,

Pepe

unread,
Apr 4, 2012, 6:11:11 PM4/4/12
to Akka User List
Is that a Yes or a No !?

Anyway, I hadn't think about distributing the actors on several
machines.
Using the same actor counter on their dispatchers should solve that
problem.

√iktor Ҡlang

unread,
Apr 4, 2012, 6:18:14 PM4/4/12
to akka...@googlegroups.com
On Thu, Apr 5, 2012 at 12:11 AM, Pepe <ppr...@googlemail.com> wrote:
Is that a Yes or a No !?

I didn't outline enough reasons why it is different?
 

Anyway, I hadn't think about distributing the actors on several
machines.
Using the same actor counter on their dispatchers should solve that
problem.

I don't follow, can you elaborate?

Cheers,



--
Typesafe - The software stack for applications that scale

Twitter: @viktorklang

Pepe

unread,
Apr 4, 2012, 6:36:22 PM4/4/12
to Akka User List
>Because it doesn't use shared mutable state concurrency control.
Sorry, I don't understand that. I'm too inexpert.

>By modeling the protocol and the flow of messages within the
>same abstraction your domain code your code becomes much easier to reason
>about and easier to evolve.
At the moment my model is almost completely unaware of Akka.
It is only present in the model in:
- Loader code, at start up, where the entities are created from the DB
- Factory Methods in Repositories

>> Anyway, I hadn't think about distributing the actors on several machines.
I hadn't considered that maybe in the future several machines would be
needed.
I was thinking only on a Local System,
I was thinking that the dispatcher of the model actors is hidden
inside the dispatcher

>> Using the same actor counter on their dispatchers should solve that problem.
If several dispatchers (even on different machines) share the same
counter actor
then the problem of waiting for actors to be idle when they are
distributed on
several machines is solved.

√iktor Ҡlang

unread,
Apr 4, 2012, 6:57:17 PM4/4/12
to akka...@googlegroups.com
On Thu, Apr 5, 2012 at 12:36 AM, Pepe <ppr...@googlemail.com> wrote:
>Because it doesn't use shared mutable state concurrency control.
Sorry, I don't understand that. I'm too inexpert.

There's a lot of easily accessible material on that topic available via Google.
 

>By modeling the protocol and the flow of messages within the
>same abstraction your domain code your code becomes much easier to reason
>about and easier to evolve.
At the moment my model is almost completely unaware of Akka.
It is only present in the model in:
- Loader code, at start up, where the entities are created from the DB
- Factory Methods in Repositories

You said: "Premises:
Web of interconnected actors that interchange messages.
The system is designed in a way that the actors by themselves don't
generate enough message traffic to keep them busy forever."

So I was assuming you were using Actors quite a bit?
 

>> Anyway, I hadn't think about distributing the actors on several machines.
I hadn't considered that maybe in the future several machines would be
needed.
I was thinking only on a Local System,
I was thinking that the dispatcher of the model actors is hidden
inside the dispatcher

I strongly suggest keeping the work coordinator outside of the workers.
 

>> Using the same actor counter on their dispatchers should solve that problem.
If several dispatchers (even on different machines) share the same
counter actor
then the problem of waiting for actors to be idle when they are
distributed on
several machines is solved.

You mean your counter actor idea? How would the dispatchers know if they are to create or obtain a reference to the counter actor, and where would it live?

Cheers,
 

--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.

Pepe

unread,
Apr 6, 2012, 10:46:17 AM4/6/12
to Akka User List
>So I was assuming you were using Actors quite a bit?
I am, many of the model entities are actors but, as they are typed
actors, the code in the model is almost unaware of Akka.
I think that's one of the big advantages of using typed actors, they
are pretty transparent.
You could remove akka from my project and the model wouldn't need to
change.
Well, unless the model was single-threaded or other magic akka-style
was added, it should need to change to control the "shared mutable
state".
Akka is obviously present in scaffolding/startup/shutdown code.

I have had another idea!
What about an executor that counts queued tasks with an AtomicInteger?
That way the system uses the usual executor (for hours, days,
weeks...)
and only after a system shutdown is needed the executor is replaced by
a new one that can inform when it is idle.
There are two options available:
- extend a new Dispatcher as the code that comes next
- put those lines in the code that controls the shutdown procedure

public class CountDispatcher extends Dispatcher {
private CountExecutorService countExecutorService;
void prepareShutdown(AtomicInteger counter, long timeout, TimeUnit
unit) {
ExecutorServiceDelegate originalService =
dispatcher.executorService().get();
setCountExecutorService(counter);
originalService.shutdown();
originalService.awaitTermination(timeout, unit);
}
private void setCountExecutorService(AtomicInteger counter) {
ExecutorService service =
executorServiceFactory().createExecutorService();
countExecutorService = new CountExecutorService(service, counter);
ExecutorServiceDelegate delegate = new
WrappedExecutorService(countExecutorService);
executorService().set(delegate);
}
public boolean isIdle() {
return countExecutorService != null &&
countExecutorService.isIdle();
}
}
public class ModelExecutorService implements ExecutorService {
private class CountRunnable implements Runnable {
private final Runnable runnable;
public CountRunnable(Runnable runnable) {
if(runnable == null) throw new NullPointerException();
counter.incrementAndGet();
this.runnable = runnable;
}
@Override public void run() {
try {
runnable.run();
} finally {
counter.decrementAndGet();
}
}
}
private class CountCallable<V> implements Callable<V> {
...
}
private final ExecutorService service;
private final AtomicInteger counter;
public ModelExecutorService(ExecutorService service, AtomicInteger
counter) {
this.service = service;
this.counter = counter;
}

public boolean isIdle() {
return counter.get() == 0;
}
...
}

What do you think ?

Cheers

Pepe

unread,
Apr 6, 2012, 11:13:57 AM4/6/12
to Akka User List
Oops, wrong name

public class CountExecutorService implements ExecutorService {
private class CountRunnable implements Runnable {
private final Runnable runnable;
public CountRunnable(Runnable runnable) {
if(runnable == null) throw new
NullPointerException();
counter.incrementAndGet();
this.runnable = runnable;
}
@Override public void run() {
try {
runnable.run();
} finally {
counter.decrementAndGet();
}
}
}
private class CountCallable<V> implements Callable<V> {
...
}
private final ExecutorService service;
private final AtomicInteger counter;
public CountExecutorService(ExecutorService service,
AtomicInteger counter) {
this.service = service;
this.counter = counter;
}
public boolean isIdle() {
return counter.get() == 0;
}
@Override
public void execute(Runnable command) {
service.execute(new CountRunnable(command));
}
...

}

Akka Team

unread,
Apr 6, 2012, 5:24:19 PM4/6/12
to akka...@googlegroups.com
On Fri, Apr 6, 2012 at 4:46 PM, Pepe <ppr...@googlemail.com> wrote:
>So I was assuming you were using Actors quite a bit?
I am, many of the model entities are actors but, as they are typed
actors, the code in the model is almost unaware of Akka.
I think that's one of the big advantages of using typed actors, they
are pretty transparent.

TypedActors are intended to be a bridge-layer between non-actor code and Actors, not to be used exclusively.
 
You could remove akka from my project and the model wouldn't need to
change.

Yes, but unless you're using only void-returning methods or Future-returning methods on your typed actors you're wasting threads.
 
Well, unless the model was single-threaded or other magic akka-style
was added, it should need to change to control the "shared mutable
state".

I don't follow here.
I still think you should model the work-managed in your domain as it performs vital coordination.
I see not difference in your proposed solution as your initial proposed solution, you've even introduced allocation overhead for all actions, an extra level of indirection which will impact cache performance and still retained the cache-hotspot which is the counter.

Sorry for sounding critical,

Cheers,

 
Cheers


--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.




--
Akka Team
Typesafe - The software stack for applications that scale
Blog: letitcrash.com
Twitter: @akkateam

Pepe

unread,
Apr 6, 2012, 6:44:45 PM4/6/12
to Akka User List
>TypedActors are intended to be a bridge-layer between non-actor code and
>Actors, not to be used exclusively.
????????????
Not all my entities are actors:
-a Ship is an actor but,
-Hull, Gun, Engine, etc... aren't
Is that related with your comment???

>unless you're using only void-returning methods or
>Future-returning methods on your typed actors you're wasting threads.
Oh yes, I forgot to state that.
I only use fire&forget methods.
But the main reason was to avoid deadlocks.
Have "wasting threads" a special meaning in concurrency jargon?

>I see not difference in your proposed solution as your initial proposed
>solution, you've even introduced allocation overhead for all actions, an
>extra level of indirection which will impact cache performance and still
>retained the cache-hotspot which is the counter
But now the "extra" is only present after the shutdown has been
required.
In the initial solution the "extra" was present all the time, from
start to shutdown.
If the system is running for a couple of days
and it requires a few additional seconds for a proper shutdown
then it is no big deal.

>Sorry for sounding critical
If the critic is sound I really appreciate it :)

Cheers

Akka Team

unread,
Apr 6, 2012, 7:10:22 PM4/6/12
to akka...@googlegroups.com
On Sat, Apr 7, 2012 at 12:44 AM, Pepe <ppr...@googlemail.com> wrote:
>TypedActors are intended to be a bridge-layer between non-actor code and
>Actors, not to be used exclusively.
????????????
Not all my entities are actors:
       -a Ship is an actor but,
       -Hull, Gun, Engine, etc... aren't
Is that related with your comment???

It was merely a word of caution :-)
 

>unless you're using only void-returning methods or
>Future-returning methods on your typed actors you're wasting threads.
Oh yes, I forgot to state that.
I only use fire&forget methods.
But the main reason was to avoid deadlocks.
Have "wasting threads" a special meaning in concurrency jargon?

Since you're using only fire-and-forget (void methods) then you're fine, but in the case of returning non-void-non-future results you're blocking the calling thread until a timeout occurs, while another thread is doing the work, hence 1 thread blocked and one thread working, instead of having only one thread, which does the work itself.
 

>I see not difference in your proposed solution as your initial proposed
>solution, you've even introduced allocation overhead for all actions, an
>extra level of indirection which will impact cache performance and still
>retained the cache-hotspot which is the counter
But now the "extra" is only present after the shutdown has been
required.
In the initial solution the "extra" was present all the time, from
start to shutdown.
If the system is running for a couple of days
and it requires a few additional seconds for a proper shutdown
then it is no big deal.

You might have a race there though, long running tasks that started prior to your shutdown won't be visible to your shutdown.
You also might have all sorts of races happening in that code, I'm just too tired and on holiday right now :-)
 

>Sorry for sounding critical
If the critic is sound I really appreciate it :)

Great!

Cheers,
 

Cheers

--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.

Pepe

unread,
Apr 6, 2012, 7:51:35 PM4/6/12
to Akka User List
>TypedActors are intended to be a bridge-layer between non-actor code and
>Actors, not to be used exclusively.
I know it is not properly said but in some way I see actors like EJBs.
They are coarse-grained.
I don't know how to express that, I see it like a metaphor.

>long running tasks that started prior
>to your shutdown won't be visible to your shutdown
It doesn't matter because of these lines:
originalService.shutdown();
originalService.awaitTermination(timeout, unit);

>You also might have all sorts of races happening in that code
>I'm just too tired and on holiday right now :-)
I don't see any flaw. (Yet)
I cannot wait for the holidays to end to get your next critic ;)

Cheers

Viktor Klang

unread,
Apr 7, 2012, 5:23:37 PM4/7/12
to akka...@googlegroups.com
On Sat, Apr 7, 2012 at 1:51 AM, Pepe <ppr...@googlemail.com> wrote:
>TypedActors are intended to be a bridge-layer between non-actor code and
>Actors, not to be used exclusively.
I know it is not properly said but in some way I see actors like EJBs.
They are coarse-grained.
I don't know how to express that, I see it like a metaphor.

>long running tasks that started prior
>to your shutdown won't be visible to your shutdown
It doesn't matter because of these lines:
               originalService.shutdown();

"Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down." - http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html#shutdown()

 
               originalService.awaitTermination(timeout, unit);

Blocking is evil :-)
 

>You also might have all sorts of races happening in that code
>I'm just too tired and on holiday right now :-)
I don't see any flaw. (Yet)

How about now? :-)
 
I cannot wait for the holidays to end to get your next critic ;)

Viktor Klang

unread,
Apr 7, 2012, 5:31:41 PM4/7/12
to akka...@googlegroups.com
On Sat, Apr 7, 2012 at 11:23 PM, Viktor Klang <viktor...@gmail.com> wrote:


On Sat, Apr 7, 2012 at 1:51 AM, Pepe <ppr...@googlemail.com> wrote:
>TypedActors are intended to be a bridge-layer between non-actor code and
>Actors, not to be used exclusively.
I know it is not properly said but in some way I see actors like EJBs.
They are coarse-grained.
I don't know how to express that, I see it like a metaphor.

>long running tasks that started prior
>to your shutdown won't be visible to your shutdown
It doesn't matter because of these lines:
               originalService.shutdown();

"Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down." - http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html#shutdown()


A bit too fast today I see, so your plan is to switch out the "current" ExecutorService and replace it with a new one (then you should use getAndSet), then shutdown the old one and wait for it to terminate? (why wait for it to terminate btw?)
 
 
               originalService.awaitTermination(timeout, unit);

Blocking is evil :-)
 

>You also might have all sorts of races happening in that code
>I'm just too tired and on holiday right now :-)
I don't see any flaw. (Yet)

How about now? :-)
 
I cannot wait for the holidays to end to get your next critic ;)

"       private CountExecutorService countExecutorService;" <-- Not threadsafe :-)

Cheers,

Pepe

unread,
Apr 8, 2012, 2:47:51 PM4/8/12
to Akka User List
Thanks Viktor for your valuable and indispensable help.
I couldn't express my intention in a better way (so concise and
precise):
switch out the "current" ExecutorService and replace it with a new one

I think that the code is now in a very good shape (getAndSet is really
nice).
The only thing I miss is the option to use it in a distributed system,
I guess using a counter actor instead of the AtomicInteger would
suffice.

I will post in the next messages an update of my code right now:
- IdleDispatcherUtil: a utility class to wait for a dispatcher to be
idle
- CountExecutorService.CountRunnable and
CountExecutorService.CountCallable: inner classes to wrap the tasks
- CountCallable: an ExecutorService that
* counts pending+running tasks with an AtomicInteger
* can replace the ExecutorService of a given Dispatcher with itself
* has a method that "waits til idle"
My actual code has extra options that I won't send because it will
clutter the forum,
like slight awaiIdle variations (ex: awaitIdle(Dispatcher dispatcher,
long nanos))
or the option wait for several local dispatchers
(awaitIdle(AtomicInteger counter, long timeout, TimeUnit unit,
Dispatcher...dispatchers))

Can anyone spot any mistakes ??
Any improvements will be really welcomed !!!!

Pepe

unread,
Apr 8, 2012, 2:48:27 PM4/8/12
to Akka User List
public class IdleDispatcherUtil {
static public boolean awaitIdle(Dispatcher dispatcher, long timeout,
TimeUnit unit) {
return awaitIdle(dispatcher, new AtomicInteger(0), timeout, unit);
}
static public boolean awaitIdle(Dispatcher dispatcher, AtomicInteger
counter, long timeout, TimeUnit unit) {
if(timeout <= 0) return false;
CountExecutorService countExecutorService = new
CountExecutorService(dispatcher, counter);
ExecutorServiceDelegate originalService =
countExecutorService.replace(dispatcher);
originalService.shutdown();
if(!originalService.awaitTermination(timeout, unit)) return false;
if(!countExecutorService.awaitIdle(timeout, unit)) return false;
return true;
}
}

Pepe

unread,
Apr 8, 2012, 2:49:14 PM4/8/12
to Akka User List
public class CountExecutorService implements ExecutorService {
private class CountRunnable implements Runnable {
private final Runnable runnable;
public CountRunnable(Runnable runnable) {
if(runnable == null) throw new NullPointerException();
counter.incrementAndGet();
this.runnable = runnable;
}
@Override public void run() {
try {
runnable.run();
} finally {
decCounter();

Pepe

unread,
Apr 8, 2012, 2:49:46 PM4/8/12
to Akka User List
public class CountExecutorService implements ExecutorService {
private final ExecutorService service;
private final Lock lock = new ReentrantLock();
private final Condition idle = lock.newCondition();
private final AtomicInteger counter;

public CountExecutorService(Dispatcher dispatcher, AtomicInteger
counter) {
this(dispatcher.executorServiceFactory().createExecutorService(),
counter);
}
public CountExecutorService(ExecutorService service, AtomicInteger
counter) {
this.service = service;
this.counter = counter;
}
private void decCounter() {
if(counter.decrementAndGet() == 0) {
idle.signalAll();
}
}
public ExecutorServiceDelegate replace(Dispatcher dispatcher) {
ExecutorServiceDelegate delegate = new WrappedExecutorService(this);
return dispatcher.executorService().getAndSet(delegate);
}
public boolean awaitIdle(long timeout, TimeUnit unit) {
lock.lock();
try {
while(isBusy())
if(!idle.await(timeout, unit)) return false;
return true;
} catch(InterruptedException e) {
return false;
} finally {
lock.unlock();
}
}
public boolean isBusy() {
return counter.get() > 0;

Derek Williams

unread,
Apr 8, 2012, 4:16:08 PM4/8/12
to akka...@googlegroups.com
I'm coming into this discussion a bit late, and I skipped over some of it so I may have missed an important point, but here is how I handle similar situations:

If I just want the actor to deal with the messages that are currently in the mailbox, I have the actor send itself a message to shut itself down (this could be a PoisonPill, or something else). If you don't use a PoisonPill for this, you can have the actor cancel it's pending shutdown if an important message is received that it has to handle properly.

If you want a stronger guarantee that the mailbox is empty, you can have the actor send itself the shutdown message, and keep track of the number of messages seen until it is received (this is also a decent way of keeping stats on mailbox size). When the shutdown message is received and there was no other messages received after it was originally sent, then stop the actor. If there were other messages received, reset the count and send the shutdown message again. Keep doing this until the shutdown message is received right after it was sent, meaning that, at least for some point in time, the mailbox was empty (which is about as much assurance as you are going to get).

The advantage of using a pattern like this is you do not have the overhead of keeping track of pending messages during normal operations.

--
Derek Williams

Derek Williams

unread,
Apr 8, 2012, 4:17:57 PM4/8/12
to akka...@googlegroups.com
Just to add to my previous post, this doesn't directly solve the problem of shutting down a large system of actors, but it does let you broadcast to all your actors the intention of shutting down, and that they should be working towards that goal.

--
Derek Williams

Viktor Klang

unread,
Apr 9, 2012, 7:05:34 AM4/9/12
to akka...@googlegroups.com
Problem is that you're trying to build it in on the dispatch-level instead of in the domain level,
if you had implemented as I suggested with a WorkManager it'd work OOTB in a distributed setting.

Cheers,


       }
}

Pepe

unread,
Apr 9, 2012, 11:21:15 AM4/9/12
to Akka User List
> Problem is that you're trying to build it in on the dispatch-level instead
> of in the domain level,
As I see it, this concern doesn't belong to the domain layer, it
belongs to the infrastructure layer.
I don't want to pollute the domain model with alien concepts.

> if you had implemented as I suggested with a WorkManager it'd work OOTB in
> a distributed setting.
If I had implemented it with a counter actor it'd also work OOTB in a
distribute setting.
But, as I don't need distributed systems (yet) I chose the
AtomicInteger easier path.

In an infinite simulation it is very difficult to get the concept of
work done (is it meaningful?).
Besides, the actors are (can be) highly interconnected. At any moment:
- a player can receive an invitation, a war declaration, a purchase,
etc. from almost any other player,
- a flying ship can receive a shot from another ship or base
Then, I don't know how to construct the WorkManager.
*One way could be using request-reply style methods to control easily
the "end" of a partial work.
But I don't want to use request-reply methods because I don't want to
worry about deadlocks.
*Another way could involve having a more strict control of all the
messages. But how can that be achieved?
In order to create an actor there is an interface (ex:Player) and an
implementing class (ex:PlayerImpl) with the actual business code.
Should a special WatchedActor class be used and make all the *Impl
inherit it?
Should a special WatchedActor class be constructed as a wrapper for
the domain classes?

Cheers

Viktor Klang

unread,
Apr 9, 2012, 12:10:23 PM4/9/12
to akka...@googlegroups.com
On Mon, Apr 9, 2012 at 5:21 PM, Pepe <ppr...@googlemail.com> wrote:
> Problem is that you're trying to build it in on the dispatch-level instead
> of in the domain level,
As I see it, this concern doesn't belong to the domain layer, it
belongs to the infrastructure layer.
I don't want to pollute the domain model with alien concepts.

> if you had implemented as I suggested with a WorkManager it'd work OOTB in
> a distributed setting.
If I had implemented it with a counter actor it'd also work OOTB in a
distribute setting.
But, as I don't need distributed systems (yet) I chose the
AtomicInteger easier path.

In an infinite simulation it is very difficult to get the concept of
work done (is it meaningful?).

So hy do you need the notion of an end then?

Cheers,
 
Besides, the actors are (can be) highly interconnected. At any moment:
- a player can receive an invitation, a war declaration, a purchase,
etc. from almost any other player,
- a flying ship can receive a shot from another ship or base
Then, I don't know how to construct the WorkManager.
*One way could be using request-reply style methods to control easily
the "end" of a partial work.
But I don't want to use request-reply methods because I don't want to
worry about deadlocks.
*Another way could involve having a more strict control of all the
messages. But how can that be achieved?
In order to create an actor there is an interface (ex:Player) and an
implementing class (ex:PlayerImpl) with the actual business code.
Should a special WatchedActor class be used and make all the *Impl
inherit it?
Should a special WatchedActor class be constructed as a wrapper for
the domain classes?

Cheers

Pepe

unread,
Apr 9, 2012, 12:39:29 PM4/9/12
to Akka User List
What I need is to be able to restart the system.
Reply all
Reply to author
Forward
0 new messages