var businessifier = new TransformBlock<string, string>(s => "Dear " + s);
var batcher = new BatchBlock<string>(5);
var emailer = new ActionBlock<string[]>(s => EmailToBoss(s)); // Action now takes array of string
// ...
businessifier.LinkTo(batcher);
batcher.LinkTo(emailer);
--
You received this message because you are subscribed to a topic in the Google Groups "Event Store" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/event-store/aXy9iVObqRY/unsubscribe.
To unsubscribe from this group and all its topics, send an email to event-store...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
private static void Process(ResolvedEvent[] batch)
{
// do work
}
static void Main()
{
// setup events to be batched, 50 at a time
var batcher = new BatchBlock<ResolvedEvent>(50);
// setup processing of the batch
var processor = new ActionBlock<ResolvedEvent[]>(Process);
// link batcher to processor, including completion
batcher.LinkTo(processor, new DataflowLinkOptions {PropagateCompletion = true}); // start the subscription and have it post to the batcher
var subscription = eventStoreConnection.SubscribeToAllFrom(..., eventAppeared:(sub, evt) => batcher.Post(evt));
// let it run for a while
Thread.Sleep(10000);
// shut it down
subscription.Unsubscribe();
// if you want to wait for queued messages to finish before stopping
batcher.TriggerBatch(); // flush partial batch
batcher.Complete(); // stop taking messages, also completes processor
processor.Completion.Wait(); // wait for queued messages to be processed
}Yes, I guess you are right. However, it would still be useful as it would mean no need for the timer or manual batching.
Do you have any code available?
On Thursday, September 18, 2014, Kasey Speakman wrote:
Not a GES guy, but I doubt any network traffic would be saved by batching on the TCP client. The connection is held open and messages are pushed from the server AFAIK.
To unsubscribe from this group and all its topics, send an email to event-store+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to a topic in the Google Groups "Event Store" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/event-store/aXy9iVObqRY/unsubscribe.
To unsubscribe from this group and all its topics, send an email to event-store+unsubscribe@googlegroups.com.
--
You received this message because you are subscribed to a topic in the Google Groups "Event Store" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/event-store/aXy9iVObqRY/unsubscribe.
To unsubscribe from this group and all its topics, send an email to event-store+unsubscribe@googlegroups.com.
--
You received this message because you are subscribed to the Google Groups "Event Store" group.
To unsubscribe from this group and stop receiving emails from it, send an email to event-store...@googlegroups.com.
To unsubscribe from this group and all its topics, send an email to event-store...@googlegroups.com.
var gapBetweenEvents = subscription.Throttle(TimeSpan.FromMilliseconds(100));subscription .Window(() => gapBetweenEvents) .Select(x => x.Buffer(500)) .Switch() .Subscribe(handleEvents);...--
You received this message because you are subscribed to the Google Groups &qu
class BatchedEventSubscriber{ readonly IEventStoreConnection connection; readonly string streamId; readonly int? lastProcessedVersion; readonly bool resolveLinks; readonly Action<ResolvedEvent[]> batchAppeared; readonly ActionBlock<ResolvedEvent[]> batchRunner; readonly BatchBlock<ResolvedEvent> batcher; volatile bool isLive;
public BatchedEventSubscriber(IEventStoreConnection connection, string streamId, int? lastProcessedVersion, bool resolveLinks, Action<ResolvedEvent[]> batchAppeared, int batchSize = 500) { this.connection = connection; this.streamId = streamId; this.lastProcessedVersion = lastProcessedVersion; this.resolveLinks = resolveLinks; this.batchAppeared = batchAppeared; batchRunner = new ActionBlock<ResolvedEvent[]>(batchAppeared); batcher = new BatchBlock<ResolvedEvent>(batchSize); batcher.LinkTo(batchRunner); }
public EventStoreStreamCatchUpSubscription Subscribe() { return connection.SubscribeToStreamFrom( streamId, lastProcessedVersion, resolveLinks, EventAppeared, LiveProcessingStarted ); }
void LiveProcessingStarted(EventStoreCatchUpSubscription subscription) { isLive = true; }
void EventAppeared(EventStoreCatchUpSubscription subscription, ResolvedEvent resolvedEvent) { batcher.Post(resolvedEvent); bool doPartialBatch; lock (batchRunner) // lock is for batchRunner.InputCount, batcher.OutputCount, whose thread safety is unknown doPartialBatch = isLive && batchRunner.InputCount == 0 && batcher.OutputCount != 0; if (doPartialBatch) batcher.TriggerBatch(); }}...
To unsubscribe from this group and all its topics, send an email to event-store...@googlegroups.com.