Limit messages in MemoryChannel


Abraham Menacherry

Mar 25, 2011, 3:21:10 AM
to jetlang-dev
Is there a well-defined way in Jetlang to limit the number of messages
that can be sent on a particular channel?

I have an actor which is subscribed to a MemoryChannel. There is a
possibility that it can receive more requests/sec than it can process,
so eventually this could lead to an OutOfMemory exception.

Thanks and Regards,
Abraham Menacherry.

peter royal

Mar 25, 2011, 9:56:19 PM
to jetla...@googlegroups.com
On Mar 25, 2011, at 3:21 AM, Abraham Menacherry wrote:

> Is there a well-defined way in Jetlang to limit the number of messages
> that can be sent on a particular channel?
>
> I have an actor which is subscribed to a MemoryChannel. There is a
> possibility that it can receive more requests/sec than it can process
> so eventually this could lead to an OutOfMemory exception.

Create a Subscribable that implements your policy for what to do when your limit is reached.

The default ChannelSubscription is pretty dumb in that it just delivers everything to the subscriber on its fiber.
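
To make that concrete, here is a rough sketch of one such policy: a drop-on-overflow subscription. The class name is hypothetical and the Subscribable/DisposingExecutor shapes are inferred from the code later in this thread (getQueue() plus a Callback-style onMessage, and an executor that accepts Runnables like a Fiber does), so adjust it to whatever your jetlang version actually exposes:

<pre>
import java.util.concurrent.atomic.AtomicInteger;

import org.jetlang.channels.Subscribable;
import org.jetlang.core.Callback;
import org.jetlang.core.DisposingExecutor;

// Hypothetical sketch: enforce a bound on in-flight messages and drop
// anything over the limit instead of queuing it on the consumer's fiber.
public class DroppingSubscription<T> implements Subscribable<T> {
    private final DisposingExecutor fiber;
    private final Callback<T> target;
    private final AtomicInteger pending = new AtomicInteger();
    private final int limit;

    public DroppingSubscription(DisposingExecutor fiber, Callback<T> target, int limit) {
        this.fiber = fiber;
        this.target = target;
        this.limit = limit;
    }

    public DisposingExecutor getQueue() {
        return fiber;
    }

    // Called on the producer thread; the overflow policy lives here.
    public void onMessage(final T msg) {
        if (pending.incrementAndGet() > limit) {
            pending.decrementAndGet();
            return; // drop policy: discard while the consumer is saturated
        }
        // Assumes the executor accepts Runnables, as a Fiber does.
        fiber.execute(new Runnable() {
            public void run() {
                try {
                    target.onMessage(msg);
                } finally {
                    pending.decrementAndGet();
                }
            }
        });
    }
}
</pre>

Any other policy (block, log, overwrite oldest) would slot into the same onMessage hook.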

-pete

--
(peter.royal|osi)@pobox.com - http://fotap.org/~osi

Abraham Menacherry

Mar 26, 2011, 9:31:02 AM
to jetlang-dev
Hi Pete,

Thanks for the quick response.

I am still in a little bit of doubt here. I see a size() method on
the subscribable, but it returns the number of subscribers, not the
number of queued messages that I want to throttle.

I tried it another way; is this correct, or am I way off on a tangent?
Note: the only method I modified is publish(), and I am using an
ArrayBlockingQueue inside the limiting MemoryChannel implementation.

<pre>
public class MessageLimitingChannel<T> implements Channel<T>
{
    private final SubscriberList<T> subscribers;
    private final ArrayBlockingQueue<T> limitQueue;
    private final int LIMIT;

    public MessageLimitingChannel(SubscriberList<T> subscribers, int limit)
    {
        this.subscribers = subscribers;
        this.LIMIT = limit;
        this.limitQueue = new ArrayBlockingQueue<T>(limit, true);
    }

    public int subscriberCount() {
        return subscribers.size();
    }

    public void publish(T s) {
        try
        {
            // Intent: block the producer until the bounded queue has space,
            // then hand one message on to the subscribers.
            limitQueue.put(s);
            T msg = limitQueue.poll();
            subscribers.publish(msg);
        }
        catch (InterruptedException e)
        {
            throw new RuntimeException(e);
        }
    }

    public Disposable subscribe(DisposingExecutor queue, Callback<T> onReceive) {
        ChannelSubscription<T> subber = new ChannelSubscription<T>(queue, onReceive);
        return subscribe(subber);
    }

    public Disposable subscribe(Subscribable<T> sub) {
        return subscribeOnProducerThread(sub.getQueue(), sub);
    }

    public Disposable subscribeOnProducerThread(final DisposingExecutor queue,
            final Callback<T> callbackOnQueue) {
        Disposable unSub = new Disposable() {
            public void dispose() {
                remove(callbackOnQueue);
                queue.remove(this);
            }
        };
        queue.add(unSub);
        // finally add subscription to start receiving events.
        subscribers.add(callbackOnQueue);
        return unSub;
    }

    private void remove(Callback<T> callbackOnQueue) {
        subscribers.remove(callbackOnQueue);
    }

    public void clearSubscribers() {
        subscribers.clear();
    }

    public int getLIMIT()
    {
        return LIMIT;
    }
}
</pre>

-Abraham

Mike Rettig

Mar 26, 2011, 12:19:00 PM
to jetla...@googlegroups.com
Abraham,

There are a couple of solutions, depending on your desired behavior.

1. Batching - Most of the time when I've run into this problem I've
solved it by using a batch subscriber or a keyed batch. This allows
the consumer to keep up with the producer. I typically don't want to
slow down the producer for any reason.

2. Producer Throttle - If you must slow down the producer, the
simplest solution is to limit by message rate. This will cause the
producer to block. In many cases this isn't optimal because the
producer doesn't know the capacity of the consumer; it is simple, but
it doesn't give optimal throughput. The code you submitted won't work
for throttling since the same thread puts and polls from the queue, so
another thread would be needed to pull from the queue. A better
solution would calculate the messages per second and then wait if the
rate is exceeded. I would implement this as a simple Publisher
decorator (see the sketch after this list).

http://code.google.com/p/jetlang/source/browse/trunk/src/main/java/org/jetlang/channels/Publisher.java

3. Fiber queue cap - For optimal throughput, the best solution is to
cap the internal fiber queue. With this solution, the producer is
blocked when the consumer's queue reaches capacity. The producer will
immediately be signaled when space frees up. This requires a few
changes to the internals of jetlang.
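
For illustration, a rough sketch of the kind of Publisher decorator meant in option 2. This is only a sketch, not something that ships with jetlang; the one jetlang type it relies on is the Publisher interface linked above, which a MemoryChannel also satisfies:

<pre>
import org.jetlang.channels.Publisher;

// Sketch of a rate-limiting Publisher decorator: it counts messages per
// one-second window and sleeps the producer once the configured rate is hit.
public class ThrottledPublisher<T> implements Publisher<T> {
    private final Publisher<T> target;   // e.g. the underlying channel
    private final int maxPerSecond;
    private long windowStart = System.currentTimeMillis();
    private int published;

    public ThrottledPublisher(Publisher<T> target, int maxPerSecond) {
        this.target = target;
        this.maxPerSecond = maxPerSecond;
    }

    public synchronized void publish(T msg) {
        long now = System.currentTimeMillis();
        if (now - windowStart >= 1000) {      // start a new one-second window
            windowStart = now;
            published = 0;
        }
        if (published >= maxPerSecond) {      // window full: block the producer
            long sleep = 1000 - (now - windowStart);
            if (sleep > 0) {
                try {
                    Thread.sleep(sleep);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            windowStart = System.currentTimeMillis();
            published = 0;
        }
        published++;
        target.publish(msg);
    }
}
</pre>

Producers would publish through the decorator instead of the channel directly; as noted above, the fixed rate is simple but won't adapt to the consumer's actual capacity.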

Will any of the solutions work for you?

Mike


Abraham Menacherry

Mar 27, 2011, 9:52:36 AM
to jetlang-dev
Hi Mike,

Thanks for the quick reply.

I think all 3 might help me, but as you said, the 3rd may be the best
solution. Since throttling is a fairly common scenario, it would be
great to have a "ThrottledFiber" implementation in Jetlang itself. I
for one am quite ready to wait until the next release, and I am sure it
would also help others who use this API extensively.

For point 2, usually I would use a second thread as the consumer,
but my idea was: why create a new thread when "publish" itself is
asynchronous? The producer thread blocks on limitQueue.put(s) until
space is available, allowing *other* consumer threads to process
*already* queued tasks.

The following producer code would then submit work to the fiber to be
processed asynchronously once space is available. Does it make sense,
or not at all?

T msg = limitQueue.poll();
subscribers.publish(msg);


Thanks,
Abraham.

Abraham Menacherry

Mar 27, 2011, 10:05:02 AM
to jetlang-dev
Pasting the test I ran.

Even with the limit applied it ran into an out-of-memory error, but that
may be because of the incorrect implementation that you pointed out in
#2 above.
Note: this is one seriously fast library; on my laptop it ran 5 million
in ~6 seconds. Excellent API you have made!!

<pre>
@Test
public void testThrottledMemoryChannel() throws InterruptedException
{
    final String jetlang = "Hello World Jetlang";
    final int count = 50000000;
    final CountDownLatch latch = new CountDownLatch(count);
    final AtomicInteger counter = new AtomicInteger(0);
    MyAgent magent = new MyAgent(new MessageLimitingChannel<Runnable>(
            new SubscriberList<Runnable>(), 100));
    long start = System.currentTimeMillis();
    for (int i = 0; i < count; i++)
    {
        magent.send(new Runnable()
        {
            @Override
            public void run()
            {
                String[] split = jetlang.split(" ");
                for (String str : split)
                {
                }
                latch.countDown();
                counter.incrementAndGet();
            }
        });
    }
    long end = System.currentTimeMillis();
    Assert.assertEquals(true, latch.await(100, TimeUnit.SECONDS));
    System.out.println("Took " + (end - start) + " ms");
    System.out.println("Counter: " + counter);
}

public class MyAgent
{
    ExecutorService service = Executors.newFixedThreadPool(25);
    PoolFiberFactory fact = new PoolFiberFactory(service);
    final Channel<Runnable> channel;
    final Fiber fiber;
    final Callback<Runnable> callback = new Callback<Runnable>()
    {
        @Override
        public void onMessage(Runnable message)
        {
            message.run();
        }
    };

    public MyAgent(Channel<Runnable> channel)
    {
        this.channel = channel;
        this.fiber = fact.create();
        fiber.start();
        channel.subscribe(fiber, callback);
    }

    public void send(Runnable code)
    {
        channel.publish(code);
    }
}
</pre>

Thanks,
Abraham.

Mike Rettig

Mar 27, 2011, 8:46:24 PM
to jetla...@googlegroups.com
I added a capped queue.

http://code.google.com/p/jetlang/source/detail?r=523

One caveat is that the cap can cause a deadlock if the consuming
thread posts events back to its own full queue. This is a rare case
and can be fixed by using another fiber to post back events.

Let me know if this solves your problem.

Mike

Abraham Menacherry

Mar 29, 2011, 2:22:10 AM
to jetlang-dev
Mike,

Thanks for getting this out so fast!!

I tested it out and it works perfectly for me.

Can this implementation be used with FiberFactory by any chance? I
looked it up and couldn't find a way to plug it in.

Thanks,
Abraham.

Mike Rettig

Apr 3, 2011, 8:03:10 PM
to jetla...@googlegroups.com
FiberFactory will take some more work. How many threads are you using?
For most apps, using dedicated threads is a better approach.
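
For reference, "dedicated threads" here just means ThreadFiber instances instead of fibers from a pool. A minimal sketch of the two styles (standard jetlang classes; how the new queue cap is wired in is omitted, since that depends on the revision you pick up):

<pre>
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jetlang.fibers.Fiber;
import org.jetlang.fibers.PoolFiberFactory;
import org.jetlang.fibers.ThreadFiber;

public class FiberChoices {
    public static void main(String[] args) {
        // Dedicated thread: one OS thread per fiber, simplest to reason about.
        Fiber dedicated = new ThreadFiber();
        dedicated.start();

        // Pooled: many fibers share one executor, better for thousands of sessions.
        ExecutorService pool = Executors.newCachedThreadPool();
        PoolFiberFactory factory = new PoolFiberFactory(pool);
        Fiber pooled = factory.create();
        pooled.start();

        dedicated.dispose();
        pooled.dispose();
        pool.shutdown();
    }
}
</pre>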

Mike

Abraham Menacherry

Apr 5, 2011, 8:46:04 AM
to jetlang-dev
Each session in my app uses a fiber, and I would like to prevent any
one session (human/computer) from sending more data than the
server can process.

If I use a thread fiber per session, it would not scale to thousands of
sessions (I think). This is why I use the PoolFiberFactory to create
fibers for sessions, and it is the reason behind my request.

Thanks,
Abraham.

Mike Rettig

Apr 5, 2011, 11:07:24 PM
to jetla...@googlegroups.com
Yes, the pool fiber is a better solution for thousands of sessions.

I've started a new sub-project of jetlang that adds remoting to jetlang.

http://code.google.com/p/jetlang/source/browse/#svn%2Fserver

It might be of interest if you need a client server setup.

I don't know when I'll have a chance to add the queue capping to the
pool fiber since I'm actively working on the remoting implementation.

Mike

Abraham Menacherry

Apr 7, 2011, 2:30:48 AM
to jetlang-dev
I am perfectly willing to wait for it. Even though I would eventually
want to scale to thousands of sessions, it is not an immediate
requirement. Kindly add this feature request to your list.

I took a fleeting look at the remoting code; what you are doing would
be greatly beneficial to my scenario, with some integration issues
though. I already use the NIO framework Netty (which is awesomely fast,
b.t.w.) for my networking and *may* use Kryo (http://code.google.com/p/kryo/)
for object serialization.

But I guess at some point I could plug in Netty handlers for the
Jetlang session on the network side and re-use the rest of the Jetlang
goodness.

Thanks again for coming out with this gem of a library!


Abraham Menacherry

Apr 29, 2012, 8:09:48 AM
to jetla...@googlegroups.com
Mike,

This is an old feature request, but I have now published to github a game server (https://github.com/menacher/java-game-server) which uses jetlang, and I would like to have the message-limiting ability for pooled fibers as well. Is this possible?

If you check out the event-handling performance test written for the jetlang event dispatcher at https://github.com/menacher/java-game-server/blob/master/jetserver/src/test/java/org/menacheri/jetserver/app/impl/PlayerSessionTest.java you will notice that it throws an OutOfMemory exception for larger numbers, while what I would like in that kind of scenario is good "back pressure".

Thanks,
Abraham.

peter royal

May 3, 2012, 7:26:55 AM
to jetla...@googlegroups.com
Abraham -

How do you envision the API for a feature like this working? (Keeping in mind that Publisher.publish() needs to support 0..n receivers under the covers)

-pete


Abraham Menacherry

May 7, 2012, 7:06:37 AM
to jetla...@googlegroups.com
Pete,

I am not sure if I understood you correctly. Were you asking me how it could be implemented? I am definitely not sure!! A summation of all consumers' capped queue sizes >= the accepted limit? A separate entry queue before events get allocated to the individual consumer queues? I don't know whether it's feasible or not, but it would definitely be a nice feature to have, at least in my scenario.

peter royal

May 7, 2012, 7:33:40 AM
to jetla...@googlegroups.com
Not how it could be implemented exactly; more a thought exercise of, if it were implemented, what would the changes to the Publisher, Subscriber, etc. interfaces look like (or what new interfaces would be necessary)?

To the extent that API design drives implementation (and vice versa), since you have thoughts on how it would be useful in your project, it would be helpful to understand how it might look.

-pete



Abraham Menacherry

May 7, 2012, 11:19:51 AM
to jetla...@googlegroups.com
Pete,

Well, the best one would be "no API change": the producer would just block on publish until space is available. Not sure how it can be done, though.
If a change in the API is inevitable, one of the things I can think of is the disruptor pattern, where producers need to acquire a slot before publishing, and this in turn causes the back pressure. Consumers can play catch-up while the producer is waiting.
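
As a rough sketch of the "block on publish" idea (entirely hypothetical, assuming a single subscriber; it only uses the Channel, MemoryChannel, Fiber and Callback types that already appear in this thread), back pressure can be simulated with a Semaphore wrapped around an ordinary MemoryChannel:

<pre>
import java.util.concurrent.Semaphore;

import org.jetlang.channels.Channel;
import org.jetlang.channels.MemoryChannel;
import org.jetlang.core.Callback;
import org.jetlang.fibers.Fiber;

// Hypothetical sketch: publish() blocks the producer once 'capacity' messages
// are in flight; a permit is released after the subscriber's callback finishes
// on its fiber. Assumes exactly one subscriber.
public class BlockingChannel<T> {
    private final Channel<T> delegate = new MemoryChannel<T>();
    private final Semaphore slots;

    public BlockingChannel(int capacity) {
        this.slots = new Semaphore(capacity);
    }

    public void subscribe(Fiber fiber, final Callback<T> callback) {
        delegate.subscribe(fiber, new Callback<T>() {
            public void onMessage(T msg) {
                try {
                    callback.onMessage(msg);
                } finally {
                    slots.release(); // free a slot once the consumer is done
                }
            }
        });
    }

    public void publish(T msg) {
        try {
            slots.acquire(); // blocks the producer when the consumer falls behind
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        delegate.publish(msg);
    }
}
</pre>

With more than one subscriber the permit accounting gets ambiguous, which is exactly the concern raised in the next message.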

peter royal

May 12, 2012, 11:00:11 AM
to jetla...@googlegroups.com
the transparently blocking publisher implementation seems problematic to me as a default for a MemoryChannel since it could be feeding multiple subscribers. 

but in the case where you know there is only one subscriber, maybe an alternate channel implementation that did this might work? would that satisfy your use case?

-pete

-- 
peter royal - (on the go)

Abraham Menacherry

May 14, 2012, 7:15:42 AM
to jetla...@googlegroups.com
Yes, an alternate channel implementation is perfectly feasible. I use the MemoryChannel for event dispatching in a Java game server project, and this is abstracted away from the client. To see my current EventDispatcher implementation just follow this link -> https://github.com/menacher/java-game-server/blob/master/jetserver/src/main/java/org/menacheri/jetserver/event/impl/JetlangEventDispatcher.java
I can just replace the current MemoryChannel impl with the new one.