Looking for some feedback on an Event Dispatcher


Phillip Cleveland

Feb 9, 2013, 1:10:00 AM
to event...@googlegroups.com
This dispatcher uses the ClientAPI to receive messages and then publishes to an event bus.

https://gist.github.com/pdoh00/4744120

I welcome any feedback.

Thanks
Phil

Alexey Raga

Feb 9, 2013, 3:04:14 AM
to event...@googlegroups.com
The obvious weakness here is that it is only able to dispatch from the "live stream" of events.
I think the most common and useful scenario would be to "catch up" with the events from the last checkpoint and then switch to the subscription channel as Greg suggested earlier.

Regards,
Alexey.

Phillip Cleveland

Feb 9, 2013, 10:39:52 AM
to event...@googlegroups.com
I don't quite understand. Is that because the dispatcher may be down while events are still being written to the event store? Basically missing events. Or is the live stream volatile?

Thanks
Phil

Sent from my iPad
--
You received this message because you are subscribed to the Google Groups "Event Store" group.
To unsubscribe from this group and stop receiving emails from it, send an email to event-store...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

Greg Young

Feb 9, 2013, 12:01:39 PM
to event...@googlegroups.com
This looks good, however it is missing a bit of code.

Right now it will subscribe and give you messages, but what happens if I
lose my connection for some period? We can imagine a network gnome
unplugging the cable. It will only receive live subscriptions and is
missing the bit of code to ensure that everything will see a message.

To do this you need to make a differentiation between live messages
and historical messages. I have put it up before but basically the
solution is:

Subscribe -> queue
Asynchronously start reading my history
On each event check if it's the tail of my queue
If the tail is reached, process the queue and move to live
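Greg's four steps can be sketched in a few lines. This is a hypothetical illustration (in Java rather than the thread's C#, with bare positions standing in for events and every name invented), not the Client API:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch of the catch-up recipe: buffer live events while history is read;
// when a historical event matches the head of the live buffer, drain the
// buffer (processing each event exactly once) and switch to live mode.
class CatchUpDispatcher {
    private final Queue<Long> liveBuffer = new ArrayDeque<>();
    private boolean live = false;
    final List<Long> processed = new ArrayList<>();

    // Called by the live subscription for each new event.
    void onLiveEvent(long position) {
        if (live) processed.add(position);
        else liveBuffer.add(position);
    }

    // Called for each event read from history.
    void onHistoricalEvent(long position) {
        if (live) return;
        if (!liveBuffer.isEmpty() && liveBuffer.peek() == position) {
            // History has caught up to the buffered live events:
            // process the queue and move to live.
            while (!liveBuffer.isEmpty()) processed.add(liveBuffer.poll());
            live = true;
        } else {
            processed.add(position);
        }
    }
}
```

Note the overlap event appears in both history and the buffer; draining the buffer on the match (rather than also appending the historical copy) is what keeps it from being processed twice.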

Greg
--
Le doute n'est pas une condition agréable, mais la certitude est absurde.

Phillip Cleveland

Feb 9, 2013, 12:36:10 PM
to event...@googlegroups.com
Cool. Thanks for the feedback. I'll update the code and repost.
Thanks
Phil

Sent from my iPad

Andrii Nakryiko

Feb 9, 2013, 4:50:44 PM
to event...@googlegroups.com
Hi Phillip,

Szymon Pobiega has a nice blog post exactly about persistent subscriptions. See also my comment on that post. The mechanics he describes are almost the same as Greg suggests, with a slight difference: Szymon reads history events until nothing is left, then subscribes and buffers events, then does some more reading of history, and only then switches to online processing of events from the live subscription. The way Greg proposes is prone to unnecessary memory usage, especially if the history is long and there is a lot of writing to the stream you are subscribing to.

But the very first thing that caught my attention: you use the non-thread-safe Queue from multiple threads. That's very bad. I'd suggest using ConcurrentQueue, which is perfect for this. Also, there is no need to create a separate Thread just to dispatch events; you could use the ThreadPool while ensuring that dispatching is strictly single-threaded and lock-free. See the code in https://github.com/EventStore/EventStore/blob/master/src/EventStore/EventStore.Core/Bus/QueuedHandlerThreadPool.cs for how we do that. Pay attention to two methods, ReadFromQueue and Publish, and their usage of the _isRunning and proceed variables.
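The _isRunning/proceed pattern Andrii points at can be approximated like this: a rough Java analogue of the C# QueuedHandlerThreadPool idea, with illustrative names (the real code hands the drain loop to the thread pool rather than running it inline):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Producers enqueue into a thread-safe queue; an atomic flag guarantees at
// most one drain loop runs at a time, so dispatching stays strictly
// single-threaded without locks or a dedicated thread.
class QueuedHandler<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final Consumer<T> handler;

    QueuedHandler(Consumer<T> handler) { this.handler = handler; }

    void publish(T message) {
        queue.add(message);
        tryDrain();
    }

    private void tryDrain() {
        // Re-check the queue after clearing the flag, so a message published
        // just as the previous drain stopped is never stranded.
        while (!queue.isEmpty() && isRunning.compareAndSet(false, true)) {
            try {
                T msg;
                while ((msg = queue.poll()) != null) handler.accept(msg);
            } finally {
                isRunning.set(false);
            }
        }
    }
}
```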





Best regards,
Andrii Nakryiko



Phillip Cleveland

Feb 9, 2013, 4:55:54 PM
to event...@googlegroups.com
Thanks Andrii for the info. Very helpful. I will have a look at the blog post and the code.

Thanks
Phil

Sent from my iPad

Greg Young

Feb 10, 2013, 4:30:20 AM
to event...@googlegroups.com
Re: memory. With the queue you can say exactly how much you are willing to buffer:

queue.Enqueue(message);
if (queue.Count == max) queue.Dequeue();

Phillip Cleveland

Feb 15, 2013, 12:18:46 AM
to event...@googlegroups.com
New gist posted at https://gist.github.com/pdoh00/4744120. Andrii, thanks for all the pointers. I have tried to take them into account with this latest revision.

A few issues remain:
1) In ReadHistoricalEvents() I am double reading some events. I think it has something to do with me using slice.NextPosition incorrectly. Any feedback would be appreciated. Line 122

2) Currently it only reads history from StreamPosition.Start. I think some kind of mechanism to write the last processed event to non-volatile memory could allow for only a partial rebuild of the read store. Right now, if the dispatcher crashed, to bring it back up I would either have to rebuild the read store or make all my event handlers idempotent. I think I would prefer to make sure events are fired only once, as opposed to hoping all the event handlers get implemented correctly. With that said, rebuilding the entire read store may be so fast that I really don't care.

3) I think I have the _liveProcessingGate.Wait in StopDispatching() wrong, because it ends up throwing a TimeoutException. Line 96

Thanks
Phil

Greg Young

Feb 15, 2013, 3:18:18 AM
to event...@googlegroups.com
Re: double reading.

This is because when you send in a position you are saying "give me the next n events from this point forward". It is this way for two reasons:

1) think about what happens with the first event in the system.
2) the system can be scavenged during your walk of all the events. The logical position that you send may not exist anymore. The backend will handle this case and properly get you the next one.

Greg

James Nugent

Feb 15, 2013, 8:21:01 AM
to event...@googlegroups.com
Phillip,

If you read from (0,0), you'll get n events from that point (but no event will ever be at 0,0).

If you read from, say, (143,0), you'll get n events from that point (including 143,0 if there is an event there).

So if you read from your last processed position, you need to 'ignore' the first event IF it's at the same position as your last seen. However, it doesn't follow that you will always get a duplicate (scavenging makes this interesting!), so you do have to check.
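The check James describes might look like this: a toy sketch with a single long standing in for the (commit, prepare) Position pair, all names made up:

```java
import java.util.ArrayList;
import java.util.List;

// When a read starts at the last processed position, the first event may or
// may not duplicate the one already handled (scavenging can remove it), so
// compare positions instead of blindly skipping the first event of a slice.
class SliceProcessor {
    private long lastProcessed = -1; // nothing processed yet
    final List<Long> handled = new ArrayList<>();

    void processSlice(long[] slicePositions) {
        for (long pos : slicePositions) {
            if (pos == lastProcessed) continue; // duplicate of last seen - ignore
            handled.add(pos);
            lastProcessed = pos;
        }
    }
}
```

The same code handles the scavenged case: if the event at the last processed position no longer exists, the next slice simply starts with a later position and nothing is skipped.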

Here is my current version of this (which doesn't deal with reconnections etc, so it's not really that durable at the moment hence the class name) - https://gist.github.com/jen20/ec6cd45bac755979e16a - it's pretty similar in many respects.

Cheers,


James

Greg Young

Feb 15, 2013, 8:22:26 AM
to event...@googlegroups.com
Is a durable reader something we should add to the client API?

You give us an ICheckpoint?

Phil

Feb 15, 2013, 6:57:40 PM
to event...@googlegroups.com
Ok... the event dispatcher is finally working. It's posted at 


Thanks to Andrii Nakryiko, James Nugent and Greg Young for all the input. Some of the stuff would have taken me much longer to solve on my own. Also thanks to Szymon Pobiega for the algorithm steps for the history catch-up logic.
I have only done some testing so far and will continue to test/enhance/post. There are still some //TODO: comments in the code, especially concerning dropped connections. It does what it is supposed to in the happy-path scenario though.

Thanks
Phil

Alexey Raga

Feb 16, 2013, 8:15:25 AM
to event...@googlegroups.com
Absolutely. It would save everyone from implementing it again and again.

Cheers,
Alexey.

Phil

Feb 16, 2013, 10:44:19 AM
to event...@googlegroups.com
I think I am being a bit dense here, but I am not sure I understand the ICheckpoint in the context of the durable reader.  Is the ICheckpoint the object that would be receiving the events?
Phil

Greg Young

Feb 16, 2013, 10:54:45 AM
to event...@googlegroups.com
Checkpoint is to save where you have received to (so in case of power outage we can bring you forward from that point)

Phil

Feb 16, 2013, 11:08:13 AM
to event...@googlegroups.com
So the pipeline is:

GetEventStore saves event =>
GetEventStore durable reader publishes to my implementation of some IBus (or the like) =>
I track the event Position (checkpoint) as I receive events in my bus (somewhere non-volatile) =>
OnCrash() I give the previously stored checkpoint to the GetEventStore durable reader =>
GetEventStore durable reader starts publishing from the checkpoint Position

Is that about right?




--
Phil

Andrii Nakryiko

Feb 16, 2013, 5:34:36 PM
to event...@googlegroups.com
Hi Phillip,

Sorry for the late answer. Here is my reworked variant of your dispatcher with some comments: https://gist.github.com/anakryiko/4968492
It is not production-ready code, as it lacks some details (StopDispatching doesn't wait for everybody to stop doing their work, there are possibilities for race conditions, etc.). But as an example of the algorithm itself (including restoring after a subscription drop) it should do, though the algorithm is more complicated than it seems (that doesn't mean I found the simplest possible solution :). I went a slightly different way than you in some places, but the overall idea seems ok.

As for the durable _lastProcessed position: you have to either store _lastProcessed atomically with the result of processing the event at that position (so you can't end up with _lastProcessed set to some position while the result of processing that event is not stored, or vice versa), or make sure your handling of events is idempotent, so you don't crash or err when processing the same event twice. If you have handling idempotency, then you can always first save the result of processing the event and then store _lastProcessed in non-volatile memory. In that case, when a crash occurs between saving the result and saving _lastProcessed, on restart you will handle the event at _lastProcessed again (as _lastProcessed at that time will point to the previous event). Hope that makes sense to you. :)
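The write ordering Andrii describes for the idempotent option can be made concrete with a toy sketch (in-memory maps standing in for non-volatile storage; every name here is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Persist the processing result BEFORE advancing _lastProcessed. A crash
// between the two writes then only causes the same event to be re-handled
// on restart (harmless if handling is idempotent), never a skipped event.
class CheckpointedHandler {
    final Map<Long, String> results = new HashMap<>(); // idempotent: keyed by position
    long lastProcessed = -1;

    // 'crashBeforeCheckpoint' simulates losing power between the two writes.
    void handle(long position, String result, boolean crashBeforeCheckpoint) {
        results.put(position, result);     // 1. save the result first
        if (crashBeforeCheckpoint) return; // checkpoint still points at previous event
        lastProcessed = position;          // 2. only then advance the checkpoint
    }
}
```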


Best regards,
Andrii Nakryiko



Andrii Nakryiko

Feb 16, 2013, 5:59:28 PM
to event...@googlegroups.com
I've found ways to simplify things slightly, and magically most of the previous problems were fixed (it seems StopDispatching can now wait for the end of processing, and many race conditions are gone, hopefully).
But please be warned that I have only compiled that code, I haven't even run it. The updated version is at the same place: https://gist.github.com/anakryiko/4968492


Best regards,
Andrii Nakryiko



Phillip Cleveland

Feb 17, 2013, 11:59:56 AM
to event...@googlegroups.com
That looks good, Andrii. Thanks for taking the time to add comments as well. I think your revisions have made the code more elegant... and correct :). I'll do some testing with it today.

One question I have is around starting the live subscription. One reason I had the synchronization was to prevent the live subscription from starting while the first batch of historical events was still being processed. In the event that there are a lot of historical events, I thought the live subscription could fill up memory before the catch-up was mostly complete. I may be missing it in yours, but it appears that the first reads of historical events do not block the live subscription.

Thanks
Phil

Sent from my iPad

Greg Young

Feb 17, 2013, 12:45:14 PM
to event...@googlegroups.com
If you are worried when putting into a queue, bound the size of the queue, e.g.:

void HandleLiveData(Event e) {
    if (catchingUp) {
        currentData.Enqueue(e);
        if (currentData.Count > limit) currentData.Dequeue();
    }
}

Phil

Feb 17, 2013, 12:48:40 PM
to event...@googlegroups.com
Ahh....I know you said that before and I didn't understand.  Now that I have been working with the code I see what you mean.  The dequeued event will end up being processed from the pull queue, so there is no worry that it would be lost. 

Thanks
Phil

Greg Young

Feb 17, 2013, 12:49:28 PM
to event...@googlegroups.com
Yep exactly.

Andrii Nakryiko

Feb 17, 2013, 1:22:49 PM
to event...@googlegroups.com
In RecoverSubscription I disable processing from _liveEventsQueue by setting _livePublishingAllowed = false;
Then I have to wait until the remaining enqueued events are processed, by waiting on the _liveDone event. There is no need to do this if you are just starting, but you need to wait for _liveQueue to stop publishing when the connection was dropped and you try to reconnect, so _lastProcessed won't change in the middle of reading historical events. Other race conditions could apply if we don't ensure that live processing is stopped.

If you go the way Greg describes, where you subscribe simultaneously with reading historical events, it seems you have to do more careful synchronization, because you could end up dequeuing an event (when the queue is full) at the same time another thread is trying to pop an event from the live queue. I haven't thought this algorithm through thoroughly, so I can't tell which way is simpler; both approaches could be used, of course. The approach you used seems more "sequential" in nature, so it is easier to reason about and requires less threading-awareness, at least for me.


Best regards,
Andrii Nakryiko



Phil

Feb 17, 2013, 3:08:24 PM
to event...@googlegroups.com
I see now.  My mistake. I didn't realize that ReadHistoricalEventsFrom would not return until no more slices were pulled (meaning the historical events are nearly complete).

I added

// Prevent live queue memory explosion.
if (!_livePublishingAllowed && _liveQueue.Count > LIVE_QUEUE_SIZE_LIMIT)
{
    ResolvedEvent throwAwayEvent;
    _liveQueue.TryDequeue(out throwAwayEvent);
}

to the HandleEventAppeared method, but it may be unnecessary unless there is a possibility that the second call to ReadHistoricalEventsFrom could return some number of events that would take a lot of processing. That seems unlikely to me though.

Thanks
Phil

Andrii Nakryiko

Feb 17, 2013, 5:52:12 PM
to event...@googlegroups.com
You are risking missing events with that code. It could happen that neither ReadHistoricalEventsFrom nor the live message processing will process some of the events. I doubt your business logic will tolerate that :)

It is possible to add prevention of live queue overflow (which will be needed if you process received messages slower than they appear over the subscription), but that requires a more careful approach.


Best regards,
Andrii Nakryiko



Phil

Feb 17, 2013, 6:01:25 PM
to event...@googlegroups.com
Hmm. I guess I am not seeing where the missing event would occur. Is it that one/some of the thrown-away live events could fail to get read from the pull just as the thread switches?

Thanks
Phil

Andrii Nakryiko

Feb 18, 2013, 3:17:00 AM
to event...@googlegroups.com
Yes, you could discard a live event exactly when you have already finished the pull but haven't yet started processing live events. The probability is very low, but still not zero.


Best regards,
Andrii Nakryiko



ChristianD

Feb 18, 2013, 2:16:17 PM
to event...@googlegroups.com
Would anyone care to provide some context on how you would use this dispatcher? It's not clear to me where it resides.

Phil

Feb 18, 2013, 2:19:33 PM
to event...@googlegroups.com
You could do something like this.  Just start up the EventStore.SingleNode.exe first

private static readonly IPEndPoint TcpEndPoint = new IPEndPoint(IPAddress.Loopback, 1113);
var esDispatcher = new GetEventStoreEventDispatcher(EventStoreConnection.Create(), TcpEndPoint, evtBus);

System.Threading.Tasks.Task.Factory.StartNew(() => esDispatcher.StartDispatching());




--
Phil

Phil

Feb 18, 2013, 2:25:15 PM
to event...@googlegroups.com
Sorry... just re-read your question and I'm not sure whether you were asking specifically how to start it, as opposed to how to use it and where it resides.

The dispatcher is basically listening to the GetEventStore for events. As events come in they are deserialized and Published on the IEventBus that you provide. If you have read the entirety of the thread you will notice there is logic for catching up to the live stream by publishing historical events first.

You are responsible for handling the events as they come off the bus. The dispatcher just makes sure the event bus Publishes all the events that are added to the GetEventStore.

The code I posted could go into your application bootstrap if you want it to run in-proc, but it could also be added to a service that runs in a separate process.
--
Phil

ChristianD

Feb 18, 2013, 2:30:14 PM
to event...@googlegroups.com
Thanks, that's what I figured. The part that has me a bit confused is who is catching up? The dispatcher? All subscribers? Does EventStore know the last position?

I understand what the goal of this is; it's just not clear how it's doing it.

Regards.

Phil

Feb 18, 2013, 2:35:54 PM
to event...@googlegroups.com
The dispatcher and the event handlers downstream are catching up. The GetEventStore could have been receiving events and persisting them just fine even when no dispatcher was listening.

In that scenario, when the dispatcher is turned on it should publish all the events that weren't previously published, so that the handlers can update the read store.

Depending on whose gist you look at, it may have some last-read-event persisting logic other than Position.Start. The one that Andrii and I have posted recently does not persist that last-read-event information, so if you restart the dispatcher it will publish all the events in the GetEventStore streams.

ChristianD

Feb 18, 2013, 2:47:36 PM
to event...@googlegroups.com
Ok, that makes more sense. The part I couldn't find isn't there. :) 

Is it something you just haven't addressed yet? I would think it's an important part.

You say "when the dispatcher is turned on it should publish all the events that weren't previously published", then "if you restart the dispatcher it will publish all the events in the GetEventStore streams". Is this not contradictory? Forgive my ignorance.

Phil

Feb 18, 2013, 2:55:06 PM
to event...@googlegroups.com
Sorry I didn't articulate that very well.  

If you are looking at the code at https://gist.github.com/pdoh00/4744120 you will see that the ctor sets _lastProcessed = new Position(-1, -1);

This means that each time you start this dispatcher up it will read all events from the very beginning. If you wanted to be more efficient you would save off _lastProcessed to non-volatile storage after you processed each event. If you brought the dispatcher down, or it crashed, you would restart it passing in the persisted last processed event position. That way it would not read from the very beginning. For me I just read from the very beginning and rebuild my entire read store since I don't have many events and it will be very quick just to rebuild the entire thing.


In summary if you want  "when the dispatcher is turned on it should publish all the events that weren't previously published" to be true you need to persist the _lastProcessed and pass it into the ctor as a parameter.

Cheers
Phil

ChristianD

Feb 18, 2013, 3:08:56 PM
to event...@googlegroups.com
There we go! Thanks. 

Interesting that you only refer to read stores. What about subscribers in other sub-domains that process these?  Maybe my definition of read store differs.

 In my domain I would be processing more than a billion events in less than a calendar year.

Phil

Feb 18, 2013, 3:14:38 PM
to event...@googlegroups.com
I think the number of events to process, as well as how the events are processed by listeners, plays into how you proceed. Your system will need to be idempotent, either by ensuring you never publish the same event twice or by ensuring that all your downstream handlers ignore duplicate events. It sounds like in your case you may want to do the former, by persisting the _lastProcessed variable and only publishing from that point forward when the dispatcher starts.

ChristianD

Feb 18, 2013, 3:21:53 PM
to event...@googlegroups.com
Yes, I think that's the only way. For much of my system events are not idempotent (unless I am out to lunch). I will need to track event ids on the subscriber to ensure duplicates are ignored.

Your dispatcher is a great help. Now that I understand what it does and doesn't do, it gives me a running start.

Greg Young

Feb 19, 2013, 1:14:11 AM
to event...@googlegroups.com
If you store _lastProcessed transactionally with whatever the
handlers are doing, you are transactional (not idempotent).

ChristianD

Feb 19, 2013, 1:16:41 AM
to event...@googlegroups.com
Is this good, bad, or just an observation?

Greg Young

Feb 19, 2013, 1:19:37 AM
to event...@googlegroups.com
It's an observation. If you need transactional behaviour instead of
idempotency, just store that variable with whatever is writing. Then
full transactional behaviour is achieved. The problem with doing that
is it's often slower. If performance becomes a problem, then look at
idempotency.


ChristianD

Feb 19, 2013, 1:21:57 AM
to event...@googlegroups.com
I am not sure how to achieve idempotency. If events are cumulative, how is that possible? Unless my application of the term is off...

Greg Young

Feb 19, 2013, 1:26:02 AM
to event...@googlegroups.com
Projections, for example, are idempotent. Let's say you receive a
"LedgerItemAdded" event: it just means you have to check whether you have
processed it before. If you are, say, adding them to a table, this is
relatively easy: query to see if one with that id already exists.
Where this is more difficult is when you are updating something, such
as a count. In that case it can be harder to make the
operation idempotent. Luckily, in such situations, by simply saving that
variable you don't need to worry about it :)
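Both cases Greg contrasts can be sketched with in-memory stand-ins for the projection tables (all names hypothetical):

```java
import java.util.HashSet;
import java.util.Set;

// An insert-style projection is naturally idempotent (existence check /
// unique key); a counter-style projection is made safe by saving the
// checkpoint together with the update (one transaction in a real store).
class LedgerProjection {
    private final Set<String> seenIds = new HashSet<>();
    int count = 0;
    long checkpoint = -1;

    void onLedgerItemAdded(String id, long position) {
        if (position <= checkpoint) return; // replayed event - already applied
        seenIds.add(id); // idempotent insert
        count++;         // non-idempotent update, guarded by the checkpoint
        checkpoint = position;
    }
}
```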

Gabriel Marquez

Apr 29, 2013, 2:53:03 PM
to event...@googlegroups.com
Guys,

As Greg pointed out, I have some projections which are more difficult: "when you are updating something such as updating a count."

And worse, I didn't build my projections with idempotency in mind, so you can imagine that I'm having trouble with counting and summing events.

Are there any best practices for handling these more complex projections? I'm not using the EventStore Projections Engine.

Kind regards.

Rich Knight

Apr 29, 2013, 4:40:40 PM
to event...@googlegroups.com
The catchup subscriptions built in to the latest client API can do what you need.

Use the method SubscribeToAllFrom(...) on EventStoreConnection. This takes a Position as a parameter, and will play back all historical events from that point onwards, taking care of the tricky crossover between historical events and new ones coming in over the subscription.

You will get instances of ResolvedEvent passed to your EventAppeared callback, and you can get the next "start from" position off of that event from the OriginalPosition property.

The key to getting exactly-once processing like you are asking for is to store this value in your data store in the same transaction as whatever work you are doing.

This way if something goes horribly wrong, you have written the position associated with whatever event you last successfully processed. Just load the last processed value you stored, and pass it in to SubscribeToAllFrom to carry on ... no missed or duplicate events :)

James put up a reasonable example of an event dispatcher a little while ago:


You would want to wrap the contents of HandleNewEvent into a single transaction.

If you are just interested in events from a single stream, SubscribeToStreamFrom(...) works in a very similar way, but you will be working based on the Integer position of events within the stream, rather than global Position values.

Gabriel Marquez

Apr 30, 2013, 11:06:26 AM
to event...@googlegroups.com
Thank you very much!

I think now I can migrate my JO ES.



James Nugent

Jul 5, 2013, 4:29:19 PM
to <event-store@googlegroups.com>, event...@googlegroups.com
Hi,

What do you mean by multiple apps?


James

Sent from my iPhone

On 5 Jul 2013, at 21:19, "shenzw" <shibu...@gmail.com> wrote:

If I have multiple apps sharing the EventStore, I think maybe I should run a dispatcher for each app. Does that seem possible without concurrency problems?

shenzw

Jul 7, 2013, 12:02:34 PM
to event...@googlegroups.com
Hi James, I have several subsystems that have some shared data, and also data of their own. I am trying to create a single event store for them to share with each other.
Because the subsystems can run separately, I think I should create an event dispatcher for each, subscribing only to the events that belong to it?
 
Regards,
 
ziwen


Phillip Cleveland

Jul 7, 2013, 12:13:03 PM
to event...@googlegroups.com
In my opinion you will quickly add unnecessary complexity if you add many dispatchers/buses. Why not just have one that all your apps/systems pub/sub on? If you have many, then I think you are going to have a really tough time, thinking you lost messages when they are just on a different bus.

Phil

Sent from my iPad

shenzw

Jul 7, 2013, 7:19:30 PM
to event...@googlegroups.com
Hi Phil. Thank you for your quick response. Yes, that seems more reasonable, but what about the failover strategy if the dispatcher goes down? Any suggestions? Thanks.

Regards,

Ziwen

Phillip Cleveland

Jul 7, 2013, 8:50:05 PM
to event...@googlegroups.com
Well, if the apps/systems are distributed you could look into RabbitMQ
or NServiceBus. I think they both support durable messaging... which
means if the bus is down, messages queue locally until the connection
is restored.

Sent from my iPad

Greg Young

Jul 8, 2013, 3:37:44 AM
to event...@googlegroups.com
You can use the Event Store as a durable queue. Putting Rabbit in front of it in most cases would be overkill. Why not just use a file at that point?

Greg Young

Jul 8, 2013, 4:10:55 AM
to event...@googlegroups.com
Also, you will find much, much better uptime with ES as a backend than using Rabbit clustering etc. It can handle both machine losses and network partitions. If you have five machines, then so long as any three can talk to each other (say 2x2x1 by data centers) you will be available. The clustering easily handles machine failures and network partitions (we run over a thousand per day, e.g. actual power pulls).

Greg
Rinat Abdullin

Jul 8, 2013, 4:26:11 AM
to event...@googlegroups.com
How would you use ES as a durable message queue (keeping a local checkpoint, or publishing "message consumed")?

Rinat Abdullin | Technology Leader at Lokad.com | Writer at Abdullin.com | Contacts

Greg Young

Jul 8, 2013, 4:33:44 AM
to event...@googlegroups.com
Keeping a local checkpoint, or holding the last URI with Atom. If you store it transactionally with your interpretation of it, you will also achieve once-only messaging. E.g. I have a projection in SQL Server: if I store my checkpoint in SQL Server as part of my operation, I don't need to worry about idempotency.

In the new client API there is an object that even handles this for you (including switching servers etc. in clustered mode). It's here: https://github.com/EventStore/EventStore/blob/dev/src/EventStore/EventStore.ClientAPI/EventStoreCatchUpSubscription.cs

Rinat Abdullin

Jul 8, 2013, 6:31:01 AM
to event...@googlegroups.com
Cool! Thanks, Greg.

Best,
Rinat

shenzw

Jul 10, 2013, 2:02:33 AM
to event...@googlegroups.com
If ES can be used as a durable message queue, just like a service bus for denormalizing read models, could ES also be used as a command bus, with command handlers subscribing to command streams? Like one command stream per aggregate?

Regards.
Ziwen


Keeping local checkpoint or holding the last uri with atom. If you store it transactionally with your interpretation of it you will also reach only once messaging. Eg I have a projection in sqlserver if I store my checkpoint in sqlserver as part of my operation I don't need to worry about idempotency.

There in the new client API is an object that even handles this for you (including switching servers etc in clustered mode). It's here https://github.com/EventStore/EventStore/blob/dev/src/EventStore/EventStore.ClientAPI/EventStoreCatchUpSubscription.cs

On Monday, July 8, 2013, Rinat Abdullin wrote:
How would you use ES as a durable message queue (keeping a local checkpoint or publishing "message consumed)?

Rinat Abdullin | Technology Leader at Lokad.com | Writer at Abdullin.com | Contacts


On Mon, Jul 8, 2013 at 2:10 PM, Greg Young <gregor...@gmail.com> wrote:
Also you will find much much better up times with es as a backend than using rabbit clustering etc. it can handle both machine losses and network partitions. If you have five machines so long as any three can talk to each other (say 2x2x1 by data centers) you will be available. The clustering easily handles machine failures and network partitions (we run over a thousand per day eg actual power pulls)

Greg


On Monday, July 8, 2013, shenzw wrote:
Hi Phil. Thank you for your quick response. Yes, that seems more reasonable, but what about the failover strategy if the dispatcher goes down? Any suggestions? Thanks.

Regards,

Ziwen


shenzw

unread,
Jul 10, 2013, 10:16:17 PM7/10/13
to event...@googlegroups.com
Can anybody give some advice on this question? It seems possible if command-handler competition can be solved in some way.

Greg Young

unread,
Jul 10, 2013, 10:25:47 PM7/10/13
to event...@googlegroups.com
It depends. There are quite a few patterns for dealing with competition (round robin, shortest line, competing consumers, etc.). It becomes more of an issue if you are writing events out, as redelivery can allow the same command to be handled multiple times (ES is idempotent). What are you trying to set up?
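Two of the competition patterns named here can be sketched in a few lines of Python (the dispatcher shapes are purely illustrative, not any particular library's API):

```python
import itertools

def round_robin(workers):
    """Assign each command to the workers in turn."""
    cycle = itertools.cycle(range(len(workers)))
    def dispatch(command):
        workers[next(cycle)].append(command)
    return dispatch

def shortest_line(workers):
    """Assign each command to the worker with the fewest pending commands."""
    def dispatch(command):
        min(workers, key=len).append(command)
    return dispatch

workers = [[], [], []]
dispatch = round_robin(workers)
for cmd in ["c1", "c2", "c3", "c4"]:
    dispatch(cmd)
# workers == [["c1", "c4"], ["c2"], ["c3"]]
```

Competing consumers would instead have each worker pull from one shared queue; all three trade global ordering for throughput, which is why the ordering question below matters.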

shenzw

unread,
Jul 10, 2013, 10:54:42 PM7/10/13
to event...@googlegroups.com
Thank you for your quick response, Greg.
Assume a SOA distributed system. Clients(Silverlight) send commands to a WCF service stands for sending command to ES, and there runs multiple instances of command handler hosted on different servers to subscribe to the same command stream, but the command can be processed by only one handler at the same time and only once. is it possible without use of any other service bus? Thanks.

Regards.
Ziwen

Greg Young

unread,
Jul 11, 2013, 5:12:57 AM7/11/13
to event...@googlegroups.com
If ES provides idempotency, why do they need to be processed only once? Also, do they need perfect ordering?

shenzw

unread,
Jul 11, 2013, 10:56:15 AM7/11/13
to event...@googlegroups.com
Because the command handler may send mails, and I don't want duplicate mails.
Also, perfect ordering of the commands sent from the client is necessary.

Regards.
Ziwen
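Since ES does not itself track whether a subscriber has handled an event, one common approach (sketched here in Python with invented names; the durable storage a real system needs is omitted) is for the handler to keep its own record of processed command IDs and skip duplicates before performing a side effect like sending mail:

```python
# Handler-side deduplication sketch: remember which command IDs have already
# been handled so a redelivered command never triggers a second mail.
# In production, `processed` would live in durable storage and be updated
# in the same transaction as the handler's other work.
processed = set()
sent = []

def send_mail(to, body):
    """Stand-in for the real side effect."""
    sent.append((to, body))

def handle(command_id, to, body):
    if command_id in processed:
        return              # duplicate delivery: no second mail
    send_mail(to, body)
    processed.add(command_id)

handle("cmd-1", "a@example.com", "hello")
handle("cmd-1", "a@example.com", "hello")  # redelivered, ignored
handle("cmd-2", "b@example.com", "hi")
```

This is the same trick as the checkpoint-in-SQL-Server idea earlier in the thread, applied to a side-effecting handler instead of a projection.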

Greg Young

unread,
Jul 11, 2013, 11:12:03 AM7/11/13
to event...@googlegroups.com
How would you achieve perfect ordering with multiple processors? Surely some commands are faster than others: if two are being handled at the same time, and the first is taken off the queue but spends 1.2 seconds while the second takes only 0.5, the second will finish first. Unless you support some kind of batching internally, where a command is really a command[].
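The batching idea ("a command is really a command[]") can be sketched like this (purely illustrative Python): commands whose relative order matters travel as one batch, so a single worker applies them sequentially, while separate commands racing on two workers can finish in either order.

```python
# Sketch: a batch is an ordered list of commands handled by one worker,
# so order WITHIN the batch is guaranteed even though whole batches may
# be processed concurrently.
def handle_batch(state, batch):
    for command in batch:   # applied strictly in order
        state = command(state)
    return state

inc = lambda s: s + 1
double = lambda s: s * 2

# Shipped as one batch, inc always runs before double:
assert handle_batch(1, [inc, double]) == 4
# Shipped as two independent commands to two workers, either order can win:
assert double(inc(1)) == 4 and inc(double(1)) == 3
```

The two final assertions show exactly the race described above: the same pair of commands yields different results depending on which finishes first.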

shenzw

unread,
Jul 11, 2013, 12:58:20 PM7/11/13
to event...@googlegroups.com
The clients are Silverlight based. Assume a user invokes Commands A and B for the same aggregate in sequence. If Command B starts to be handled before A is done and has written its new events to ES successfully, B's handler will get the old aggregate state, which may lead to a business-rule violation. As you said, command[A, B] is the real command, so it might be better to compose A and B into a single command, I think.
Back to the first question: in the case of a command being processed only once, can I get some information from ES about whether a subscribed command (event) has already been processed by any handler? Or do I have to track that outside of ES?

Regards.
Ziwen