Limit messages in MemoryChannel (Non-blocking)

David Allsopp

Oct 18, 2011, 4:31:58 AM
to jetlang-dev
All,

Following on from the thread "Limit messages in MemoryChannel"...

I have a use case where I don't want the producer to ever block, but I
know that some consumers may not always be able to keep up (or may
even stop responding - they may be using 3rd party code so I don't
'trust' their performance).

I would like finite (capped) queues in front of the consumers so that
if one or more of them fails to keep up, or even stops entirely, its
queue won't grow indefinitely (no OOM), the others will keep going,
and the producer won't be blocked. I can tolerate dropped messages to
the slow consumer, but the fast consumers should still receive theirs.

However, I would like the producer to become aware that there is a
problem so that it can modify its output and/or inform the user.

My solution so far is to modify CappedBlockingQueue so that it throws
a RuntimeException when full, rather than blocking. However, this
doesn't work well if there is more than one subscriber with such a
queue - the MemoryChannel, via its SubscriberList, will try to publish
to each subscriber in turn, so any exception from a subscriber may
prevent other subscribers from receiving the message. In other words,
partial failure becomes total failure.

So the question, in a nutshell, is how to do non-blocking pub-sub
that tolerates (and reports) partial failure, with capped queues on
unreliable consumers... I guess it would need a modified
SubscriberList.executeAll() that tries each subscriber in turn,
catching any exceptions, then finally throws a 'summary' exception if
any individual subscriber failed.
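
The "try every subscriber, then throw a summary" idea could look roughly like the sketch below. It is plain Java, not Jetlang code: TolerantSubscriberList and PublishFailedException are hypothetical names, and subscribers are modelled as java.util.function.Consumer rather than Jetlang's own subscriber types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for a channel's subscriber list; not part of Jetlang. */
final class TolerantSubscriberList<T> {

    /** Summary exception; the individual failures are attached as suppressed exceptions. */
    static final class PublishFailedException extends RuntimeException {
        final int failedCount;

        PublishFailedException(int failedCount, int total) {
            super(failedCount + " of " + total + " subscribers rejected the message");
            this.failedCount = failedCount;
        }
    }

    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    /** Offers msg to every subscriber, even if some of them throw. */
    void publish(T msg) {
        List<RuntimeException> failures = new ArrayList<>();
        int total = 0;
        for (Consumer<T> subscriber : subscribers) {
            total++;
            try {
                subscriber.accept(msg);
            } catch (RuntimeException e) {
                // Remember the failure, but keep delivering to the rest.
                failures.add(e);
            }
        }
        if (!failures.isEmpty()) {
            PublishFailedException summary =
                    new PublishFailedException(failures.size(), total);
            failures.forEach(summary::addSuppressed);
            throw summary;
        }
    }
}
```

The producer would wrap publish() in a try/catch and inspect getSuppressed() to see which subscribers rejected the message; the healthy subscribers have already received it by the time the summary is thrown.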

Would be grateful for any thoughts/alternative approaches...

David.

Mike Rettig

Oct 29, 2011, 12:21:12 PM
to jetla...@googlegroups.com
The CappedBlockingQueue could be modified to call an error handler in
the event of a dropped message. That might not be ideal because it
doesn't synchronously report back to the producer that there is a
problem. The publisher could check the error handler after each
publish. Not very elegant, but it would work.
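
As a rough illustration of the error-handler variant, here is a sketch of a capped queue that drops instead of blocking. It does not modify Jetlang's CappedBlockingQueue; DroppingQueue, its onDrop callback and droppedCount() are hypothetical names, built on java.util.concurrent.ArrayBlockingQueue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical non-blocking capped queue; not Jetlang's CappedBlockingQueue. */
final class DroppingQueue<T> {
    private final BlockingQueue<T> queue;
    private final Consumer<T> onDrop;
    private final AtomicLong dropped = new AtomicLong();

    DroppingQueue(int capacity, Consumer<T> onDrop) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.onDrop = onDrop;
    }

    /** Never blocks: when the queue is full the message is dropped and reported. */
    boolean put(T msg) {
        if (queue.offer(msg)) {
            return true;
        }
        dropped.incrementAndGet();
        onDrop.accept(msg);
        return false;
    }

    /** Consumer side: returns the next message, or null if the queue is empty. */
    T poll() {
        return queue.poll();
    }

    /** The publisher can poll this after each publish to detect a slow consumer. */
    long droppedCount() {
        return dropped.get();
    }
}
```

Because put() swallows the overflow rather than throwing, a full queue on one consumer cannot interrupt delivery to the others; the cost is that the producer only learns of the problem when it checks droppedCount() or when the handler runs.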

Another option would be to introduce another Fiber that simply pipes
the messages to the potentially slow consumer. In the event of a
runtime exception, it could publish this back to the producer. The
producer wouldn't get synchronous feedback, but it would be informed
soon enough. Most slow consumer problems can be solved this way.

The channel could also be dropped in favor of directly pushing
Runnables into the Fiber queues. This would allow the publisher to
know if any consumer is not keeping up and still publish to all
consumers. This might be the best option. As you already observed,
you could modify the channel implementation or provide an entirely
different Channel interface that allows the publisher to be made aware
of publish failures.

I think I would go with a new channel interface that provides feedback
to the publisher.

Runnable onPublishFailure = ...; // action to take on publish failure
MyChannel channel = ...;         // custom channel
MyMsg msg = ...;                 // message to publish

channel.publish(msg, onPublishFailure);
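
One possible shape for such a channel is sketched below. Everything here is an assumption for illustration: FeedbackChannel is a hypothetical class, each consumer is represented by a bounded java.util.concurrent queue that it drains on its own thread, and none of it is existing Jetlang API.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical channel that reports publish failures to the caller; not Jetlang API. */
final class FeedbackChannel<T> {
    private final List<BlockingQueue<T>> queues = new CopyOnWriteArrayList<>();

    /** Registers a consumer and returns the bounded queue it should drain. */
    BlockingQueue<T> subscribe(int capacity) {
        BlockingQueue<T> queue = new ArrayBlockingQueue<>(capacity);
        queues.add(queue);
        return queue;
    }

    /**
     * Offers msg to every consumer queue without blocking. If at least one
     * queue was full, onPublishFailure runs once on the publishing thread.
     */
    boolean publish(T msg, Runnable onPublishFailure) {
        boolean allAccepted = true;
        for (BlockingQueue<T> queue : queues) {
            if (!queue.offer(msg)) {
                allAccepted = false; // slow consumer: drop for this queue only
            }
        }
        if (!allAccepted) {
            onPublishFailure.run();
        }
        return allAccepted;
    }
}
```

With this shape the producer gets synchronous feedback on every publish, fast consumers still receive each message, and a stalled consumer costs at most its queue capacity in memory.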

Hope that helps,

Mike

David Allsopp

Oct 31, 2011, 11:03:08 AM
to jetlang-dev
Mike,

Thanks very much, very helpful - that gives me several options to
try!

Regards,

David.