What is the impact of TridentState on Scalability?


Brian O'Neill

May 16, 2013, 4:50:43 PM
to storm...@googlegroups.com

Can someone check me here?

It is my understanding that Trident batches up the tuples.  Then, batches are sequenced.  

From the wiki:
"State updates are ordered among batches. That is, the state updates for batch 3 won't be applied until the state updates for batch 2 have succeeded."

Does this mean that only one TridentState instance is active in the topology at a time? 
(i.e., wouldn't that mean that calls to commit() are effectively single-threaded?)

Won't that mean that throughput of the topology is limited to the write throughput of a single host/instance?
(I can write the batch contents in parallel within a single node, but that seems to defeat horizontal scalability)

What am I missing?

-brian

---

Brian O'Neill

Lead Architect, Software Development

Health Market Science

The Science of Better Results

2700 Horizon Drive  King of Prussia, PA  19406

M: 215.588.6024 @boneill42    

healthmarketscience.com



 

Nathan Marz

May 16, 2013, 5:38:11 PM
to storm-user
A TridentState is parallelized. Batches are processed in parallel, and each partition is committed to in parallel.





--
Twitter: @nathanmarz
http://nathanmarz.com

Brian O'Neill

May 16, 2013, 6:01:32 PM
to storm...@googlegroups.com
Thanks for the quick reply Nathan.

And I apologize, I should have been more specific.

It appears that the trick that *Transactional* Trident State uses to guarantee exactly-once semantics relies on sequencing the commits.  Is that the case?

If so, is it true that any Trident topology that leverages Transactional state potentially sacrifices horizontal scalability?

Your response triggered another question...

If I'm writing a new transactional TridentState implementation, and commit() is called with a future identifier (e.g. 20, where the last written to storage is 10), should the implementation block until the preceding identifier (i.e. 19) is written?

Or am I still off track?

-brian
-- 
Brian ONeill
Lead Architect, Health Market Science (http://healthmarketscience.com)
mobile:215.588.6024
blog: http://weblogs.java.net/blog/boneill42/
blog: http://brianoneill.blogspot.com/

Nathan Marz

May 16, 2013, 6:20:35 PM
to storm-user
No, there's no scalability being sacrificed. The commits are sequenced on a per-partition basis – and since you're processing batches at a time (as opposed to one tuple at a time), it scales just fine.

And no, the implementation shouldn't block: when commit() is called, the txid it receives is the current txid, regardless of what you had written before.
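To illustrate why no blocking is needed, here is a minimal sketch of the pattern described in the Trident state documentation for transactional states: the txid is stored alongside the value, and an update is skipped only when the stored txid equals the incoming one (a replayed batch). TxCountStore, Entry, applyBatch and count are hypothetical names made up for this sketch, and the in-memory map stands in for an external store; none of this is Storm's own API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for an external store; not part of Storm. */
class TxCountStore {
    /** A stored count together with the txid of the batch that last wrote it. */
    static final class Entry {
        final long txid;
        final long count;

        Entry(long txid, long count) {
            this.txid = txid;
            this.count = count;
        }
    }

    private final Map<String, Entry> rows = new HashMap<>();

    /** Applies a batch's increment unless this exact txid was already applied to the key. */
    void applyBatch(long txid, String key, long increment) {
        Entry current = rows.get(key);
        if (current != null && current.txid == txid) {
            return; // replay of an already-applied batch: skip to stay exactly-once
        }
        long base = (current == null) ? 0L : current.count;
        // Any other txid (e.g. 20 after 10) is simply treated as the current batch.
        rows.put(key, new Entry(txid, base + increment));
    }

    long count(String key) {
        Entry e = rows.get(key);
        return (e == null) ? 0L : e.count;
    }
}
```

With this shape, applying txid 10 twice changes the count once, and a later txid such as 20 is applied directly without waiting for 11 through 19.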

Brian O'Neill

May 16, 2013, 7:05:18 PM
to storm...@googlegroups.com
Perfect.  I think that is what I'm missing... the relationship between partitions and batch identifiers.

So, does a transactional Trident State implementation need to take partitioning into account in state storage?

Are batch transaction identifiers (txids) unique across the whole topology, or only within a partition?

Here is an example:

If I am implementing State and StateUpdater directly with a simple topology like this:
    inputStream.each(new Fields("message"), new MessageTypeFilter())
               .partitionPersist(new DruidStateFactory(), new Fields("message"), new DruidStateUpdater());

I assume Storm will use the Factory to create multiple instances of my DruidState object and multiple instances of my DruidStateUpdater across different hosts.  From what I'm seeing, I have 5-10 batches in progress at any one time.  And from your previous response, I assume both updateState() and commit() are called in parallel on all of those distributed DruidState objects.  

In parallel, are the State.commit() calls all receiving the same txId or different txIds at any point in time?

-brian

Nathan Marz

May 16, 2013, 8:50:18 PM
to storm-user
txids are monotonically increasing for the entire topology. When a commit happens, each partition will receive the exact same txid. And yes, a state implementation should probably take into account the fact that it's partitioned. Though a lot of that logic would probably be in the topology itself: e.g., with a MapState, making sure only one partition is updating a particular key at once (by doing a groupBy or whatever).
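The idea of "only one partition updating a particular key" can be sketched as a deterministic key-to-partition mapping. This is only an illustration of the principle: KeyPartitioner and partitionFor are hypothetical names, and the hashing that Trident's groupBy actually performs may differ.

```java
/** Hypothetical helper showing how a key can be pinned to exactly one partition. */
class KeyPartitioner {
    /** Maps a key to a partition index in [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Because the mapping depends only on the key and the partition count, every tuple with the same key is handled by the same partition, so two partitions never write the same key in the same batch.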

Nathan Marz

May 16, 2013, 8:50:56 PM
to storm-user
And Trident doesn't move on to the next txid until the entire previous transaction has completed successfully. There's only ever one transaction committing at one time.
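The ordering described in this thread (every partition commits the same txid in parallel, and the next txid starts only after all of them finish) can be sketched with a plain thread pool. This is not how Storm is implemented internally; PartitionState, CommitCoordinatorSketch and commitInOrder are hypothetical names used only to make the ordering concrete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical per-partition state that records the txids committed to it. */
class PartitionState {
    private final List<Long> committed = new ArrayList<>();

    synchronized void commit(long txid) {
        committed.add(txid);
    }

    synchronized List<Long> committedTxids() {
        return new ArrayList<>(committed);
    }
}

/** Hypothetical coordinator: parallel across partitions, sequential across txids. */
class CommitCoordinatorSketch {
    static void commitInOrder(List<PartitionState> partitions, long firstTxid, long lastTxid) {
        ExecutorService pool = Executors.newFixedThreadPool(partitions.size());
        try {
            for (long txid = firstTxid; txid <= lastTxid; txid++) {
                final long current = txid;
                List<Callable<Void>> tasks = new ArrayList<>();
                for (PartitionState p : partitions) {
                    tasks.add(() -> {
                        p.commit(current); // every partition sees the same txid
                        return null;
                    });
                }
                // invokeAll returns only after every partition's commit has finished
                for (Future<Void> f : pool.invokeAll(tasks)) {
                    f.get(); // surfaces a failed commit before the next txid starts
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("commit failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Each partition ends up with the same, strictly ordered list of txids, while the write work for any single txid is spread across all partitions.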

Brian O'Neill

May 16, 2013, 8:59:25 PM
to storm...@googlegroups.com

Thanks Nathan.  That clears everything up.  

So, the State implementations need to be careful not to step on one another.
(potentially using a "bring your own partitioning" (BYOP) strategy built into the topology itself)

If you couldn't tell from the example, I've built a State implementation that connects Storm to Druid.  I've got an example for financial analytics that consumes FIX messages, and provides analytics via Druid over time.  Druid + Storm seems to be a powerful combo.

-brian

SKG

May 17, 2013, 1:00:00 AM
to storm...@googlegroups.com, bo...@alumni.brown.edu
I have a requirement for varying batch sizes with a transactional Trident State.

Can you give some info on how to handle this in a State implementation?

Simon Cooper

Jul 18, 2013, 11:17:37 AM
to storm...@googlegroups.com, nat...@nathanmarz.com
So there's no possibility of dirty reads with a topology that does a basic 'read from stored state -> do some stuff -> write to stored state'? No reads will take place until the previous batch has finished running?

What about a more complex topology that does 'read -> process -> write -> process -> write'? Does Trident know the first write doesn't entirely commit the batch?

thanks,
SimonC