Synchronous commands and event sourcing


MichaelK

Dec 19, 2012, 11:53:19 AM
to ddd...@googlegroups.com
Hi All,

I am struggling to figure out the best way to handle synchronous commands while using event sourcing.

Stating my assumptions up front:
1) Web servers should not publish events, and this gets even more complicated when a web farm is involved.
2) Async command processing is an option, but I would really like to get sync commands working as well.
3) I'm using JOliver's EventStore and CommonDomain libraries. These handle persisting events to the event store and publishing events out.

Under the Async model:
X web servers publish commands via NServiceBus to a "Command Service" hosted on another machine. This handles the commands and then publishes out events. All consumers are happy since they only need to subscribe to events from the "Command Service" (assuming 1 BC for now). Web servers are dumb, just publishing commands, and it's easy to scale out additional web servers.

Under the Sync model:
X web servers process commands synchronously. Now each web server handles the commands, persists events to the event store and publishes events. This is bad since the web servers should not publish events. My thought was to break out the event sourcing part: I would write to the event store on the web servers BUT then publish a "Message" via NSB to an "Event Publisher Service" that would grab those events just persisted and publish them out. It would actually grab all events that have not been published yet. I could also just have the "Event Publisher Service" poll the event store for unpublished events, but I figured it's better to inform the service that new events came in vs. polling.
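A rough sketch of that "web server persists, publisher service dispatches" split, in illustrative Python pseudocode (every class and method name here is invented for the example, not JOliver's actual API):

```python
# Sketch: web servers only append; a separate service drains and publishes.
# All names here are hypothetical, not a real library interface.

class EventStore:
    def __init__(self):
        self.entries = []                 # each entry: event + published flag

    def append(self, event):
        self.entries.append({"event": event, "published": False})

    def unpublished(self):
        return [e for e in self.entries if not e["published"]]

    def mark_published(self, entry):
        entry["published"] = True

def handle_command_on_web_server(store, command):
    # The web server only persists; it never publishes.
    event = {"type": "SomethingHappened", "data": command}
    store.append(event)
    # ...then send a lightweight "new events available" ping to the
    # publisher service (e.g. over NSB) rather than publishing here.

def publisher_service_run(store, publish):
    # Runs on another machine: drains everything not yet published,
    # which also picks up events whose ping was lost.
    for entry in store.unpublished():
        publish(entry["event"])
        store.mark_published(entry)
```

The "grab all events that have not been published yet" part is what makes the ping safe to lose: the next run of the publisher catches up regardless.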

My question is: how are other people handling sync commands when dealing with web farms, and is what I described above a decent way of solving it?

Thank you
-Michael

João Bragança

Dec 19, 2012, 12:13:51 PM
to ddd...@googlegroups.com
Just because you are hosting your local bus in a web app doesn't mean the web app is responsible for publishing events. EventStore is.

I am running a similar setup on AppHarbor: sync commands, multiple workers, using a simple in-memory bus largely copied from https://github.com/gregoryyoung/m-r/blob/master/SimpleCQRS/FakeBus.cs. I am using RavenDB as my read model store.

There's a non-trivial chance that two events for the same document come in at the same time on different workers. If you tell it to, Raven will use If-None-Match to make sure you are not overwriting any changes on the document, and will throw an exception instead. Then I just retry until it succeeds.
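That retry-until-it-succeeds loop might look something like this (a minimal Python sketch, not the actual RavenDB client; `ConcurrencyError` and the load/save callables stand in for Raven's conflict exception and session API):

```python
# Minimal optimistic-concurrency retry loop, as described above.
# ConcurrencyError stands in for the store's conflict exception (e.g.
# what RavenDB raises when an If-None-Match / etag check fails).

class ConcurrencyError(Exception):
    pass

def apply_with_retry(load, save, apply_event, event, max_attempts=5):
    for _ in range(max_attempts):
        doc = load()                      # re-read the current document
        updated = apply_event(doc, event)
        try:
            save(updated)                 # store rejects stale writes
            return updated                # nobody changed it under us
        except ConcurrencyError:
            continue                      # lost the race; reload and retry
    raise RuntimeError("still conflicting after %d attempts" % max_attempts)
```

Because the document is re-loaded on every attempt, a lost race just costs one extra round trip rather than a silently overwritten read model.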


Or, you can just have EventStore publish to NSB and make sure there is only one instance of a handler for each projection.

MichaelK

Dec 19, 2012, 1:53:59 PM
to ddd...@googlegroups.com, joao...@braganca.name
Maybe I am missing something, but with your in-memory bus you don't have any external consumers? I would think your only event listener is your Raven read store. What if another BC needed to subscribe to that event?

I agree that it's the responsibility of the EventStore to publish the events. In that case, if I do sync commands, then the command handler (running on the website) is only responsible for persisting the event to the event store. Once that is done I can say everything succeeded. Then it's up to the event store to publish those pending events whenever it gets around to it. I am not that familiar with JOliver's EventStore/CommonDomain, but I don't think it works that way, so I was thinking my thought process might be wrong.

Yevhen Bobrov

Dec 19, 2012, 2:33:45 PM
to ddd...@googlegroups.com
Deploy BCs on the same node.
Logical separation != physical distribution

Sent from mobile device

On Dec 19, 2012, at 20:53, MichaelK <mko...@gmail.com> wrote:

João Bragança

Dec 19, 2012, 4:01:17 PM
to ddd...@googlegroups.com
You should do this where you can, but it's not always possible.

Bennie Kloosteman

Dec 19, 2012, 10:00:22 PM
to ddd...@googlegroups.com
I'm using the disruptor model. My initial domain tests were 300K/sec real commands with real work, and it's basically limited by the event store (for which a cluster won't help). As this is way beyond what I need that's OK, and if I need more I can focus on the bottleneck, e.g. stream it as a file or use partitioned NoSQL. The read side is cached and invalidated when an event comes in.

It's based on Greg's simplest thing: single threaded (so no locking etc.), no need to deal with complex load balancing, persisting or poison commands, or persistent buses. Success and failure are communicated via callbacks, as is persistence. Due to the speed and async callbacks the web call can hang around till failure or success in nearly all cases. Server crash is handled.

Anyway, it's an experiment in a simple infrastructure solution to a large domain; I may have to re-engineer. If web server usage becomes an issue (e.g. serving the UI, as it won't be the domain cost) then I'll just move all the command and read side to 2 SOA services, with the command side calling the read side services with the new events, which invalidate their caches, but I doubt this will be needed.

So I'm going with KISS and the first rule of distributed systems: minimize cross-system calls. Once I have a few years in a CQRS system, then I may tackle a more distributed one with confidence without it being a nightmare. A new architecture is hard enough.

Ben

Vytautas Mackonis

Dec 20, 2012, 3:10:15 AM
to ddd...@googlegroups.com, joao...@braganca.name
"X web servers publish commands via NServiceBus to "Command Service" hosted on another machine."

How about "X web servers send a command via WCF (or any other synchronous channel) to a command service" instead? Does this not make the command processing synchronous?

P. S. JOliver's event store does indeed publish events asynchronously if you use an asynchronous commit dispatcher (which is the preferred option).

Greg Young

Dec 20, 2012, 3:40:45 AM
to ddd...@googlegroups.com
I'm curious about two things...

1) how you handle 300k http requests/second
2) which event store is limited at 300k/second

Also, re "limited by the event store (for which a cluster won't help)": why would clustering an event store not help?

Normally in these situations it's the IO that limits you.

Cheers,

Greg
--
Le doute n'est pas une condition agréable, mais la certitude est absurde.

Bennie Kloosteman

Dec 20, 2012, 7:50:01 AM
to ddd...@googlegroups.com
On Thu, Dec 20, 2012 at 4:40 PM, Greg Young <gregor...@gmail.com> wrote:
> im curious of two things...
>
> 1) how you handle 300k http requests/second

That is a test of the command domain processing (which includes some query-side validation). I got 100K http requests/sec in a previous project 8 years ago (using WCF as a service with basic http); it's not hard if you avoid IO and maximize async when you do. Like I said, if the web servers were groaning under the weight of the IIS/MVC/ASP stack hitting the disk for each request or session, then I would take the static load off with a reverse proxy, and if I still needed more, move the domain to another server and do it as an SOA service (and if I'm doing such a huge amount then the business value in going to SOA should be there, not the other way around). If all the server is doing is create a command from 100 bytes of data (+ html cookie), add it to the disruptor queue and asynchronously wait for a response, I should be able to get way over 100K on modern hardware.

> 2) which event store is limited at 300k/second

Obviously my current SQL Express server gets nowhere near there :-) ... the domain keeps chugging away producing calls, but eventually the amount of outstanding work in the event store (disruptor domain) gets too long, so I get timeout errors on the command. Since I'm single threaded / single server I do have cached data in repositories, which helps a lot. I put a memory store in to test the domain properly.


> also " limited by the event
> store ( for which a cluster wont help) " why would clustering an event store
> not help?

Indeed it would! I meant clustering/splitting the domain, which, while introducing complexity, wouldn't help resolve the bottlenecks, which are the web front end (serving other crap) and the DB (event store).
>
> Normally in these situations its the io that limits you.

Yep... but if you process commands with an async threadpool, or much worse sequential workers which are waiting on IO, you make it worse and are not using your DB to the limit. I also asked myself: why persist commands, have cross-machine events and massively complicate the domain with distributed requirements? Simple, single threaded, high performance, and you have a tight domain. CQRS seems very nice like this IMHO, but I'm still learning it, and like I said I want to avoid the complexity of a distributed CQRS system for my first one... even though I have done plenty of SOA with WS-Eventing. Simple is best if it does the job.


Ben

Greg Young

Dec 20, 2012, 8:05:57 AM
to ddd...@googlegroups.com
I think your measurements are "off" when you talk about HTTP. Let me guess: you were doing 100K/s with keep-alives and pipelining, which is not a very normal thing to see in real traffic.

Let's just imagine that you are on a reasonably normal network with an MTU of 1500 bytes.

Now let's imagine that we have a request and then a response (for the sake of discussion, both are smaller than the MTU).

So half-duplex requests -> 100k * 1500 bytes due to MTU ~~> 150 MB/s (~1.2 Gbit/s throughput)
the other half-duplex responses -> 100k * 1500 bytes due to MTU ~~> 150 MB/s (~1.2 Gbit/s throughput)

So now all you need is a pipe that can carry roughly 2.4 Gbit/s. Note these are not counting TCP packets for connects, acks etc.
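The back-of-envelope bandwidth math can be checked in a few lines (assuming a 1500-byte MTU and one MTU-sized packet per message, ignoring TCP/connection overhead):

```python
MTU = 1500                       # bytes per packet on a typical network
REQS = 100_000                   # requests per second

bytes_per_direction = REQS * MTU              # one full MTU per message
gbit_per_direction = bytes_per_direction * 8 / 1e9

print(bytes_per_direction)       # 150000000 bytes/s each way
print(gbit_per_direction)        # 1.2 Gbit/s each way, so ~2.4 Gbit/s total
```

The point stands either way: padding every small request out to a full MTU is what makes 100K/s expensive on the wire.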

Now if you were pipelining and using keepalives things might be a touch easier (multiple things per mtu) but real traffic tends to not work this way.


> ... and the DB (event
> store).
>
> Normally in these situations its the io that limits you.

I meant that in both situations its io that tends to limit you.


Cheers,

Greg



Bennie Kloosteman

Dec 20, 2012, 9:02:32 AM
to ddd...@googlegroups.com
I realize they are benchmarks and not real... but it's good for comparison. If your real figure is 25-50% of your benchmark you're normally doing well, or you put a lot of time into your benchmarks.

It was a while ago; I do remember connections were explicitly closed, though they probably did 30 or so commands to simulate a user stream of activity (so yes, some piggybacking / streaming). It was not to a web server but a WCF service using http binding. Command data, which was 50% of it, was only 100-200 bytes (on the wire with IP header), with similar responses. Which meant 50% of the traffic took about 200 * 100K = 20 MBytes of data on the up and down (or 160 Mbit). Most get queries were similar single records, not huge, though a few were 3K, and 4-600 Mbit total is certainly possible on a full duplex 1G connection.

Your point is correct, IO is the limit, but my point is good performance and simplicity... If IO is the limit, why distribute the domain if it's not the bottleneck? Or create another IO bottleneck with more persistence. OK, you may get 10 servers pumping out 5 Gig/second, but until you do and have the network in place, keep it simple.

Ben

Greg Young

Dec 20, 2012, 9:16:28 AM
to ddd...@googlegroups.com
> it was a while ago i do remember connections were explicitly closed
> though they probably did 30 or so commands to simulate a user stream
> of activity ( so yes some piggy backing / streaming) , it was not to
> a web server but a WCF service using http binding . Command data
> which was 50% was only 100-200 bytes ( on the wire with Ip header )
> with similar responses. Which meant 50% of the traffic took about 200
> * 100K = 20Mbytes of data on the up and down ( or 160 Mbit) . Most
> get queries were similar single records not huge though a few were 3K
> , and 4-600 Mbit total is certainly possible on a full Duplex 1G
> connection .

MTUs... sending 100k packets with 20 bytes each in them does not = 2 MB.

It's quite easy to get very high throughput with small commands by putting many in a packet. The problem is this is measuring something very different.

Benchmarking such systems is hard! As an example, on my local machine the ES writes > 80 MB/s if you stream 1 MB messages to it. In practice though events are much smaller than this (and fsync + smaller writes ends up killing you).

The same with our HTTP stuff. Pipelining + combining puts many commands per MTU (can do the same on the response). This is not, however, a normal operation.

With all such systems, though, I find getting the processing fast is never where the time is spent; it's getting the IO to be fast.

Cheers,

Greg

Bennie Kloosteman

Dec 20, 2012, 10:24:23 AM
to ddd...@googlegroups.com
On Thu, Dec 20, 2012 at 10:16 PM, Greg Young <gregor...@gmail.com> wrote:
> it was a while ago i do remember connections were explicitly closed
> though they probably did 30 or so commands to simulate a user stream
> of activity ( so yes some piggy backing / streaming) , it was not to
> a web server but a WCF service using http binding . Command data
> which was 50% was only 100-200 bytes ( on the wire with Ip header )
> with similar responses. Which meant 50% of the traffic took about 200
> * 100K = 20Mbytes of data on the up and down ( or 160 Mbit) . Most
> get queries were similar single records not huge though a few were 3K
> , and 4-600 Mbit total is certainly possible on a full Duplex 1G
> connection .
>
> MTUs.... sending 100k packets with 20 bytes each in them does not = 2mb

MTUs are the maximum size before fragmenting... 20 bytes is not possible: the IP header is 20 bytes, TCP is +20 I think. 100K packets with 20 bytes of data would be worst case (100K * 60) 6 MBytes, 48 Mbit, with some control piggybacking for acks etc. If you send 20-byte messages to a socket, however, Nagle will add them to a packet till the MTU is reached. Those packets I mentioned were that size on the wire (though not when running; they probably were batched in the 30-transaction stream).

That said, Nagle assumes the messages are sent over a socket; with HTTP 1.0, however, you sometimes get this pattern:

Open socket
Send 1 packet
Close socket

For large packets this is not really a throughput issue but a latency issue (and a test for the socket management layer), as each socket has a connection cost. For small packets it can be very costly.

You can use Connection: keep-alive in HTTP 1.1 to prevent this, and it's one of the things you will find in asmx web services. It's crucial so the connections from a client get "pipelined". (And I think the 30 messages and an explicit close is realistic.)

>
> Its quite easy to get very high throughput with small commands by putting
> many in a packet. The problem is this is measuring something very different.

It can be similar... many commands are quite small, and it was OK for that project (as we were very careful with our packets; besides 300 very active desktops we had 3K vehicle clients whose GSM connection got downgraded to 9600 baud!). Agree though, with most modern code talking to an http server it can be quite different: XML namespaces, GUIDs and big strings, serialized complex entities, and you cop the cookie + the request and response headers. In that project we just signed the packets with a small 9-digit hash so no cookie was needed.
>
> Benchmarking such systems is hard! As an example on my local machine the ES
> writes > 80MB/s if you stream 1MB messages to it. In practice though events
> are much smaller than this (and fsync + smaller writes ends up killing you).

Agree, it's hard... and the test I mentioned was quick and dirty; we had a requirement for 3K/sec so I spent a few days to see what I could get.

The really nice thing about an event store though is it's just an array of events. Speaking of which, if you had an async API with a timeout, couldn't you batch up all the requests from a machine (in the client API)? You don't return to the caller until the timeout has happened, or success or failure, similar to a file op... I suppose the sync users / processors would suffer from the extra latency though.

BTW, have you had GC pause issues, or is all the heavy lifting native?

> The same with our HTTP stuff. Pipelining + combining puts many commands/MTU
> (can do the same on response). This is not however a normal operation.
>
> With all such systems though I find getting the processing fast is never
> where the time is spent, its getting the IO to be fast.

True, which is why I have a persist-as-rarely-as-possible policy. But note, if you have all async IO (which an LMAX-style architecture does have, even if a single thread), then you have much greater opportunity for batching IO. For full async it means very little to throughput whether the disk returns in 3 ms or 100 ms (which would kill sequential workers), but there is much greater scope for IO batching. If you're trying to get 10K transactions per second, 100 ms would be a 1K batch... Works especially nicely with commands which assume success and callback on failure... though you would need to sort out the aggregates by type for some types of storage.

Ben

Greg Young

Dec 20, 2012, 10:35:58 AM
to ddd...@googlegroups.com
"You can use Connection: keep-alive  in HTTP 1.1 to prevent this and
its one of the things you will find in asmx web services. Its crucial
so the connections from a client get "pipelined".  ( and i think the
30 messages and an explicit close is  realistic) ."

Actually keep-alive is done by default in 1.1; you need to specify it in 1.0.


"The really nice thing about an event store though is its just an array
of events , speaking of which if you had an async api with a timeout
couldnt you batch up all the requests from a machine ( in the client
API) and batch them.  You dont return to the caller unless the timeout
has happened or success or failure similar to  a file op ..  I suppose
the sync users / processors would suffer  from the extra latency
though."

You can do it on a lower level. We do this in a couple of places. 

1) when actually building packets to send, you can do Nagle (yes, you pay a small penalty for slow traffic)
2) when writing to disk, we check to see if our queue is empty and delay flushing for durability if it's not, up to a few ms (depending on hdd speed). This helps increase throughput via batching at times of high load
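Point 2 is essentially group commit; a simplified sketch of the idea (names invented for illustration, not the actual EventStore internals):

```python
import time

def drain_and_flush(queue, write, fsync, max_delay_s=0.003):
    """One write cycle: keep draining queued records until the queue is
    empty or the small delay budget is used up, then fsync once so a
    single durable flush covers the whole batch."""
    batch = []
    deadline = time.monotonic() + max_delay_s
    while queue and time.monotonic() < deadline:
        batch.append(queue.pop(0))
    for record in batch:
        write(record)
    fsync()                      # one flush amortized over many writes
    return batch
```

Under load the queue rarely empties, so each fsync covers many records; when idle the cost is at most a few milliseconds of added latency per write.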

"BTW have you had GC pause  issues , or is all the heavy lifting native ?"

We have struggled to keep our managed heaps tiny (like 100 MB, to avoid too many GC issues). Most of the large stuff we do memory-wise is in unmanaged memory.

"True  which is why i have a persists as rarely as possible  policy,
but note if you have all async IO ( which a Lmax style architecture
does have even if a single thread)  , then you have much greater
opportunity for batching IO ..  For full async it means very little to
throughput if the disk returns in 3 ms or 100ms ( which would kill
sequential workers)    but there is much greater scope for IO
batching.  If your trying to get 10K transactions per second  100ms
would be a 1K batch. .. Works especially nice  with commands which
assume success and callback on failure ..  Though you would need to
sort out the aggregates by type for some types of storage."

Yes this is basically how we work internally. The place you have to be careful is in failovers.


MichaelK

Dec 20, 2012, 10:59:19 AM
to ddd...@googlegroups.com, joao...@braganca.name
I guess I am getting hung up on the Domain and how that works with event sourcing. Below are the steps I see happening...

  1. A Command is sent
  2. A single command handler handles the command
  3. CommandHandler loads up existing events from Event Store for given AggRoot Id
  4. Replay those events to hydrate AggRoot
  5. Method on AggRoot is called and this is where all business rules reside. (This could fail so that is why I was thinking sync commands)
  6. Events get generated out from AggRoot
  7. Save those Events to Event Store
  8. Publish out Events to any listeners
In a Sync model I need to complete step 5 at a minimum to make sure the command succeeds. Ideally I think I should get to step 7. I would want step 8 to happen from somewhere else, not from the web servers. So in my mind I have the below process (showing where execution happens):

  1. A Command is sent (web application)
  2. A single command handler handles the command (web application)
  3. CommandHandler loads up existing events from Event Store for given AggRoot Id (web application)
  4. Replay those events to hydrate AggRoot (web application)
  5. Method on AggRoot is called and this is where all business rules reside. (web application)
  6. Events get generated out from AggRoot (web application)
  7. Send those events to EventStore (web application)
  8. Save those events to EventStore (event store)
  9. Publish out Events to any listeners ideally async (event store)
The new step 7 above is the key, where I send the events to the EventStore, which I guess I need to expose as an endpoint (SOA) for anyone to use.

Is this a valid approach that others have used? 
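The nine steps above can be sketched end to end in illustrative Python (all names are invented for the example; the real wiring would go through CommonDomain's repository rather than hand-rolled classes like these):

```python
# Sketch of the nine steps. The web request returns once the events are
# durably saved (step 8); publishing (step 9) is left to the event store
# or a separate dispatcher. Every name here is hypothetical.

class Account:                              # a toy aggregate root
    def __init__(self):
        self.balance = 0

    def apply(self, event):                 # step 4: replay to hydrate
        self.balance += event["amount"]

    def deposit(self, amount):              # step 5: business rules live here
        if amount <= 0:
            raise ValueError("deposit must be positive")
        return [{"type": "Deposited", "amount": amount}]   # step 6

class InMemoryEventStore:
    def __init__(self):
        self.streams = {}

    def load(self, agg_id):                 # step 3: existing events
        return self.streams.get(agg_id, [])

    def save(self, agg_id, events):         # steps 7-8: durable before reply
        self.streams.setdefault(agg_id, []).extend(events)
        # step 9 would be triggered from here, asynchronously

def handle_deposit(store, agg_id, command): # steps 1-2: command + handler
    agg = Account()
    for e in store.load(agg_id):
        agg.apply(e)
    new_events = agg.deposit(command["amount"])
    store.save(agg_id, new_events)
    return "ok"                             # sync reply after the save
```

The synchronous boundary is the `save` call: the caller gets its answer once the events are persisted, and nothing on the web side ever publishes.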

Greg Young

Dec 20, 2012, 11:00:57 AM
to ddd...@googlegroups.com, joao...@braganca.name
You respond to the command after the save to the event store.

sebastian

Dec 20, 2012, 1:15:01 PM
to ddd...@googlegroups.com
Until step 7 is complete, you can't really say the command has been processed.

Waiting for 8 to complete can be really nice, in that you know also that your read models are now consistent with your command. Cleans a lot of things up, but of course may not be appropriate at all depending on your design goals. If you aren't in a single database to rule them all, you probably can't wait for 8 to finish.
