Batching/acking in Trident


Sean

Aug 9, 2012, 12:18:39 PM
to storm...@googlegroups.com
Is there any way to get Storm-like processing of individual tuples in Trident? Although I can set a batch size of 1 in Trident (at least for a BaseRichSpout), that performance is horrible. My issue is I have a multireduce that gets tuples from two streams (split from one spout), and joins tuples from stream B with the last seen tuple from stream A (many more come thru B than thru A). If they are batched, the output is different than not-batched (this makes sense).
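The last-seen join described above can be modeled in a few lines. This is an illustrative Python sketch of the semantics, not the Trident API; `last_seen_join` and the event encoding are made up for the example. It shows concretely why batching changes the output: batching can reorder the interleaving of the two streams.

```python
# Model (not the actual Trident API) of joining each tuple from stream B
# with the most recently seen tuple from stream A.

def last_seen_join(events):
    """events: ('A', value) or ('B', value) pairs, in arrival order."""
    last_a = None
    out = []
    for stream, value in events:
        if stream == 'A':
            last_a = value          # remember the latest A tuple
        else:                       # B tuple: join with last-seen A
            out.append((value, last_a))
    return out

# Tuple-at-a-time: the second A arrives between the two B tuples.
print(last_seen_join([('A', 1), ('B', 'x'), ('A', 2), ('B', 'y')]))
# -> [('x', 1), ('y', 2)]

# Batched: all A tuples in the batch are seen before the B tuples,
# so both B tuples join against the later A value.
print(last_seen_join([('A', 1), ('A', 2), ('B', 'x'), ('B', 'y')]))
# -> [('x', 2), ('y', 2)]
```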

How do batches behave when a stream is split? Does the same batch recombine if the streams are joined later? Are tuples within a batch always processed in order, and batches as well?

And if a node goes down and a batch fails, how does the system ensure that subsequent batches are not transferred (by bolts) to the new bolts until the first failed batch is replayed? As I understand, the transactional semantics rely on the fact that batches are ordered.

Also, is there any way to turn off acking in Trident? Not tagging tuples with message IDs and setting ackers to 0 don't seem to work (the latter causes a stack overflow).

Thanks in advance!
Sean

Nathan Marz

Aug 10, 2012, 2:19:06 AM
to storm...@googlegroups.com
On Thu, Aug 9, 2012 at 9:18 AM, Sean <sts...@gmail.com> wrote:
Is there any way to get Storm-like processing of individual tuples in Trident? Although I can set a batch size of 1 in Trident (at least for a BaseRichSpout), that performance is horrible. My issue is I have a multireduce that gets tuples from two streams (split from one spout), and joins tuples from stream B with the last seen tuple from stream A (many more come thru B than thru A). If they are batched, the output is different than not-batched (this makes sense).

Yea, you don't want to use batches of size 1 in Trident. Currently, Trident is suited to stream processing where latency on the order of hundreds of milliseconds is acceptable. In the future, though, I'd like to add logic so that Trident topologies that don't use batching abstractions (e.g., ones composed entirely of "each" operations) can get closer to the latencies of standard Storm one-at-a-time processing.


How do batches behave when a stream is split? Does the same batch recombine if the streams are joined later? Are tuples within a batch always processed in order, and batches as well?

Splitting a stream has no effect on the batch. If you join the stream back together, then yes, it will be the same batch.

Tuples are passed between partitions in the order they're emitted (repartitioning happens on groupBy, partitioning operations, and global aggregations). These are the same semantics you get from Storm.
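That ordering guarantee can be sketched as follows. This is a Python model of the assumed repartitioning semantics (hash-routing by key), not Storm's implementation; `repartition` is a hypothetical helper. Because tuples are routed in emission order, all tuples sharing a group key land in the same partition and keep their relative order.

```python
# Model (assumed semantics, not Storm's code) of repartitioning on
# groupBy: each tuple is routed by key hash, in emission order.

def repartition(tuples, num_partitions):
    partitions = [[] for _ in range(num_partitions)]
    for key, value in tuples:                      # emission order
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions

parts = repartition([('a', 1), ('b', 2), ('a', 3)], num_partitions=2)
# Both ('a', ...) tuples are in the same partition, still 1 before 3.
```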


And if a node goes down and a batch fails, how does the system ensure that subsequent batches are not transferred (by bolts) to the new bolts until the first failed batch is replayed? As I understand, the transactional semantics rely on the fact that batches are ordered.

State updates are ordered among batches. Each batch has both a "txid" and an "attempt id". The attempt id is a random long. This ensures that Storm can distinguish between multiple attempts for the same batch. Batches are controlled by a single coordinator thread (which is a regular Storm spout) that determines when batches get processed and when they get committed (commits are when state updates happen). The coordinator also ensures the ordering.
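The coordinator behavior described above can be sketched conceptually. This is not Trident's actual code; the `Coordinator` class and `process` callback are invented for illustration. The key properties modeled: each batch carries a txid plus a fresh random attempt id per replay, commits happen strictly in txid order, and a failed batch is replayed before any later batch commits.

```python
import random

# Conceptual sketch (not Trident's implementation) of a coordinator
# that commits batches strictly in txid order, replaying failures.

class Coordinator:
    def __init__(self):
        self.next_txid = 1
        self.committed = []

    def run_batch(self, process):
        txid = self.next_txid
        while True:
            # Random attempt id distinguishes replays of the same txid.
            attempt_id = random.getrandbits(64)
            if process(txid, attempt_id):   # batch processed successfully
                self.committed.append(txid) # commit: state update happens here
                self.next_txid += 1
                return
            # Failure: replay the same txid before moving on to the next batch.

flaky = {'failed_once': False}
def process(txid, attempt_id):
    if txid == 2 and not flaky['failed_once']:
        flaky['failed_once'] = True
        return False                        # simulate a worker dying mid-batch
    return True

coord = Coordinator()
for _ in range(3):
    coord.run_batch(process)
print(coord.committed)  # txids commit in order: [1, 2, 3]
```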

The coordinator abstraction is actually quite elegant. It builds upon the primitives that the tuple tree/acking framework provides to implement a relatively sophisticated distributed coordination algorithm.

Also, is there any way to turn off acking in Trident? Not tagging tuples with message IDs and setting ackers to 0 don't seem to work (the latter causes a stack overflow).

No, you can't. Acking isn't really expensive in Trident as long as your batches are of non-trivial size.

Thanks in advance!
Sean



--
Twitter: @nathanmarz
http://nathanmarz.com
