Batched live processing for subscriptions


Jonathan Curtis

Sep 18, 2014, 10:58:12 AM
to event...@googlegroups.com
I'm building a C# client using the TCP interface that will efficiently store a projection in a view-model database (e.g. a document or graph DB). I can easily batch the events before the subscription catches up by flushing when the liveProcessingStarted delegate fires.

However, there is no way I can see to easily batch events when in "live processing" mode. The scenario I am envisaging is that lots of events have been generated more quickly than they can normally be processed, and we now want the client to batch.

To facilitate this, would it be possible to add the position in the stream, plus the last known number, at the point the event is sent? That way we could make an intelligent decision about whether to batch. Alternatively it could be a flag like EventSlice.IsEndOfStream, or even just sending all the events in a pre-configured batch size.

The only way I can see to do this at the moment would be to wait some period of time to see whether another event comes in before processing.

Cheers,
Jon

Kasey Speakman

Sep 18, 2014, 12:17:56 PM
to event...@googlegroups.com
I've been playing with the .NET TPL Dataflow library lately, and they have BatchBlock just for this purpose.

Just keep posting all the events to the BatchBlock as you receive them, and when the count meets the minimum batch size, it will send a batch of them to your handling method as an array. You may not even need to worry about whether you are live or catching up.

In case a full batch was taking too long to populate, you could also trigger partial batches on a timer, as mentioned here.

Or, after processing a batch, you might be able to use OutputCount to decide whether to just go ahead with the next partial batch and call TriggerBatch().

I haven't played with BatchBlock yet, so I'm just spit-balling. :)
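For illustration, the timer-driven flush could look something like this. An untested sketch, using plain ints rather than ResolvedEvent, and assuming the System.Threading.Tasks.Dataflow NuGet package:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

class TimedBatchExample
{
    static void Main()
    {
        // Batch items 50 at a time...
        var batcher = new BatchBlock<int>(50);
        var processor = new ActionBlock<int[]>(batch =>
            Console.WriteLine("Processing batch of " + batch.Length));
        batcher.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

        // ...but also flush whatever has accumulated every 100 ms,
        // so a partial batch never sits waiting indefinitely.
        using (new Timer(_ => batcher.TriggerBatch(), null,
                         TimeSpan.FromMilliseconds(100),
                         TimeSpan.FromMilliseconds(100)))
        {
            for (var i = 0; i < 120; i++)
            {
                batcher.Post(i);
                Thread.Sleep(5); // simulate a trickle of incoming events
            }
        }

        batcher.Complete();          // stop accepting; emits the final partial batch
        processor.Completion.Wait(); // wait for all batches to be handled
    }
}
```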

Kasey Speakman

Sep 18, 2014, 12:28:48 PM
to event...@googlegroups.com
For easier use, link your BatchBlock to an ActionBlock. Here is an example (scroll down to Chunky vs Chatty)


Interesting code snippet:

var businessifier = new TransformBlock<string, string>(s => "Dear " + s);
var batcher = new BatchBlock<string>(5);
var emailer = new ActionBlock<string[]>(s => EmailToBoss(s)); // action now takes an array of strings
// ...
businessifier.LinkTo(batcher);
batcher.LinkTo(emailer);

Jonathan Curtis

Sep 18, 2014, 12:37:37 PM
to event...@googlegroups.com
Awesome :) I'll definitely switch to this approach.

However, I'd also like to know whether my suggestions would be a good idea, specifically setting a batch size on the subscription, as this would minimise network calls.
--
You received this message because you are subscribed to a topic in the Google Groups "Event Store" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/event-store/aXy9iVObqRY/unsubscribe.
To unsubscribe from this group and all its topics, send an email to event-store...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Kasey Speakman

Sep 18, 2014, 12:52:36 PM
to event...@googlegroups.com
Not a GES guy, but I doubt any network traffic would be saved by batching on the TCP client. The connection is held open and messages are pushed from the server AFAIK.

Jonathan Curtis

Sep 18, 2014, 1:32:52 PM
to event...@googlegroups.com
Yes, I guess you are right. However, it would still be useful as it would mean no need for the timer or manual batching.

Do you have any code available?

Kasey Speakman

Sep 18, 2014, 3:22:40 PM
to event...@googlegroups.com
I'm not using batching myself, but here is a rough draft (not tested):

private static void Process(ResolvedEvent[] batch)
{
    // do work
}

static void Main()
{
    // set up events to be batched, 50 at a time
    var batcher = new BatchBlock<ResolvedEvent>(50);

    // set up processing of the batch
    var processor = new ActionBlock<ResolvedEvent[]>(Process);

    // link batcher to processor, including completion
    batcher.LinkTo(processor, new DataflowLinkOptions { PropagateCompletion = true });

    // start the subscription and have it post to the batcher
    var subscription = eventStoreConnection.SubscribeToAllFrom(..., eventAppeared: (sub, evt) => batcher.Post(evt));

    // let it run for a while
    Thread.Sleep(10000);

    // shut it down
    subscription.Stop();

    // if you want to wait for queued messages to finish before stopping
    batcher.TriggerBatch();      // flush partial batch
    batcher.Complete();          // stop taking messages, also completes processor
    processor.Completion.Wait(); // wait for queued messages to be processed
}

batcher will queue up messages until it has 50, then it will send those 50 to processor. processor in turn sticks each batch into a queue, then runs the Process method on them, one batch at a time.



Peter Hageus

Sep 19, 2014, 7:23:45 AM
to event...@googlegroups.com
Have a look at rx (Reactive Extensions) while you're at it, it's very well suited for these kind of tasks. (nothing wrong with DataFlow either)

/Peter


Jonathan Curtis

Sep 19, 2014, 9:00:36 AM
to event...@googlegroups.com, pe...@hageus.se
Any thoughts on rx vs Dataflow?

Peter Hageus

Sep 19, 2014, 9:10:09 AM
to event...@googlegroups.com
They can do pretty much the same thing, but I'm biased towards Rx's more functional style, plus you get a lot for free: windowing, buffering, streams of streams, error handling, etc. I get the impression there are more resources on the net for it as well. Dataflow is more like your basic Lego blocks. But it's very much down to taste and opinion...

/Peter

Kasey Speakman

Sep 19, 2014, 9:31:09 AM
to event...@googlegroups.com
I've never used rx, only given it a cursory look, so I can't do a comparison. The main thing I have used in TPL dataflow is the action block, because it's a drop-in replacement for a standard queue + worker. It's great for processing messages, and I add one as a private field to use for those types of components.
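As a concrete illustration of that queue + worker pattern, something like the following (a minimal sketch; the component, message type, and handler are made up):

```csharp
using System;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

// An ActionBlock kept as a private field acts as a concurrent queue plus a
// worker that drains it on the thread pool, one message at a time by default.
class MessageProcessor
{
    readonly ActionBlock<string> worker;

    public MessageProcessor()
    {
        worker = new ActionBlock<string>(msg => Handle(msg));
    }

    public void Enqueue(string msg)
    {
        worker.Post(msg); // returns immediately; Handle runs on the pool
    }

    void Handle(string msg)
    {
        Console.WriteLine("Handled: " + msg);
    }

    public void Shutdown()
    {
        worker.Complete();        // stop accepting new messages
        worker.Completion.Wait(); // drain whatever is already queued
    }
}
```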


Kasey Speakman

Sep 19, 2014, 9:39:19 AM
to event...@googlegroups.com, pe...@hageus.se

Jonathan Curtis

Sep 19, 2014, 9:40:34 AM
to event...@googlegroups.com
I'm gonna try both and see which I like. Rx looks nice:

var gapBetweenEvents = subscription.Throttle(TimeSpan.FromMilliseconds(100));
subscription
   .Window(() => gapBetweenEvents)
   .Select(x => x.Buffer(500))
   .Switch()
   .Subscribe(handleEvents);

This batches every 500 events, unless we don't receive an event for 100ms, in which case it flushes.

Peter Hageus

Sep 19, 2014, 9:52:24 AM
to event...@googlegroups.com
There's an overload on Buffer that takes a TimeSpan and a count; that's all you need for your desired behaviour. (To be honest, I'm not sure how your code would behave. I wouldn't use either Throttle or Window for event processing, but it might work due to the way you've composed them.)
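For reference, the overload in question, in the style of the snippet above (a sketch; `subscription` is assumed to be an `IObservable<ResolvedEvent>` and `handleEvents` to accept an `IList<ResolvedEvent>`):

```csharp
// Emit a batch when 500 events have arrived or 100 ms have elapsed,
// whichever comes first; filter out the empty buffers the timer produces.
subscription
    .Buffer(TimeSpan.FromMilliseconds(100), 500)
    .Where(batch => batch.Count > 0)
    .Subscribe(handleEvents);
```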

/Peter

Kasey Speakman

Sep 19, 2014, 9:53:53 AM
to event...@googlegroups.com
This is the reason I only gave rx a cursory look. I am very comfortable with lambdas, but looking at this code, the language of it does not make it obvious to me what is happening.

Jonathan Curtis

Sep 19, 2014, 10:47:22 AM
to event...@googlegroups.com
It's not quite the same behaviour. With the buffer overload, you have to make the timeout longer than the time to receive the batch, otherwise the timer will always trip. With the throttle, you can make the timeout much shorter, because if you receive an event within that period, it will reset. This gives you the best of both worlds, speed for individual events or batched for when you get bulk events.

Actually, you might want to use the buffer overload in the code I posted for the case where events are being received at a rate similar to the throttle.

Raif Harik

Sep 19, 2014, 1:08:54 PM
to event...@googlegroups.com
@Jonathan Curtis,
Let us know which way you go and why.  I will be making a similar decision soon and I'd rather you do all the work :).  just kidding, but I'd like to hear your thoughts.  I've been reading up on TPL and datablocks look pretty damn sweet.
r

Peter Hageus

Sep 22, 2014, 3:05:20 AM
to event...@googlegroups.com
OK, interesting! I've always gone for a max latency rather than optimal batchsize, but haven't really measured the effects. 

Greg Young

Sep 22, 2014, 3:07:45 AM
to event...@googlegroups.com
Normally you go for a heuristic that tries to optimize both. An example of dynamic batching can actually be seen in the Event Store in the storage writer. Basically the rules are quite simple: if your queue is empty, write immediately. Otherwise, watch how long operations are taking and dynamically make batches bigger while your queue has items in it. This can be done on the outside by checking whether you are caught up: if caught up, write immediately; otherwise, batch.
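That heuristic could be sketched like this (not the actual storage-writer code; the class and names here are made up for illustration):

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Dynamic batching: write immediately when the queue is empty, and fold
// whatever has accumulated into one write when it is not.
class DynamicBatchWriter
{
    readonly BlockingCollection<string> queue = new BlockingCollection<string>();
    const int MaxBatchSize = 500;

    public void Post(string item)
    {
        queue.Add(item);
    }

    public void WriterLoop()
    {
        var batch = new List<string>();
        foreach (var first in queue.GetConsumingEnumerable()) // blocks when idle
        {
            batch.Add(first);
            // If more items arrived while we were busy, batch them up too.
            string next;
            while (batch.Count < MaxBatchSize && queue.TryTake(out next))
                batch.Add(next);
            Write(batch); // one flush covers the whole batch
            batch.Clear();
        }
    }

    void Write(List<string> batch)
    {
        // persist the batch in a single operation
    }
}
```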
Studying for the Turing test

Jonathan Curtis

Sep 22, 2014, 4:18:51 AM
to event...@googlegroups.com
I can't see a way with the TCP connection to know if you are behind once live processing has fired.

João Bragança

Sep 22, 2014, 4:31:16 AM
to event...@googlegroups.com
It is really unlikely to fall behind once you are caught up - unless your projection writes are really really slow.

Jonathan Curtis

Sep 22, 2014, 4:50:52 AM
to event...@googlegroups.com, joao...@braganca.name
Well, it depends on the frequency/distribution of events and the speed of your writes. Having a way to know whether you are caught up at all times would be useful in some circumstances. Maybe you need to do regular batch imports of events, for example.

Greg Young

Sep 22, 2014, 7:45:28 AM
to event...@googlegroups.com, João Bragança
Once you are live just put them in a local queue then you know if you are caught up :)

Jonathan Curtis

Sep 22, 2014, 8:18:42 AM
to event...@googlegroups.com, joao...@braganca.name
That is what Rx's Buffer and TPL Dataflow's BufferBlock do, but you still have to decide how long to wait before flushing and how long to wait between events. If there were an IsEndOfStream flag, we could flush immediately. Even better if Event Store supported batched subscriptions.

Greg Young

Sep 22, 2014, 9:10:12 AM
to event...@googlegroups.com, João Bragança
Event Store DOES support batched subscriptions (reading forward), and there is an EOF flag associated with that.

IsEndOfStream would be meaningless on a live pushed subscription, as at the time of sending it would always be true.

Jonathan Curtis

Sep 22, 2014, 9:19:13 AM
to event...@googlegroups.com, joao...@braganca.name
OK, I'm a bit confused by how the TCP client actually works. Does it have its own client-side queue? What happens if events are saved to Event Store faster than the client can handle them?

Also, what happens if you save multiple events in a transaction? Wouldn't IsEndOfStream be false until the last event is sent?

James Nugent

Sep 22, 2014, 9:24:00 AM
to event...@googlegroups.com, Jonathan Curtis, joao...@braganca.name
Yes, there is a client-side queue, which is bounded. If the client cannot handle the rate, it will be dropped by the server to avoid service degradation, and the client has to fall back to pulling instead.

EndOfStream is only set for reads as far as I'm aware, not on push subscriptions. Transactions are a write-only concern so have no impact here.


James



Greg Young

Sep 22, 2014, 9:39:59 AM
to event...@googlegroups.com, João Bragança
Well, there are multiple levels (let's discuss live subscriptions first), e.g. SubscribeTo (not subscribe-from).

Internally there is an event that occurs when you append to a stream. https://github.com/EventStore/EventStore/blob/dev/src/EventStore.Core/Messages/StorageMessage.cs#L224

This event gets picked up by a subscription service, which maintains which connections are subscribed to what (note: not discussing competing consumers here, they are different): https://github.com/EventStore/EventStore/blob/dev/src/EventStore.Core/Services/SubscriptionsService.cs This will then route an EventAppeared message to your given connection: https://github.com/EventStore/EventStore/blob/dev/src/EventStore.Core/Services/SubscriptionsService.cs#L267

Connections have a sizable buffer of data queued up to actually be put over the TCP socket; data is stored here to be forwarded: https://github.com/EventStore/EventStore/blob/dev/src/EventStore.Core/Services/Transport/Tcp/TcpConnectionManager.cs#L349 If a buffer gets too big, e.g. the socket can't keep up, the socket will be closed.

On the other side...

The client then receives the messages, and they get raised back to the client code (one item at a time, on the thread pool): https://github.com/EventStore/EventStore/blob/dev/src/EventStore.ClientAPI/ClientOperations/SubscriptionOperation.cs#L248 They're queued here as well (we should bound this queue too; I've just put up a card for it). Most people, however, read off directly into their own client queue where they can control it.

This is for live subscriptions. A catch-up subscription, if it gets too many messages or drops its connection etc., will fall back to polling (e.g. ReadEventsForward) until it gets caught up, at which point it switches to a live push subscription (and it does this internally, so you don't have to see any of the work involved).

Cheers,

Greg



Greg Young

Sep 22, 2014, 9:57:48 AM
to event...@googlegroups.com, João Bragança

Jonathan Curtis

Sep 22, 2014, 12:06:54 PM
to event...@googlegroups.com, joao...@braganca.name
Thanks for the explanation.

Couldn't the ExecuteActions method on SubscriptionOperation pass a bool to eventAppeared to indicate whether there are any more items in the queue? Or why not just store ResolvedEvents on the queue and send them all to eventAppeared as an array?

I guess I could write this into my own queue, but it seems like it would be nice if eventstore did it for me :)


Greg Young

Sep 22, 2014, 1:01:18 PM
to event...@googlegroups.com, João Bragança
"Couldn't the ExecuteActions method on SubscriptionOperation pass a bool to eventAppeared to indicate whether there are any more items in the queue? Or why not just store ResolvedEvents on the queue and send them all to eventAppeared as an array?"

This sounds like code very specific to your scenario. It is trivial to compose such behaviours (that is the goal).

Jonathan Curtis

Oct 8, 2014, 5:23:41 AM
to event...@googlegroups.com
OK, so I got round to having a play with this. I ended up not using Rx or TPL Dataflow; I think they would just complicate the situation. I copied the idea from the Event Store client API itself.

On eventAppeared I store the event in a BlockingCollection, then start a new thread to execute a batch delegate. New events coming in get stored in the BlockingCollection on the original thread while the batch is being processed; when the batch is finished, it just executes a new batch with all the stored-up events. I give the BlockingCollection a max size equal to the desired batch size, so it blocks on Add if the batch size is reached. In this situation the original eventAppeared delegate will block, and the client API's queuing will kick in. With this solution we don't need to worry about whether the subscription is live or not.

Hope that makes sense. Here's a gist of the (boiled down) code. Would appreciate any thoughts on this approach.
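(The gist itself isn't reproduced here; the following is an untested rough reconstruction of the approach described above, with made-up names, not the actual gist code.)

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using EventStore.ClientAPI; // NuGet: EventStore.Client

class BatchingSubscriber
{
    readonly BlockingCollection<ResolvedEvent> pending;
    readonly Action<ResolvedEvent[]> processBatch;
    int batchRunning; // 0 = idle, 1 = a batch worker is active

    public BatchingSubscriber(Action<ResolvedEvent[]> processBatch, int batchSize = 500)
    {
        // Bounded: Add blocks once batchSize events are waiting, which blocks
        // eventAppeared and lets the client API's own queuing kick in.
        pending = new BlockingCollection<ResolvedEvent>(batchSize);
        this.processBatch = processBatch;
    }

    public void EventAppeared(EventStoreCatchUpSubscription sub, ResolvedEvent evt)
    {
        pending.Add(evt);
        TryStartBatch();
    }

    void TryStartBatch()
    {
        // Only start a worker if none is currently running.
        if (Interlocked.CompareExchange(ref batchRunning, 1, 0) == 0)
            ThreadPool.QueueUserWorkItem(_ => RunBatch());
    }

    void RunBatch()
    {
        var batch = new List<ResolvedEvent>();
        ResolvedEvent evt;
        while (pending.TryTake(out evt)) // drain whatever has accumulated
            batch.Add(evt);
        if (batch.Count > 0)
            processBatch(batch.ToArray());
        Interlocked.Exchange(ref batchRunning, 0);
        // Events may have arrived after the drain but before the flag reset.
        if (pending.Count > 0)
            TryStartBatch();
    }
}
```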

Cheers,
Jon

Jonathan Curtis

Oct 8, 2014, 6:19:29 AM
to event...@googlegroups.com

Kasey Speakman

Oct 8, 2014, 8:57:24 AM
to event...@googlegroups.com
What happens when the collection is empty? Does your thread spin, or wait on a handle? I can't imagine this being more efficient, or less code, than using an ActionBlock from TDF. The ActionBlock is both a concurrent queue and a worker, but it uses tasks on the thread pool (configurable) instead of tying up a thread indefinitely.

Kasey Speakman

Oct 8, 2014, 9:12:48 AM
to event...@googlegroups.com
Okay, I actually looked at the gist, so never mind the questions, but I still stand by my statements. Using those components, the stuff that is easiest to get wrong is done for you.

Jonathan Curtis

Oct 8, 2014, 9:57:42 AM
to event...@googlegroups.com
I'm not sure how you can achieve the same thing using ActionBlock. In the EventAppeared delegate we need to know whether a batch is currently being processed; that's what the usage of Interlocked is doing. You'd still need to do the same thing if you used ActionBlock, as we need a way to signal back to the queuing thread that the batch is complete.

My code only creates a thread when there is work to do and yields it back when finished; I don't see how Rx or TPL will help here. I guess I could replace the ThreadPool.QueueUserWorkItem with an ActionBlock, but I'm not sure that adds much value.

Kasey Speakman

Oct 8, 2014, 11:27:49 AM
to event...@googlegroups.com
My changes to your code are below (not tested). Note that I got rid of the subscription being passed to the batch processor. The only time I ever use the subscription is to stop it, and I have always done that via the one returned from a SubscribeXXX method. The only other things you can check on the sub are whether it's subscribed to all (which should be obvious from the code's semantics) and the stream it's subscribed to (the stream is also on the event).

I added live-processing awareness, so it will wait for full batches during catch-up, but trigger partial batches if the subscription is live and nothing is currently waiting.

class BatchedEventSubscriber
{
    readonly IEventStoreConnection connection;
    readonly string streamId;
    readonly int? lastProcessedVersion;
    readonly bool resolveLinks;
    readonly Action<ResolvedEvent[]> batchAppeared;
    readonly ActionBlock<ResolvedEvent[]> batchRunner;
    readonly BatchBlock<ResolvedEvent> batcher;
    volatile bool isLive;

    public BatchedEventSubscriber(IEventStoreConnection connection, string streamId, int? lastProcessedVersion, bool resolveLinks, Action<ResolvedEvent[]> batchAppeared, int batchSize = 500)
    {
        this.connection = connection;
        this.streamId = streamId;
        this.lastProcessedVersion = lastProcessedVersion;
        this.resolveLinks = resolveLinks;
        this.batchAppeared = batchAppeared;
        batchRunner = new ActionBlock<ResolvedEvent[]>(batchAppeared);
        batcher = new BatchBlock<ResolvedEvent>(batchSize);
        batcher.LinkTo(batchRunner);
    }

    public EventStoreStreamCatchUpSubscription Subscribe()
    {
        return connection.SubscribeToStreamFrom(
            streamId,
            lastProcessedVersion,
            resolveLinks,
            EventAppeared,
            LiveProcessingStarted
            );
    }

    void LiveProcessingStarted(EventStoreCatchUpSubscription subscription)
    {
        isLive = true;
    }

    void EventAppeared(EventStoreCatchUpSubscription subscription, ResolvedEvent resolvedEvent)
    {
        batcher.Post(resolvedEvent);
        bool doPartialBatch;
        lock (batchRunner) // lock is for batchRunner.InputCount, batcher.OutputCount, whose thread safety is unknown
            doPartialBatch = isLive && batchRunner.InputCount == 0 && batcher.OutputCount != 0;
        if (doPartialBatch)
            batcher.TriggerBatch();
    }
}

All of the constructor parameters should be method parameters on Subscribe and not stored in the class, but I didn't change that.

Not sure if the lock is needed, but I put it in there just in case.

InputCount can still be 0 when the batchRunner is busy, but the worst that happens is that it queues a small batch behind the current one. If you were live-processing 500 per second, you'd probably just want to cut out the live part and leave it batching the full 500 every time.

Kasey Speakman

Oct 8, 2014, 11:36:14 AM
to event...@googlegroups.com
Most of the constructor parameters. :)
...

Jonathan Curtis

Oct 10, 2014, 4:56:06 AM
to event...@googlegroups.com
Cool, I like the code; I didn't know about InputCount and OutputCount. However, this is around 10-15% slower than the Interlocked + ThreadPool.QueueUserWorkItem version.
