Dynamically created request-reply processors/channels

48 views
Skip to first unread message

Waldek

unread,
Aug 5, 2013, 4:28:51 PM8/5/13
to events...@googlegroups.com
Hi All,

I have been recently prototyping an app that is supposed to act as a job controller - listen for incoming job requests, spawn news jobs in external system, track them and notify other external subsystems about jobs progress.

All requests go through central dispatcher EventsourcedDispatcher which is an evensourced processor. The EventsourcedDispatcher journals all requests and dispatches requests to the EventsourcedJob1OrJob2Dispatcher actor that would create new/lookup existing and forward the message to EventsourcedJobCoordinator. The EventsourcedJobCoordinator is a FSM actor and a processor and would initially spawn new job by forwarding the message to JobInitiator (plain actor) through ReliableRequestReplyChannel. JobInitator would make RPC/HTTP call to external system. The reason behind using the ReliableRequestReplyChannel is to journal the job guids.

Basic idea is to recreate entire tree of actors on recover() along with the information what jobs were spawned and what their guid is so that app knows what jobs have been in progress and what state and and what other subsystems have been notified about job progress.

Anyway the app works as intended in regular run (journal empty) as the log snippet shows below:

013-08-05 16:12:40,744 [treatments-akka.actor.default-dispatcher-4] DEBUG EventsourcedDispatcher$$anonfun$16$$anon$5 - EventsourcedJob1OrJob2Dispatcher: Replayed coordinator
2013-08-05 16:12:40,744 [treatments-akka.actor.default-dispatcher-4] DEBUG EventsourcedDispatcher$$anonfun$16$$anon$5 - EventsourcedJob1OrJob2Dispatcher: Forwarded to coordinator
2013-08-05 16:12:40,772 [treatments-akka.actor.default-dispatcher-6] DEBUG EventsourcedJob1OrJob2Dispatcher$$anonfun$receive$3$$anonfun$14$$anon$1 - EventsourcedJobCoordinator: Before delivering to jobInitiatorChannel with sequenceNr 2
2013-08-05 16:12:40,782 [treatments-akka.actor.default-dispatcher-9] DEBUG EventsourcedJob1OrJob2Dispatcher$$anonfun$receive$3$$anonfun$14$$anon$1 - EventsourcedJobCoordinator: After delivering to jobInitiatorChannel
2013-08-05 16:12:40,784 [treatments-akka.actor.default-dispatcher-9] DEBUG EventsourcedJob1OrJob2Dispatcher$$anonfun$receive$3$$anonfun$14$$anon$1 - EventsourcedJobCoordinator: Before sending message to jobInitiatorChannel to initiateJob: job11
2013-08-05 16:12:40,785 [treatments-akka.actor.default-dispatcher-9] DEBUG EventsourcedJob1OrJob2Dispatcher$$anonfun$receive$3$$anonfun$14$$anon$1 - EventsourcedJobCoordinator: Sent message to jobInitiatorChannel to initiateJob: job11
2013-08-05 16:12:40,806 [treatments-akka.actor.default-dispatcher-8] DEBUG a.s.Serialization(akka://treatments) - Using serializer[akka.serialization.JavaSerializer] for message [InitiateJob]
2013-08-05 16:12:40,822 [treatments-akka.actor.default-dispatcher-9] DEBUG JobInitiator - JobInitiator: Initiated job: job11 with guid: 4e4af26b-494b-46e5-8628-5429072c8f86
2013-08-05 16:12:40,825 [treatments-akka.actor.default-dispatcher-6] DEBUG a.s.Serialization(akka://treatments) - Using serializer[akka.serialization.JavaSerializer] for message [JobInitiated]
2013-08-05 16:12:40,833 [treatments-akka.actor.default-dispatcher-8] DEBUG EventsourcedJob1OrJob2Dispatcher$$anonfun$receive$3$$anonfun$14$$anon$1 - EventsourcedJobCoordinator: Received message from jobInitiatorChannel that job with guid: 4e4af26b-494b-46e5-8628-5429072c8f86 was initiated
2013-08-05 16:12:40,836 [treatments-akka.actor.default-dispatcher-7] DEBUG EventsourcedJob1OrJob2Dispatcher$$anonfun$receive$3$$anonfun$14$$anon$1 - EventsourcedJobCoordinator: Created job tracker

However it misbehaves during recovery on restart (last line):

2013-08-05 16:14:29,339 [main] DEBUG org.apache.hadoop.fs.FileSystem - Creating filesystem for file:///
2013-08-05 16:14:29,579 [treatments-akka.actor.default-dispatcher-5] DEBUG EventsourcedSystem$$anonfun$20$$anon$3 - EventsourcedDispatcher: Received: FileNotification(1,FFW)
2013-08-05 16:14:29,579 [treatments-akka.actor.default-dispatcher-5] DEBUG EventsourcedDispatcher$$anonfun$16$$anon$5 - EventsourcedJob1OrJob2Dispatcher: Received: FileNotification(1,job1) with sequenceNr 1
2013-08-05 16:14:29,608 [treatments-akka.actor.default-dispatcher-5] DEBUG EventsourcedDispatcher$$anonfun$16$$anon$5 - EventsourcedJob1OrJob2Dispatcher: Replayed coordinator
2013-08-05 16:14:29,609 [treatments-akka.actor.default-dispatcher-5] DEBUG EventsourcedDispatcher$$anonfun$16$$anon$5 - EventsourcedJob1OrJob2Dispatcher: Forwarded to coordinator
2013-08-05 16:14:29,613 [treatments-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://treatments) - Using serializer[akka.serialization.JavaSerializer] for message [FileNotification]
2013-08-05 16:14:29,630 [treatments-akka.actor.default-dispatcher-6] DEBUG EventsourcedJob1OrJob2Dispatcher$$anonfun$receive$3$$anonfun$14$$anon$1 - EventsourcedJobCoordinator: Before delivering to jobIniti
atorChannel with sequenceNr 2
2013-08-05 16:14:29,635 [treatments-akka.actor.default-dispatcher-3] DEBUG EventsourcedJob1OrJob2Dispatcher$$anonfun$receive$3$$anonfun$14$$anon$1 - EventsourcedJobCoordinator: After delivering to jobInitia
torChannel
2013-08-05 16:14:29,636 [treatments-akka.actor.default-dispatcher-3] DEBUG EventsourcedJob1OrJob2Dispatcher$$anonfun$receive$3$$anonfun$14$$anon$1 - EventsourcedJobCoordinator: Before sending message to job
InitiatorChannel to initiateJob: job11
2013-08-05 16:14:29,636 [treatments-akka.actor.default-dispatcher-3] DEBUG EventsourcedJob1OrJob2Dispatcher$$anonfun$receive$3$$anonfun$14$$anon$1 - EventsourcedJobCoordinator: Sent message to jobInitiatorC
hannel to initiateJob: job11
2013-08-05 16:14:29,638 [treatments-akka.actor.default-dispatcher-6] DEBUG EventsourcedJob1OrJob2Dispatcher$$anonfun$receive$3$$anonfun$14$$anon$1 - EventsourcedJobCoordinator: Received message from jobInit
iatorChannel that job with guid: 4e4af26b-494b-46e5-8628-5429072c8f86 was initiated
2013-08-05 16:14:29,638 [treatments-akka.actor.default-dispatcher-6] DEBUG EventsourcedJob1OrJob2Dispatcher$$anonfun$receive$3$$anonfun$14$$anon$1 - EventsourcedJobCoordinator: Created job tracker
2013-08-05 16:14:29,671 [treatments-akka.actor.default-dispatcher-5] DEBUG EventsourcedJob1OrJob2Dispatcher$$anonfun$receive$3$$anonfun$14$$anon$1 - EventsourcedJobCoordinator: unhandled FileNotification(1,
job11) with sequence 5

As I understand the message containing event FileNotification(1,FFW) gets journaled twice (by processor 1 and 4) and probably get replayed twice which is NOT what I want. I tried to forward the event as NonToBeJournaledMessage (see commented part in EventsourcedJobCoordinator) but this leads to another unwanted behavior - JobInitiated event gets replayed before FileNotification event.

What am I doing wrong? How do I use eventsourced to make the replay of messages work as I want? Is it because I create the channels and processors dynamically?

Thanks for the help is advance,
Waldek

PS. You can see full source under this gist - https://gist.github.com/anonymous/9bf06b2dc45d12c117fe.

Martin Krasser

unread,
Aug 6, 2013, 9:09:15 AM8/6/13
to events...@googlegroups.com
Hi Waldek,

I'll look into it when I'm back from vacation (mid of Aug).

Cheers,
Martin
--
You received this message because you are subscribed to the Google Groups "Eventsourced User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to eventsourced...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

-- 
Martin Krasser

blog:    http://krasserm.blogspot.com
code:    http://github.com/krasserm
twitter: http://twitter.com/mrt1nz

Martin Krasser

unread,
Aug 19, 2013, 3:33:24 AM8/19/13
to events...@googlegroups.com
Hi Waldek,

the reason why you observe an 'Unhandled FileNotification' by the EventsourcedJobCoordinator during recovery is:

- both EventsourcedDispatcher and EventsourcedJobCoordinator are processors that journal FileNotification events
- during recovery, each processor will receive the FileNotification event it journaled. So far, so good.
- the problem is that the EventsourcedDispatcher again forwards the replayed FileNotification to the EventsourcedJobCoordinator. Hence the EventsourcedJobCoordinator receives two FileNotification events (one during its own recovery and the other one that is re-send by the EventsourcedDispatcher).

To fix that problem make sure that the communication between EventsourcedDispatcher and child actors go through a channel (DefaultChannel is sufficient). You may want to refer to this and that discussion thread for related issues and solutions. On the other hand, why did you choose to make EventsourcedDispatcher a processor at all? It doesn't maintain state. Making it a plain actor should fix the issue as well.

As an unrelated note, you could further simplify your application by creating GUIDs, for example, in a web controller and submit it along with messages to be journaled. This would make the JobInitiator and request-reply communication with it obsolete. In general, I recommend to use reliable channels only for remote communications, for local interactions (as done with the JobInitiator), a DefaultChannel is sufficient.


Cheers,
Martin

On 05.08.13 22:28, Waldek wrote:
--
You received this message because you are subscribed to the Google Groups "Eventsourced User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to eventsourced...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 
Reply all
Reply to author
Forward
0 new messages