How to delay processing of a tuple?


Hussein B.

Mar 4, 2012, 10:01:21 PM
to storm-user
Hi,
According to http://nathanmarz.github.com/storm/doc/backtype/storm/task/IBolt.html#execute(backtype.storm.tuple.Tuple)
a bolt can delay processing a tuple. How is this done at the code
level?
By checking for a condition to be met, for example?
Thanks.

Nathan Marz

Mar 6, 2012, 9:11:33 PM
to storm...@googlegroups.com
There are lots of ways to do this. You can store the tuple in an instance variable and process it on a later execute call. Or you can store it in an instance variable and have another thread process it. It's completely up to you. The only thing Storm asks of you is that you ack every tuple the bolt receives.
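
A minimal sketch of the first option (hold tuples in an instance variable and handle them on a later execute call). It deliberately avoids Storm imports so it runs on its own: `DeferringBolt`, `batchSize`, `pending()`, and the `acked` list are hypothetical stand-ins, with `acked.add(t)` marking the place where a real bolt would call the collector's ack. The "condition" here is simply that enough tuples have arrived.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Stand-in for a Storm bolt (hypothetical names, no Storm dependency).
// T plays the role of a Storm Tuple.
class DeferringBolt<T> {
    private final int batchSize;
    private final Deque<T> held = new ArrayDeque<>(); // the "instance variable"
    final List<T> acked = new ArrayList<>();          // records what was acked

    DeferringBolt(int batchSize) {
        this.batchSize = batchSize;
    }

    // Called once per incoming tuple, in the spirit of IBolt.execute.
    void execute(T tuple) {
        held.add(tuple);
        if (held.size() < batchSize) {
            return; // condition not met yet: keep the tuple, do not ack it
        }
        // Condition met: process and ack every tuple held so far.
        while (!held.isEmpty()) {
            T t = held.poll();
            process(t);
            acked.add(t); // a real bolt would ack the tuple on its collector here
        }
    }

    private void process(T tuple) {
        // application logic would go here
    }

    // Number of tuples received but not yet processed and acked.
    int pending() {
        return held.size();
    }
}
```

Every tuple that enters `held` is eventually acked, which is the one requirement stated above. Tuples that sit in `held` longer than the topology's message timeout would be replayed by the spout, so the holding condition needs an upper bound in practice.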
--
Twitter: @nathanmarz
http://nathanmarz.com

Jim Alateras

Mar 8, 2012, 4:03:59 AM
to storm...@googlegroups.com
Nathan,

Is there a mechanism to schedule when a tuple should be emitted? For instance, say I want to emit a tuple in 10 seconds or 10 minutes and still maintain guaranteed-delivery semantics. I guess I am after something with the same functionality as Java's ScheduledExecutorService (http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ScheduledExecutorService.html).
Is this out of scope for storm?

Tom Jack

Mar 8, 2012, 10:23:46 PM
to storm...@googlegroups.com
I need something like this as well. If you want to maintain the exact same delivery guarantee semantics, your spout timeouts will need to be larger than the maximum delay.

If you want long delays and short timeouts, you might just have the bolt ack the incoming tuple as soon as it has registered a delayed emit in some reliable store. Then, add another spout that emits these tuples from the store (after the delay). You might subscribe to this delay spout from the delay bolt. This gives different delivery guarantee semantics, but it works for some use cases.

My current plan (unimplemented so far) is to use special Kestrel queues with no consumers and expireToQueue set to queues consumed by spouts. The bolt can pick the expiration time when it enqueues the item into a delay queue. Since we only have at-least-once guarantees in the first tuple tree, we might enqueue the same item multiple times into the delay queue, so this could get tricky. But I think it should work.
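
A rough sketch of the delay-store idea with the duplicate-enqueue concern handled by keying on a message ID. This is an in-memory stand-in only, not Kestrel or SQS: `DelayStore`, `register`, and `pollDue` are hypothetical names, and the sketch assumes each item carries a stable ID that survives replays.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a reliable delay store (hypothetical; a real one
// would be backed by a queue or database that survives restarts).
class DelayStore {
    private final Map<String, Long> dueAtMillis = new HashMap<>();

    // Register a delayed emit. Returns false if this id is already pending,
    // so a replayed tuple does not schedule a second delayed emit.
    boolean register(String messageId, long nowMillis, long delayMillis) {
        if (dueAtMillis.containsKey(messageId)) {
            return false;
        }
        dueAtMillis.put(messageId, nowMillis + delayMillis);
        return true;
    }

    // Remove and return the ids whose delay has elapsed.
    // The delay spout would emit one tuple per returned id.
    List<String> pollDue(long nowMillis) {
        List<String> due = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = dueAtMillis.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                due.add(entry.getKey());
                it.remove();
            }
        }
        return due;
    }
}
```

The bolt would ack the incoming tuple only after `register` returns, whether it returns true or false. Note that this sketch only suppresses duplicates while the item is still pending; a replay that arrives after the item has been polled would be scheduled again, so downstream processing still has to tolerate at-least-once delivery.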

Another option if you're on AWS could be SQS, using message timers (max 15min) and/or visibility timeouts (max 12hr). It should be straightforward to extend storm-contrib-sqs to use these once you decide on the exact semantics you want.

Nathan Marz

Mar 9, 2012, 4:40:18 AM
to storm...@googlegroups.com
You don't need this feature "provided by Storm". You can spawn threads and emit tuples from there if you want. Just remember that when you delay processing a tuple from a bolt, you might need to increase TOPOLOGY_MESSAGE_TIMEOUT_SECS.

With spouts, nextTuple is called in a loop. To do the timing behavior, the cleanest approach is to have nextTuple do nothing until enough time has passed, upon which you emit your tuple. This way, your spout implementation still respects TOPOLOGY_MAX_SPOUT_PENDING.
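
A minimal sketch of that spout-side timing, again without Storm imports so it runs standalone. `DelayedSpout`, `Scheduled`, `schedule`, and the `emitted` list are hypothetical; `emitted.add(...)` marks where a real spout would emit on its collector. The real nextTuple takes no arguments, so the `nowMillis` parameter is an assumption made here to keep the example deterministic (a real spout would read the clock itself).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Stand-in for a Storm spout (hypothetical names, no Storm dependency).
class DelayedSpout {
    // One scheduled message: a payload plus the earliest time it may be emitted.
    static final class Scheduled implements Comparable<Scheduled> {
        final String payload;
        final long dueAtMillis;

        Scheduled(String payload, long dueAtMillis) {
            this.payload = payload;
            this.dueAtMillis = dueAtMillis;
        }

        @Override
        public int compareTo(Scheduled other) {
            return Long.compare(dueAtMillis, other.dueAtMillis);
        }
    }

    // Ordered by due time, so the head is always the next message to emit.
    private final PriorityQueue<Scheduled> queue = new PriorityQueue<>();
    final List<String> emitted = new ArrayList<>(); // records what was emitted

    void schedule(String payload, long dueAtMillis) {
        queue.add(new Scheduled(payload, dueAtMillis));
    }

    // In the spirit of ISpout.nextTuple: returns immediately when nothing is
    // due, and emits at most one tuple per call otherwise.
    void nextTuple(long nowMillis) {
        Scheduled head = queue.peek();
        if (head == null || head.dueAtMillis > nowMillis) {
            return; // not enough time has passed: do nothing
        }
        queue.poll();
        emitted.add(head.payload); // a real spout would emit on its collector here
    }
}
```

Because emission only happens inside `nextTuple`, the spout stays under Storm's control, which is the property the paragraph above relies on for TOPOLOGY_MAX_SPOUT_PENDING.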

Ted Dunning

Mar 9, 2012, 1:55:40 PM
to storm...@googlegroups.com
The real problem with this delayed tuple idea is that Storm does real(-ish)-time computing and what you are effectively asking for is anti-real-time.

I think that the idea about queuing into an external resource is a good one.  You can ack the tuple once it is confirmed to be in the messaging system.  Then after the delay, it will re-enter the Storm topology (or a different topology) and processing will continue.  This is much better than messing about with Storm's guarantees.

Jim Alateras

Mar 13, 2012, 4:50:27 AM
to storm...@googlegroups.com
Makes sense to me.

Michael Rose

May 17, 2013, 7:01:17 PM
to storm...@googlegroups.com
We tend to use AMQP dead-letter queues and SQS for delays.

The workflow looks like this:

Spout pops message

if time is within an epsilon of the desired processing time, emit the tuple
else requeue with delay.

With SQS, you can have a maximum delay of 15 minutes, so it's important to have logic that can requeue a message if its processing time hasn't arrived yet. This is especially useful for rate-limiting scenarios without a bunch of spin-waiting. A 40-minute delay is two 15-minute delays followed by a 10-minute delay.
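
The requeue arithmetic can be sketched as follows. This is plain Java with no SQS client involved; `DelayChunks`, `nextDelaySeconds`, and `hops` are hypothetical helpers, and the 15-minute cap is taken from the figure quoted above. The epsilon check from the workflow is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for splitting a long delay into capped requeue hops.
class DelayChunks {
    // The 15-minute maximum per-message delay cited above, in seconds.
    static final long MAX_DELAY_SECONDS = 15 * 60;

    // Delay to use for the next requeue: the remaining wait, capped at the
    // maximum. A result of 0 means the message is due and should be emitted.
    static long nextDelaySeconds(long nowSeconds, long dueAtSeconds) {
        long remaining = dueAtSeconds - nowSeconds;
        if (remaining <= 0) {
            return 0;
        }
        return Math.min(remaining, MAX_DELAY_SECONDS);
    }

    // Full sequence of requeue delays needed to cover a total delay,
    // assuming each hop is picked up exactly when its delay expires.
    static List<Long> hops(long totalDelaySeconds) {
        List<Long> result = new ArrayList<>();
        long now = 0;
        long delay;
        while ((delay = nextDelaySeconds(now, totalDelaySeconds)) > 0) {
            result.add(delay);
            now += delay;
        }
        return result;
    }
}
```

For the 40-minute case, `DelayChunks.hops(40 * 60)` yields 900, 900, and 600 seconds. In the spout, only `nextDelaySeconds` would be needed: each time a message is popped, the spout either emits it (result 0) or requeues it with the returned delay.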

-- 
Michael Rose (@Xorlev)
Senior Platform Engineer, FullContact
mic...@fullcontact.com

On Friday, May 17, 2013 at 9:51 AM, David James wrote:

I'm rediscovering this thread later, with a similar question.

Re: "The real problem with this delayed tuple idea is that Storm does real(-ish)-time computing and what you are effectively asking for is anti-real-time."

Not true. Asking for delayed message processing is not against the spirit of Storm. Scheduling processing for some point in the future, to be executed at or near that time in the future, is a reasonable and common use of a real-time system. If you want an example, take a look at any alarm clock. (Batch systems may be good for assembling alarm clocks but I don't want my alarm clock itself to be a batch system.)
