Tech Question - ZeroMQ and event replication


Rinat Abdullin

Nov 30, 2011, 12:15:12 AM
to ddd...@googlegroups.com
Hi all,

Has anybody used ZeroMQ sockets for event replication in dynamic scenarios?

Consider the case when there is a single master and slaves that can come and go.

How do you implement a scenario where a slave (i.e. a projection host or another subsystem) comes up after a shutdown and wants to get the history first and then receive all new events in real time? To be precise, how do you switch the publisher from feeding the history (and advancing a pointer) to passing through live events?

Occasional message duplication is fine.
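
Roughly, the kind of switch I have in mind would look like the sketch below (IEventStore and ILiveFeed are made-up interfaces, not tied to any particular transport; it relies on duplicates being fine):

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Sketch only: IEventStore and ILiveFeed are made-up interfaces.
// Since duplicates are acceptable, we only track the last seen sequence number.
public interface IEventStore
{
    // stored events with their sequence numbers, starting after 'position'
    IEnumerable<KeyValuePair<long, object>> ReadFrom(long position);
}

public interface ILiveFeed
{
    // starts delivering live events as the master publishes them
    void Subscribe(Action<long, object> onEvent);
}

public class CatchUpSubscriber
{
    long _lastSeen = -1;

    public void Run(IEventStore store, ILiveFeed feed, Action<object> handle)
    {
        var buffer = new ConcurrentQueue<KeyValuePair<long, object>>();

        // 1. start buffering live events immediately (the overlap may produce duplicates)
        feed.Subscribe((seq, e) => buffer.Enqueue(new KeyValuePair<long, object>(seq, e)));

        // 2. replay history up to the current position
        foreach (var pair in store.ReadFrom(_lastSeen))
        {
            handle(pair.Value);
            _lastSeen = pair.Key;
        }

        // 3. drain the buffer, skipping whatever the replay already covered
        KeyValuePair<long, object> buffered;
        while (buffer.TryDequeue(out buffered))
        {
            if (buffered.Key <= _lastSeen) continue; // duplicate from the overlap window
            handle(buffered.Value);
            _lastSeen = buffered.Key;
        }
        // from here on, new events can be handled straight from the live feed
    }
}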

// Grisha, I know this will be in Belka. But I need it yesterday))

Regards,
Rinat

Tom Janssens

Nov 30, 2011, 4:32:41 AM
to ddd...@googlegroups.com
Rinat,

Interesting question. Here is a possible (unverified/untested) implementation: https://github.com/mkoppanen/pzq/wiki/An-Introduction-To-PZQ

Tom

Rinat Abdullin

Nov 30, 2011, 4:36:57 AM
to ddd...@googlegroups.com
Tom, thank you!

The PZQ reference seems to be exactly what I needed. I'll have a deeper look tonight.

Thanks again

Rinat

kell.s...@gmail.com

Nov 30, 2011, 9:20:59 AM
to ddd...@googlegroups.com
Rinat,

Is reliability important in this scenario? I treat things like MSMQ/RabbitMQ as the total opposite of ZeroMQ. I use ZeroMQ when fast is more important than reliable, because it's not a reliable messaging solution on its own.

In our system we are using both reliable and unreliable messaging. Similar to the concept of separating reads and writes, we optimize the communication choice based on the needs of each part.

Look forward to hearing about your ZeroMQ experiences :)

Later,
Kell

Rinat Abdullin

Nov 30, 2011, 10:26:00 AM
to ddd...@googlegroups.com
Hi Kell,

Reliability and simplicity are extremely important in our scenarios (with various degrees of importance in different parts, of course). 

I think it is possible to achieve both of these by eventually switching to a ZeroMQ-like solution (and discarding central brokers and any middleware). Reliability here would be achieved not at the individual node level but rather through redundancy and the collaborative behavior of individual (extremely simple) nodes.

Obviously this is currently just a gut feeling based on my limited experience. We'll see how it goes.

NB: We are currently using a mix of reliable messaging (a custom Azure queues binding and a file queues implementation) along with in-memory messaging. I'd love to replace these 3 technologies with just 2 :)


Best regards,
Rinat

Rinat Abdullin

Dec 1, 2011, 3:32:59 AM
to ddd...@googlegroups.com
FYI, dumped thoughts into a blog post: http://abdullin.com/journal/2011/12/1/store-and-forward-for-event-streams.html

Best,
Rinat

Tom Janssens

Dec 1, 2011, 4:10:32 AM
to ddd...@googlegroups.com
Rinat,

I applied something similar in Scritchy; there's no need for something like 0mq. It is pretty straightforward and looks like this:

public class Eventstore
{
   StorageAdapter storageAdapter; // could be replaced with a simple filestream/...; sharding should also be a piece of cake

   void SaveEvents(IEnumerable<object> messages)
   {
      // I use an adapter, but you could simply envelope and stream to disk (append-only storage)
      foreach (var msg in messages)
        storageAdapter.Save(new Envelope(msg));
   }

   // tracks how far each consumer ("context") has read
   Dictionary<object,long> EnumeratorContext = new Dictionary<object,long>();

   IEnumerable<object> GetNewEvents(object context)
   {
     if (!EnumeratorContext.ContainsKey(context))
       EnumeratorContext.Add(context, 0);

     while (EnumeratorContext[context] < storageAdapter.Count)
     {
        yield return storageAdapter.FindAllFrom(EnumeratorContext[context]++).Message;
     }
   }
}

If you want to, you could add a method to set the context index as well... 

(The actual StorageAdapter implementation can query events per ARId as well, but I assume you do not need that option.)


Tom

Rinat Abdullin

Dec 1, 2011, 4:15:25 AM
to ddd...@googlegroups.com
Tom,

Do you replicate events across different machines (and/or availability zones)? If so, what throughput do you usually get?

Best,
Rinat

Tom Janssens

Dec 1, 2011, 4:45:34 AM
to ddd...@googlegroups.com
Rinat,

Most apps I deploy are for a limited number of concurrent users, so I do not have any experience at all in that area. (I am not into CQRS for the high availability.)
As for throughput, my intuition would be to start experimenting with grouping messages, e.g.

 object[] GetNewEventsGrouped(object context,int maxcount)
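
A trivial (untested) implementation on top of the GetNewEvents enumerator above could be the following; the consumer can then process a whole batch and ack/flush once per batch instead of once per message:

// Untested: batches up to 'maxcount' pending events for a consumer context,
// building on the GetNewEvents enumerator from the earlier snippet (needs System.Linq).
object[] GetNewEventsGrouped(object context, int maxcount)
{
    return GetNewEvents(context).Take(maxcount).ToArray();
}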

Another simple idea with this approach might be sharding in the storage adapter (no experience; pure intuition):

class StorageAdapterSharder()
{
    IStorageAdapter[] Shards;
    int LRU = 0; // last recently used shard; this would need to be persisted somewhere

   public StorageAdapterSharder(params IStorageAdapter[] Shards )
   {
      this.Shards = Shards;
   }

   void SaveEvents(IEnumerable<object> messages)
   {
      foreach (var msg in messages)
      {
        Shards[LRU].Save(message);  // if possible fire & forget
        LRU++;
        LRU%=Shards.Length;
      }
   }

   IEnumerable<Envelope> FindAllFrom(long pos)
   {
        yield return Shards[pos % Shards.Length].FindAllFrom(pos/Shards.Length).Message;
   }
}

Duplication could also be done with a wrapper; nice, simple and easy IMO...

Tom

Tom Janssens

Dec 1, 2011, 4:50:16 AM
to ddd...@googlegroups.com
Rinat,

My apologies, I fubarred on the sharding code (copy paste FTW - not !); here is my second attempt.

class StorageAdapterSharder
{
    IStorageAdapter[] Shards;
    int LRU = 0; // last recently used shard; this would need to be persisted somewhere

   public StorageAdapterSharder(params IStorageAdapter[] Shards )
   {
      this.Shards = Shards;
   }

   void Save(Envelope envelope)
   {
        Shards[LRU].Save(envelope);  // if possible fire & forget
        LRU++;
        LRU%=Shards.Length;
   }

   IEnumerable<Envelope> FindAllFrom(long pos)
   {
        yield return Shards[pos % Shards.Length].FindAllFrom(pos/Shards.Length).Message;
   }
}

Pretty simple IMO.

You could also try to create envelopes with multiple messages using a similar wrapper.

Tom

Tom Janssens

Dec 1, 2011, 5:02:02 AM
to ddd...@googlegroups.com
OMG ! Of course that last one should say:

 IEnumerable<Envelope> FindAllFrom(long pos)
 {
        // one enumerator per shard, each starting at that shard's local offset
        var Selects = Shards.Select((x,i) => x.FindAllFrom(Math.Max(0, pos - i) / Shards.Length).GetEnumerator()).ToArray();
        pos %= Shards.Length;
        while (Selects[pos].MoveNext())
        {
           yield return Selects[pos].Current;
           pos++;
           pos %= Shards.Length;
        }
 }

Rinat Abdullin

Dec 1, 2011, 5:18:53 AM
to ddd...@googlegroups.com
Tom, 

thanks a lot for the feedback.

Message grouping (esp. events that are published from an AR) and partitioning (esp. for aggregate roots) are already in place. Actually, the ability to partition is the key scaling factor here. It is a nice side benefit coming from the very nature of domain models.

The problem here comes from my desire to have a single unified domain log (plus simplified message processing, LMAX style). Throughput becomes really important.

NB: the domain log is just a unified stream of events (a copy of all events) that is used to rebuild projections, perform ad-hoc queries, resend messages and do all the other little things that in theory should never happen, but in practice happen all the time.

The most interesting part is that this domain event log is not that important for production (it sometimes lags by an hour :) However, this latency significantly increases friction for development and maintenance. We are just learning how to integrate our systems together, sending quite a lot of messages back and forth, etc.

NB2: Eventually I will probably go and partition/shard the log as well, but before that it should be possible to easily get a throughput of at least 1000 messages per second on a single event stream in Windows Azure.

Best regards,
Rinat

Tom Janssens

Dec 1, 2011, 5:32:46 AM
to ddd...@googlegroups.com
Hey Rinat,

I now understand your intent; LMAX also uses sharding, if I interpreted them correctly (multiple writers).

Tom

PS: coding in a text browser window is a mess, so I implemented a simple sharder in Scritchy: https://github.com/ToJans/Scritchy/blob/master/Scritchy.Infrastructure/Implementations/EventStorage/Adapters/ShardedAdapter.cs

Rinat Abdullin

Dec 1, 2011, 5:37:47 AM
to ddd...@googlegroups.com
AFAIK, LMAX had 1 writer and multiple readers (which could be sharded). But I could be wrong here.

Even if LMAX does have multiple writers, I wouldn't bother with that approach. The rule of thumb "do not share writes to the same entity" (unless they come through one queue managed by a single thread) saves a lot of headache.

A slightly different approach is to tell writer A to handle only messages with a hash remainder of 0, writer B those with remainder 1, writer C remainder 2, etc. But this means writing to separate entities.
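
In code that routing rule is just a hash check - a quick sketch (writerIndex and writerCount would come from configuration; a stable hash over the id bytes would be better than GetHashCode in production, since GetHashCode is not stable across processes):

// Sketch: should this writer instance persist the given message?
// entityId is assumed to be available on the message envelope.
static bool IsMine(string entityId, int writerIndex, int writerCount)
{
    var remainder = Math.Abs(entityId.GetHashCode()) % writerCount;
    return remainder == writerIndex;
}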

Best,
Rinat

Tom Janssens

Dec 1, 2011, 7:21:52 AM
to ddd...@googlegroups.com
Hey Rinat,

AFAIK, LMAX can have multiple persisters (= writers) that read the messages from the ring buffer and write them to disk (i.e. basic sharding)...
1 persister => store all messages
2 persisters => store every second message (so write one/skip one), offset by the number of the persister instance
3 persisters => store every third message (so write one, skip 2), offset by the number of the persister instance
...
N persisters => store every Nth message (so write one, skip N-1)

This is exactly the thing that makes your throughput virtually unlimited, as I understood it... which would probably solve your problem ?
You could also add persisters for mirroring/parity checking etc, as they are virtually free...

Then again, you are way more experienced in this domain, so I should probably stop guesstimating and just listen to you ;) .

Tom




Rinat Abdullin

Dec 1, 2011, 7:41:44 AM
to ddd...@googlegroups.com
The logical problem with multiple persisters: what's the point in having them on a single ring buffer, when you can just write to separate partitioned buffers? As you've mentioned, readers are virtually free here.

But I admit that I could be quite wrong myself :)

BTW, I just tried a quick prototype. I'm getting 1000 messages per second on a single stream (flushing to disk after each write). Production could be faster, since I'm measuring a TCP connection between nodes in a code snippet in a unit test that runs from the R# runner in VS, inside a VM with 1.5 GB RAM on a MacBook Air :)

If I needed to go above that, I'd either add an in-memory store-and-forward system (with guaranteed writes to N slaves) or go your way and partition streams between multiple writers.

Best,
Rinat 

Kelly Sommers

Dec 1, 2011, 9:42:53 AM
to ddd...@googlegroups.com
Rinat,

Could you describe where you see the bottleneck that is stopping you from getting the throughput you desire? Without that it's hard to help, other than by throwing out random performance optimizations and distributed options, which I'm about to do ;)

I may have missed it so my apologies if I did.

Is this an Azure storage throughput bottleneck, where your domain is performing in production at a higher rate than a single persistence mechanism can keep up with?

I find performance, reliability and consistency to be trade-offs against each other. If you do things like queue events in memory and then write them in large blocks, you risk reliability and consistency. If you write per event, you lose performance.

You could do single producer into a RingBuffer and multiple consumers who shard your event stores and then your UI diagnostics tool would simply do a fan-out query to report on your events.
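
The fan-out part can stay trivial - a sketch using LINQ, assuming a hypothetical IEventShard interface and sequence-numbered envelopes (needs System.Linq and System.Collections.Generic):

// Sketch: merge the per-shard streams back into one ordered stream for reporting.
// IEventShard and Envelope.Sequence are assumptions, not an existing API.
IEnumerable<Envelope> FanOutQuery(IEnumerable<IEventShard> shards, long fromSequence)
{
    return shards
        .SelectMany(shard => shard.ReadFrom(fromSequence)) // ask every shard for its slice
        .OrderBy(e => e.Sequence);                         // re-establish the global order
}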

What about Azure Service Bus? That might be another option for pushing a lot of events out at scale for consumers to persist.

Another thing to keep your eye on: Azure recently announced Hadoop support. Hadoop comes with HBase, which, like Cassandra, uses append-only operations, meaning it's a write-optimized store which I think fits event storage perfectly.

In my limited experience with Cassandra (also write-optimized, append-only), even on ARM embedded servers the write operations are quite impressive.

Later,
Kell

Rinat Abdullin

Dec 1, 2011, 10:30:36 AM
to ddd...@googlegroups.com
Hi Kelly,

My fault for not explaining it clearly :)

Essentially I've been trying to optimize on multiple dimensions at once. Not only reliability, latency and consistency, but also less measurable things like simplicity of maintenance and actual development. 

Currently, Azure queues are the primary building block in our production systems. They are consistent and durable, but slow for our new needs. A simple forwarding operation provides at most 100 messages per second on a single thread. While developing systems (or running them outside the cloud) I'm currently using dead-simple file queues (which provide higher throughput).

The core point of my current search is to figure out the simplest possible building block that would be performant and universal enough for non-critical scenarios. Persistent, with decent throughput and without any servers to deploy. Something that would fit 95% of current needs without any tuning (and would meet 80% of our needs for the next 3 months).

The simplicity approach obviously ruled out Azure Service Bus, Hadoop, AMQP, etc. (plus I don't want the inherent architectural fragility brought by central brokers, although where the complexity costs are justified they could fit some scenarios). After playing with sockets and a few lines of code I've got something that should work nicely - sockets plus files. Obviously, there are additional parameters like risk and latency that can be tuned individually (i.e. by organizing different types of event stores in different topologies).

Obviously, this is not a "one size fits all" solution (there ain't one :) but this approach seems to fit our project portfolio better than the existing one (built around Azure queues for Azure deployments, plus file queues) as we go deeper into the event-centric world in specific projects.

Only production will tell if it actually works out :)

Best regards,
Rinat

Tom Janssens

Dec 1, 2011, 11:59:45 AM
to ddd...@googlegroups.com
Hey Rinat,

You triggered my interest; I implemented a small test app with Scritchy to test event store throughput using raw file storage/protobuf. This was the result:
(Messages are submitted four at a time, so a flush happens every 4 messages; no optimization at all.)

Generate events first? (Y/N)
y
Time to store 4093664 messages (612,931488037109 MB) : 60000,4318 msecs, 
msgs/sec = 68227,2423246127, MB/sec = 10,2154512834208

Reading back messages
Time to read 4093664 messages : 81044,6355 msecs, msgs/sec = 50511,2272360087

Oddly enough, reading is slower, even though I read only half the amount of bytes I write in this particular case... However, I get 60 times the speed you get ?

Am I missing something here, or is there a problem with your implementation ?

If you want to verify: create a new console app, use nuget to add Scritchy (make sure you ref the latest 0.1.0.15), and copy/paste the code from this gist:
https://gist.github.com/1418094

Tom

Rinat Abdullin

Dec 1, 2011, 12:18:28 PM
to ddd...@googlegroups.com
Hi Tom,

I'm running all of that through two network sockets, sequentially (no batching).

There is a message sender that sends a message (via a TCP socket) to the event store and awaits the response before sending the next one. Then there is an event store that records messages to disk, flushes everything, responds to the sender (via TCP) and also publishes the message to all the subscribers (via TCP). I'm actually measuring the throughput that shows up at the subscriber.

Each of these is separated by a socket and runs in a separate thread (inside a small VM). All this adds a bit of latency :)

This is actually similar to how I plan to use the approach in production - sprinkle code with these light daemons (well, they've got to be called "spirits" at this point) while adding redundancy, partitioning and a bit of self-healing and load-balancing. The throughput of each individual building block might not be great, but when we are talking about dozens of processes running on hundreds of VMs in the cloud, it adds up (and enables elastic throughput scaling that happens proportionally with the scaling of the system).

Here is the quick test code snippet: https://gist.github.com/819f7ec8749d63568fce
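
For context, the shape of the store loop is roughly this (not the actual gist; ReceiveFrame/SendFrame and the subscriber socket list are placeholders for the socket plumbing):

// Rough shape of the store loop. Each message is durably written and acked
// before the sender is allowed to push the next one, then fanned out to subscribers.
using (var log = new FileStream("domain.log", FileMode.Append, FileAccess.Write))
{
    while (true)
    {
        byte[] message = ReceiveFrame(senderSocket);   // blocks until the sender pushes one
        log.Write(message, 0, message.Length);
        log.Flush(true);                               // force the bytes to disk before acking
        SendFrame(senderSocket, ack);                  // the sender may now send the next message
        foreach (var subscriber in subscriberSockets)  // fan out to the live subscribers
            SendFrame(subscriber, message);
    }
}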

I must admit that I only discovered sockets a few days ago, so my code is probably wasting a lot of time on some stupid things.

Best,
Rinat

Tom Janssens

Dec 2, 2011, 6:52:36 AM
to ddd...@googlegroups.com
Hey Rinat,

The spirits sound like an interesting approach... Here's an idea, although it will probably be too complicated:

If a spirit is under heavy load, do a multicast, requesting spirits with duplicate functionality that do not have much load. When you find one, morph it into the one that you need...
(And add a chaos monkey as well, so you are sure everything works as expected.)

On the other hand, you might just as well contact google and ask them to license their tech ;)

Tom



Rinat Abdullin

Dec 2, 2011, 7:05:19 AM
to ddd...@googlegroups.com
Hi Tom,

We're taking a slightly simpler approach, the brute-force way. In short: if the system is under load (queues get packed), we just provision more workers (virtual machines) to help deal with the work. Properly designed systems can handle a 250x increase easily. Less thorough systems manage only a 15x increase (or much less).

Once the work has been dealt with, we de-provision the excess workers (and pay only for the hours that have been used - that's the beauty of cloud computing :)

However, one of the primary bottlenecks of this approach is that when you ask for more workforce, it also increases the overall number of messages that fly through the system (which starts to kill the primary queues, brokers, IO, etc.).

The idea with these small socket-driven spirits is to have everything constructed in such a way that when new workforce comes in, it also brings more infrastructure capacity with it, increasing total throughput, redundancy and reliability. Somewhat similar to p2p networks (but dead simple in topology). Sounds crazy, I know :)

Best,
Rinat

kell.s...@gmail.com

Dec 2, 2011, 12:18:24 PM
to ddd...@googlegroups.com
Azure Service Bus sounds like a natural fit here when hosted on Azure. Some people are publishing an absolutely massive number of messages through it, so message scaling shouldn't be a problem there. Much better in throughput and scale than Azure Queues, from my understanding.

For my Continuous Client demo I wired up Azure SB manually in 2 hours. Using the SDK it would likely be even faster :)

Later,
Kell

Rinat Abdullin

Dec 2, 2011, 12:40:36 PM
to ddd...@googlegroups.com
Kelly,

Using Azure SB would break the technology golden rule by increasing the number of technologies the project depends upon :)

By using spirits, on the other hand, I can actually reduce the number of technologies (replacing the in-memory and file-based queues already in use with sockets) while improving throughput. Besides, this will bring the Rackspace and Azure deployment experiences closer together (allowing us to have the same maintenance tools).

Sounds like an easy choice to me :)

Best,
Rinat

Kelly Sommers

Dec 2, 2011, 12:54:59 PM
to ddd...@googlegroups.com
Rinat,

I wasn't aware of your Rackspace hosting. In that case I entirely agree, if you are looking for a single pub/sub mechanism in both environments.

As for the golden rule, I don't see swapping out communications as a big dependency, because it should be defined at the boundaries of your system.

These choices are always a trade-off. Whatever you don't take from someone else who maintains it, you carry the burden of building and maintaining yourself.

I choose not to reinvent the wheel and write my own communications, because I can swap them out easily; there are tons of proven choices already. No biggie IMO.

I see it similar to how CQRS+ES treats the DB as just an implementation detail that can be swapped for another implementation since the schema is so simple. If it's loosely coupled and only takes a day or two to write another implementation it's not that big of a dependency.

I think Greg's pipes library (is that the correct name?) is well suited for hooking up to these kinds of things.

Later,
Kell

Rinat Abdullin

Dec 2, 2011, 1:21:42 PM
to ddd...@googlegroups.com
Kelly,

Well, Rackspace was just a coincidence. Initially we were not even targeting it. However, since non-Azure deployment is such a simple thing, we just ended up using RS for the projects with the most frequent release schedules.

Re communications: sockets or the TCP/IP stack I will never write on my own (hopefully :) However, if we are speaking of the higher level... If all the ASB functionality we need can be replaced with a few hundred lines of code, tailored to our needs and to our desire for low friction and hybrid deployments, then I'd give it a try. Even if I fail, enough will have been learned to use ASB or AMQP consciously.

This process is similar to how we got rid of SQL/ORM/NH (to tremendous benefit) and are getting rid of a lot of frameworks and IoC/DI (same here). I think simplicity is the most undervalued feature - a powerful enabler (especially for dev teams of a few people).

Obviously, these decisions involve a lot of tradeoffs, discussing which would not fit in a single post.

Another reason to avoid ASB and try to get rid of Azure Queues is just a gut feeling that these solutions do not match our immediate and future needs. I could be really wrong here as well (not the first time :) Besides, if really pressed, I should be able to replace the "spirit" blocks with adapters to Greg's Event Store (which will be precisely optimized for similar scenarios and allow a further perf boost on any cloud).

Re PVC Pipes: yes, essentially we are moving in a parallel direction, where handlers are just lambdas that can be chained. The simplest possible thing that works.

PS: ideally I want to be able to run the most CPU-expensive parts on Mono (or even non-.NET, since there are capable resources around), just to save on cloud costs per hour - that affects the tech choices as well.

Hope this explains my weird tech decisions a bit - trying to juggle many balls at once.

Best,
Rinat

Kelly Sommers

Dec 2, 2011, 5:52:38 PM
to ddd...@googlegroups.com
Rinat,

Sounds fun! I am also striving to get much better at simplicity.

Have you ever thought about Linode? I can't speak for it myself since I haven't used it, but from my observation a lot of the Mono people use it. I believe it's very cheap Linux virtual machine hosting.

I enjoy debating these topics, I learn a great deal.

Later,
Kell

Tim Gebhardt

Dec 2, 2011, 11:53:04 PM
to ddd...@googlegroups.com
The LMAX guys rely on their messaging middleware for I/O. They use 29West, which is a decentralized messaging system sort of analogous to 0mq. They basically need only a single "connection" to their 29West infrastructure for input and one for output, because the 29West stuff is heavily optimized itself to do the multiplexing of the connections and the batching/windowing of the data for the network.

Each Disruptor is a FIFO circular buffer, so a common setup is to have a Disruptor that handles reading messages, copying them to an audit log or hot-backup system, deserializing them and then running them through your business logic processor (BLP). The BLP then puts its messages on the output Disruptor, which is responsible for sending them out.

But since a Disruptor is a FIFO structure it doesn't really make sense to shard the input. You'd need one Disruptor to take in all messages and then an event processor that shards them to N different Disruptors. And if latency is your #1 concern, the Disruptor really works best on a system that has at least as many CPU cores as event processors, because one optimization is for the Disruptor event processors to busy-spin while waiting for messages, so the event processor threads don't give up their time slice.

You'd be surprised how little hardware you can get away with. The Disruptor has an optimization to batch messages if an event processor falls behind. Combine this with the fact that with this type of system architecture you're basically writing to append-only logs, and a RAID 1+0 array has enough capacity to handle more streaming writes than your network card can deliver input data.
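
The batching effect is easy to picture - a simplified sketch of a consumer loop, not the real Disruptor API (WaitForPublishedSequence, Handle and ringBuffer are stand-ins):

// Simplified sketch of the "batch when behind" idea. 'available' is the highest
// sequence the producer has published; 'next' is where this consumer got to.
long next = 0;
while (true)
{
    long available = WaitForPublishedSequence(next); // busy-spin or yield until something is published
    for (; next <= available; next++)                // one wait can yield a whole batch
        Handle(ringBuffer[next % ringBuffer.Length]);
    // e.g. append the whole batch to the log and flush once per batch, not once per message
}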


Tim Gebhardt

Rinat Abdullin

Dec 3, 2011, 12:56:30 AM
to ddd...@googlegroups.com
Kelly,

Just checked Linode. Based on the pricing, it looks like their Linux hosting is on par with Rackspace for equivalent instances. However Rackspace has smaller 256MB Linux instances (for 10 USD per month) and Windows VMs. 

Plus with Rackspace you pay on an hourly basis (as opposed to paying for a month upfront). That, for instance, means you can grab a 4 GB RAM Linux server, compile Mono there and then downsize it to 256 MB RAM to actually play with it for a few hours, killing it afterwards. The overall cost could be less than a dollar :)

This price flexibility starts paying off in various production scenarios.

However, Linode might have other compelling benefits that I'm not aware of.

Best,
Rinat

Rinat Abdullin

Dec 3, 2011, 2:10:44 AM
to ddd...@googlegroups.com
Tim,

Thank you for clarification and explanations.

Indeed, batching is one of the ideas I like most about the LMAX approach. Really simple and yet efficient.

BTW, I tried googling 29West. It's surprising, but the web site of a high-availability enterprise product features IIS errors from time to time :)

Best,
Rinat

Tom Janssens

Dec 3, 2011, 4:45:40 AM
to ddd...@googlegroups.com
Hey Tim,

Once again, great post ! (Still would love to see the blog post ;) So you are saying one should shard in hardware (i.e. RAID); that makes a lot of sense: just get faster HW, as dev cost is usually more expensive than HW.

thanks!

Tom

This message was typed on a mobile

Tim Gebhardt

Dec 8, 2011, 11:21:27 PM
to ddd...@googlegroups.com
Hey Tom, sorry for the late reply.

It's not even that the sharding is necessary if your message rates can keep up:
A gigabit ethernet interface has a theoretical sustained write rate of 125 megabytes per second: http://www.google.com/search?q=1000+Mb/s+in+MB/s

That's a LOT of data.  I work for a financial brokerage where we have a lot of incoming and outgoing customer market data (with a lot of duplication included) and the last time I looked at our firewall logs I think our external interface had an average sustained data rate of about 70 MB/s.

But even the most reasonable enterprise-class hard drives can keep up with that sustained write rate (making the assumption that your input and output messaging rates are about the same).

Spinning-rust hard disks are pretty fast at streaming reads and writes; it's the seeks that kill you. But if you're doing event sourcing then you're just streaming, and some pretty reasonable hardware should be able to keep up.

Just food for thought.


Tim

Tom Janssens

Dec 9, 2011, 2:00:07 PM
to ddd...@googlegroups.com
Hey Tim,

Thanks for the reply... A test here with Scritchy (completely unoptimized code) on my local machine with a consumer disk, simple file storage and no storage preallocation already provided me 10MB/sec, so I guess faster hardware might indeed be a more efficient way to tackle the problem... I am curious what Rinat's opinion is on this, since he is using cloud instances everywhere, where you typically are not in control of the hardware...

Maybe a possible solution to Rinat's problem would be to host his own server as the primary queue/publisher, and delegate everything else to cloud instances ?

Tom

Rinat Abdullin

Dec 9, 2011, 2:54:28 PM
to ddd...@googlegroups.com
Hi guys,

thank you for the details and thorough replies.

Tailored hardware is indeed the solution for cases where you need to max out the throughput of a single node (with low maximum latency).

However, in my case the actual domain model can be partitioned really well. It is easier (and more cost-effective) to use a lot of commodity cloud instances with decent characteristics (elastically scaling out and killing machines as needed) and simple technology running on top. Each worker node might have average specs, but:

1) the combined throughput of the system will be as high as needed (up to a certain limit), plus it usually takes less than 15 minutes to request additional cloud machines;
2) there are no upfront costs for hardware, and we can close a customer case without leaving any servers behind (paying only for [hours] x [instances]);
3) when a customer wants to start a new project with orders of magnitude more data than initially planned, we can generally handle it.

So in essence, our approaches are optimized so far for cheapness, simplicity and elastic scalability. Latency and throughput-per-thread might suffer a bit, but these are not that important.

Best,
Rinat

Udi Dahan

Dec 10, 2011, 10:12:05 AM
to DDD/CQRS
On the whole gigabit ethernet thing - most developers aren't coding at the level of ethernet. TCP is usually as low as people go, and that gets around 40% utilization of the underlying ethernet (at best), so you're at 50 MB/s.

Then there's the whole issue of how you serialize your payloads. XML serialization can add an order of magnitude of bloat on top of the underlying data, so you could effectively be at 5 MB/s.

Not to say that everyone's doing XML serialization.

Still, whether it's 5 or 50, that's for all your users and all their data. Most people don't think about scaling at the network level (because of the prefix "giga", which is translated to "so much I don't even need to think about it").

Some more food for thought :-)

Udi


Kelly Sommers

Dec 10, 2011, 1:26:11 PM
to ddd...@googlegroups.com
Excellent points! I really enjoyed reading that. Thanks for chiming in :)

I've been working a lot on the serialization portion.

Later,
Kell

Simon Timms

Dec 10, 2011, 3:12:26 PM
to ddd...@googlegroups.com
What have you discovered, Kelly? My impression is that protocol buffers are the most efficient, but you give up human readability to get that. You also need to define the protocol format for each message manually.

Rinat Abdullin

Dec 12, 2011, 1:06:19 AM
to ddd...@googlegroups.com
Simon, 

JSON/JSV/XML etc. are not human-readable on their own either. You still use a tool (notepad, vim, etc.) to read them. So you can just as well write a few lines of code to present binary-encoded messages in text format for debugging (I use ToString and also plain JSON rendering).
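
Something as small as this does the job - a sketch assuming protobuf-net contracts and Json.NET, with a made-up CustomerRegistered contract:

using System;
using ProtoBuf;
using Newtonsoft.Json;

// Sketch: render a binary-encoded (protobuf-net) message as text for debugging.
[ProtoContract]
public class CustomerRegistered
{
    [ProtoMember(1)] public string Name { get; set; }
    [ProtoMember(2)] public DateTime Registered { get; set; }

    // lets a plain Console.WriteLine or the debugger show the message as readable JSON
    public override string ToString()
    {
        return JsonConvert.SerializeObject(this, Formatting.Indented);
    }
}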

Rinat

Rinat Abdullin

Dec 18, 2011, 8:36:00 AM
to ddd...@googlegroups.com
FYI: after a little bit of playing with ZMQ I got something like 20000 messages per second across TCP in durable mode. I used my old desktop with a spinning-rust drive for that.


Out of sheer curiosity. Does anybody know a simple way to write directly to disk from .NET (avoiding the file system)?

Best,
Rinat


Caleb Vear

Dec 18, 2011, 6:19:37 PM
to ddd...@googlegroups.com
Rinat,

I think you may be able to get low-level read/write access to a partition using the Win32 file functions: http://support.microsoft.com/kb/100027. Once you get the handle to the disk, you can probably use the FileStream constructor that takes a handle to create a stream for accessing the disk. I've done something similar before for comms with USB HID devices, but never actually tried it for a hard disk.
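
Something along these lines might work (an untested sketch; it requires administrator rights, and reads from raw devices generally have to be sector-aligned):

using System;
using System.IO;
using System.Runtime.InteropServices;
using Microsoft.Win32.SafeHandles;

// Untested sketch: open a raw physical drive via Win32 CreateFile and wrap it in a FileStream.
static class RawDisk
{
    [DllImport("kernel32.dll", SetLastError = true, CharSet = CharSet.Auto)]
    static extern SafeFileHandle CreateFile(
        string fileName, uint desiredAccess, uint shareMode, IntPtr securityAttributes,
        uint creationDisposition, uint flagsAndAttributes, IntPtr templateFile);

    const uint GENERIC_READ = 0x80000000;
    const uint FILE_SHARE_READ = 0x1;
    const uint FILE_SHARE_WRITE = 0x2;
    const uint OPEN_EXISTING = 3;

    public static FileStream OpenPhysicalDrive(int number)
    {
        var handle = CreateFile(@"\\.\PhysicalDrive" + number,
            GENERIC_READ, FILE_SHARE_READ | FILE_SHARE_WRITE, IntPtr.Zero,
            OPEN_EXISTING, 0, IntPtr.Zero);
        if (handle.IsInvalid)
            throw new IOException("Could not open drive", Marshal.GetLastWin32Error());
        return new FileStream(handle, FileAccess.Read); // wrap the raw handle in a stream
    }
}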

Caleb

Rinat Abdullin

Dec 18, 2011, 11:13:50 PM
to ddd...@googlegroups.com
Caleb, thank you for the suggestion! Will try this later.

Best,
Rinat

Rinat Abdullin

Apr 5, 2012, 6:01:27 AM
to ddd...@googlegroups.com
Post-mortem update: partial event replication is in production (with push style) across multiple datacenters (Azure, Rackspace, OVH) and really saves the day.

Last night we got file corruption in the event stream of a system running on Windows Azure (the stream is stored in Azure blob storage). Fortunately, since we are replicating event streams everywhere, the problem was solved without losing data (we just grabbed a stream replica which was in absolute sync).

Currently deploying our first replicator service (ZeroMQ + Blob Event Streams) to Windows Azure. So far so good.

In short: if you are doing cloud or just a distributed system in volatile environments, then ES provides a simple solution to deal with a lot of risks and problems. Proper DDD reduces friction and increases chances of success here.

Best,
Rinat

Chris Martin

Apr 5, 2012, 6:13:29 AM
to ddd...@googlegroups.com

Perfect timing, Rinat!

We had a very important stream go corrupt yesterday. This would have been nice then. Instead, I rebuilt the stream to version-1 in another container and overwrote the original.

I'm moving to the sample project this weekend. :)

Tom Janssens

Apr 5, 2012, 6:43:14 AM
to ddd...@googlegroups.com
Could you elaborate on the distribution topology a bit more ? (maybe in a blog post?)
- Do you use a single master ?
- Do your replication services run locally and/or on a cloud instance
- ...

etc..

Thanks!



Rinat Abdullin

Apr 5, 2012, 6:51:00 AM
to ddd...@googlegroups.com
Chris, Good luck with that! :)

I'm currently still struggling with ZMQ on Azure (some native dependencies are missing), but I think we are almost through. Don't hesitate to ping me if you face similar problems.


Best,
Rinat

Rinat Abdullin

Apr 5, 2012, 7:12:34 AM
to ddd...@googlegroups.com
Tom,

It's too early to write a post, because the topology is not fixed yet. I'm adjusting it on a daily basis, as more systems get proper ES support and/or are brought into the CQRS world. In a sense, it is just like the nervous system in a growing organism - continuously shifting and changing (multiple times a week, sometimes).

I have multiple masters, because different systems "source" different event streams. Off-cloud systems have local event streams, which are then pushed over, while cloud systems store event streams in cloud storage.

Replicators run everywhere I can start them (in Azure/Rackspace and in a dedicated DC). In fact, the backup that saved my life today was not a backup, but a replica from the reporting system.

I don't aim for a centralized topology, but rather a distributed network of dead-simple elements that can replicate and heal as needed (and preferably survive a nuclear attack on half of the world, while coming alive and sending autonomous domains back into the past to prevent it from happening).

Hope this helps

Best,
Rinat

Tom Janssens

Apr 5, 2012, 7:34:11 AM
to ddd...@googlegroups.com
Thanks! 


Greg Young

Apr 6, 2012, 3:08:39 AM
to ddd...@googlegroups.com
Crap rinat is building skynet

Sent from my iPad

Tom Janssens

Apr 6, 2012, 1:33:06 PM
to ddd...@googlegroups.com
I've asked John Titor, and he told me Cyberdyne Systems was actually founded by a Russian with the initials R.A. who used to work for a French company, cashed out big-time and then decided to use his knowledge to serve his country...


Tom Janssens

Apr 6, 2012, 1:35:03 PM
to ddd...@googlegroups.com
I also heard the Russian first tried to start a business trying to sell flash cards for learning Mandarin Chinese, but failed bigtime....


Rinat Abdullin

Apr 7, 2012, 2:15:59 AM
to ddd...@googlegroups.com
Yeah. And I heard of another, much crazier guy who went around starting dozens of businesses per year just for the heck of it. Then he suddenly discovered a "gold mine" with one of them and went into early retirement with his Mrs on an island they bought from Greece.

Tom Janssens

Apr 7, 2012, 10:54:06 AM
to ddd...@googlegroups.com
LOL !
