Question on consuming queues

Alexandru Nedelcu

May 16, 2016, 8:17:22 AM
to mechanica...@googlegroups.com

Hi all,

I found the alternative queues JCTools defines for various producer-consumer scenarios; they seem awesome, except that I have an implementation problem.

  1. I have producer(s) that need to push events on a queue
  2. I have a single consumer dequeueing from it with a consumer-specific loop

My problem is that I need to start that consumer loop, and preferably stop it whenever there are no more elements. But unfortunately this means the producers end up contending with the consumer on some sort of AtomicInteger that keeps track of the queue's current size, by doing something like this:

// on the producer side
queue.offer(event);
if (eventsToProcess.getAndIncrement() == 0)
  threadPool.submit(startConsumerRunLoop());

// ... later, in the consumer loop ...
next = queue.poll();

if (next == null) {
  // AtomicInteger has no decrement(int): subtract what we processed,
  // and only stop if no producer raced in meanwhile
  if (eventsToProcess.addAndGet(-processedCount) == 0)
    continueLoop = false;
  else
    processedCount = 0;
}
else {
  processedCount += 1;
  // ...
}

Otherwise I can see no way around this, except for keeping a single loop active and continuously polling once every X milliseconds. But polling seems wasteful when there are many such producer-consumer pipelines in the same process, as that means having multiple loops actively polling producers that might not be that active. And if I set the polling period to something very relaxed, like once every 200 milliseconds, that adds serious latency for no reason when events do happen. I could introduce a mixture based on wait and notify, but I can't see how I could build something cheaper than synchronizing on an AtomicInteger as described above.

So my actual question: besides keeping track of the queue size with an AtomicInteger and starting a run-loop when that’s zero and besides dumb polling when the queue is empty, do I have any other choices?

I’m asking this, because on one hand I’m excited about using something better than j.u.c.ConcurrentLinkedQueue, but on the other hand the advantages of a lock-less or even wait-free queue implementation seem in principle to not be a great win if I have to contend on an AtomicInteger just for starting that consumption loop.
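Spelled out as a self-contained class, the pattern I mean looks roughly like this (WakeUpPipeline, push and drainAll are made-up names, and the real queue would be a JCTools one rather than ConcurrentLinkedQueue):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class WakeUpPipeline {
    private final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger eventsToProcess = new AtomicInteger();
    private int processed; // consumer-side state

    /** Producer side: returns true when a consumer run-loop must be started. */
    public boolean push(int event) {
        queue.offer(event);
        return eventsToProcess.getAndIncrement() == 0;
    }

    /** Consumer side: drains until the counter confirms the queue is empty. */
    public int drainAll() {
        int processedCount = 0;
        while (true) {
            Integer next = queue.poll();
            if (next == null) {
                // release our claim; if producers raced in, keep draining
                if (eventsToProcess.addAndGet(-processedCount) == 0) break;
                processedCount = 0;
            } else {
                processedCount += 1;
                processed += 1;
            }
        }
        return processed;
    }
}
```

The contention I worry about is that every push hits `eventsToProcess`, even when the consumer is already running.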


Alexandru Nedelcu
alexn.org

Francesco Nigro

May 16, 2016, 8:34:36 AM
to mechanical-sympathy
Hi,

if you are not bound to the Queue interface's methods, you could try the "drain" methods exposed by:

https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MessagePassingQueue.java

They enable you to build custom wait/exit strategies that, depending on how you use the different queues, waste less CPU time than continuously calling queue.size() (which AFAIK is not consumer-specific but callable by any thread).
I hope PAUSE will be available in Java some day: http://www.1024cores.net/home/lock-free-algorithms/tricks/spinning
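The shape of such a loop is something like this (local stand-in interfaces here; the real WaitStrategy/ExitCondition live in org.jctools.queues.MessagePassingQueue):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

public class DrainLoop {
    // paraphrases of the JCTools callback interfaces
    public interface WaitStrategy { int idle(int idleCounter); }
    public interface ExitCondition { boolean keepRunning(); }

    /** Consumer run-loop in the style of MessagePassingQueue.drain(c, wait, exit). */
    public static <T> void drain(Queue<T> q, Consumer<T> onEvent,
                                 WaitStrategy wait, ExitCondition exit) {
        int idleCounter = 0;
        while (exit.keepRunning()) {
            T e = q.poll();
            if (e == null) {
                idleCounter = wait.idle(idleCounter); // back off instead of spinning hot
            } else {
                idleCounter = 0;
                onEvent.accept(e);
            }
        }
    }
}
```

The wait strategy decides how to spend the empty polls (spin, yield, park), and the exit condition decides when the loop dies.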

Vitaly Davidovich

May 16, 2016, 8:35:05 AM
to mechanical-sympathy
Is this start/stop cycle continuous or a one time event? Your use case sounds like blocking mode is more appropriate.  Are you trying to minimize latency during the spikes when you're publishing lots of messages? Please elaborate a bit more on what you're trying to optimize for.  There are ways to implement responsive backoff on the consumer if you're keen on polling, but I'm unclear on why you prefer polling given what you've described thus far.


Richard Warburton

May 16, 2016, 9:12:19 AM
to mechanica...@googlegroups.com
Hi,

So what a lot of systems built around queue-based message passing do is have some kind of idle strategy: if you poll your queue and it is empty, you idle; once you poll an event off your queue, you reset the idle strategy.

The idle strategy can then choose to sleep with exponential backoff or whatever. This stops you burning a core wastefully polling an empty queue: as the number of consecutive empty polls grows, you sleep exponentially longer.

You still have the choice to burn a core and busy-poll, of course, or potentially use the pause operation if it makes it into a future Java version. This is an appropriate strategy if you aren't resource-starved and want low-latency event handoffs.

For a concrete code example see the IdleStrategy interface in Agrona and its use in Aeron.
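A stripped-down sketch of the exponential-backoff idea (the real org.agrona.concurrent.BackoffIdleStrategy also spins and yields before it starts parking):

```java
import java.util.concurrent.locks.LockSupport;

public class ExponentialIdle {
    private final long minParkNanos;
    private final long maxParkNanos;
    private long parkNanos;

    public ExponentialIdle(long minParkNanos, long maxParkNanos) {
        this.minParkNanos = minParkNanos;
        this.maxParkNanos = maxParkNanos;
        this.parkNanos = minParkNanos;
    }

    /** Called after an empty poll: park, then double the next park period. */
    public void idle() {
        LockSupport.parkNanos(parkNanos);
        parkNanos = Math.min(maxParkNanos, parkNanos * 2);
    }

    /** Called after a successful poll: reset the backoff. */
    public void reset() {
        parkNanos = minParkNanos;
    }

    long currentParkNanos() { return parkNanos; }
}
```

The consumer loop calls idle() on every empty poll and reset() on every event, so a busy queue stays responsive while an idle one costs almost nothing.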

regards,

  Richard Warburton

Francesco Nigro

May 16, 2016, 9:37:41 AM
to mechanical-sympathy
Right. Anyway, size() is implemented with monitoring in mind; a consumer-specific method (given the consumer is single) is preferable...

Benjamin Manes

May 16, 2016, 1:03:08 PM
to mechanical-sympathy
Another scheduling approach is to use activity to trigger a drain. This is useful when promptness only matters for the internal structures and not for an external agent. You can model the scheduling as a simple state machine.

The simplest form of this is a Semaphore(1) with a tryAcquire() after a producer adds an element. When the permit is acquired, the drain task is submitted to a thread pool and ownership of the permit is transferred to it. You can build a more complex state machine (such as: idle, required, processing-to-idle, processing-to-required) and still use a tryLock approach as an extra guard. If the queues fill up, you have to decide whether to be lossy or use an idling strategy to wait until the consumer catches up. With this strategy you won't see contention due to CAS storms, and you can optionally amortize the consumer's work across the producers.
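Sketched minimally (ActivityDrain and its method names are illustrative, not an existing API):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

public class ActivityDrain<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final Semaphore drainPermit = new Semaphore(1);
    private final Executor pool;
    private final Consumer<T> onEvent;

    public ActivityDrain(Executor pool, Consumer<T> onEvent) {
        this.pool = pool;
        this.onEvent = onEvent;
    }

    /** Producer side: add the element, then schedule a drain if none is running. */
    public void submit(T event) {
        queue.offer(event);
        if (drainPermit.tryAcquire()) {
            pool.execute(this::drain); // ownership of the permit transfers to the task
        }
    }

    private void drain() {
        try {
            for (T e; (e = queue.poll()) != null; ) {
                onEvent.accept(e);
            }
        } finally {
            drainPermit.release();
        }
        // re-check: a producer may have enqueued between our last poll and release
        if (!queue.isEmpty() && drainPermit.tryAcquire()) {
            pool.execute(this::drain);
        }
    }
}
```

Producers that lose the tryAcquire race just enqueue and move on, so the permit never becomes a contention hot spot the way a shared counter does.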

ymo

May 16, 2016, 7:15:25 PM
to mechanical-sympathy
I think this is the whole point of being wait/lock-free, no??? Or rather, you should not use it if it does not help )))

When you have a data structure that is wait-free, you are free to:
1) go do other (janitorial) things when the resource is busy, instead of spinning cycles waiting for it to be released
2) implement a spin-wait loop for that resource to be free again, if latency is important to you
3) go to sleep and lose on the latency side (when you need to come out of sleep) but possibly gain on the throughput side of things
4) implement a more "involved" and "mixed" backoff strategy when your resource is busy, for example mixing yields, spins, or even going to sleep!

The point here is that it is up to you: the data structure does not decide the wait/backoff strategy upfront for you! At the beginning of time we had kernel arbitration, which was the *only* way of sharing resources. What the kernel did was just put things in a "queue" and decide who got to use the shared resources, but all the threads were forced to "ask" the kernel if the resource was free, with the added risk of being blocked. At the very least you could ask the kernel to just put you to sleep and wake you up when something was ready. Of course this is still available today. With the newer lock-free queues we are able to mix and match (based on latency or throughput requirements) the solution *when* and *where* it makes sense.

p.s.
There is no contention on an AtomicInteger in all lock-free queues per se. That is specific to your implementation and not necessarily true (luckily) for all lock-free queues out there! How you wait is left as an added benefit/exercise to you!

Regards

Francesco Nigro

May 17, 2016, 5:56:13 AM
to mechanical-sympathy
An improved queue interface (MessagePassingQueue is a good step in the right direction IMHO) could help make better use of the different flavours of queues (lock-free/wait-free/blocking).
Being only a queue is not enough :P

ymo

May 17, 2016, 9:22:20 AM
to mechanical-sympathy
The closest I have seen is the Disruptor, where you can pass your own wait/backoff/blocking strategy to the queue. But not all queues are created the same.. for sure )))

Francesco Nigro

May 17, 2016, 9:31:37 AM
to mechanical-sympathy
Yep, the Disruptor has a very informative and pleasant API, but as others on this group have pointed out, the Disruptor is a "different beast"... anyway, I would love to see other queues follow the same philosophy (like JCTools).
But if you want to be pretty, you have to pay for it :D : adding so many abstraction layers (DSL + ring buffer + sequencer...) benefits users and maintenance, but not performance IMHO...

Sergey Zubarev

May 24, 2016, 11:25:45 AM
to mechanica...@googlegroups.com
Hi, Alexandru!

If it still matters...
There is a variant of a Multiple-Producers-Single-Consumer queue with an empty-state,
so you do not need an additional AtomicInteger to monitor the queue size.

The put() method returns 'true' if the queue was empty when the item was inserted.
The consumer side is a bit tricky, but still not that complex.
So the usage pattern can look like:

// producer side
if (queue.put(item))
    threadPool.submit(startConsumerRunLoop());

// consumer loop
item = queue.get();
do {
    process_item(item);
    item = queue.get_next();
} while (item != null);

The source code for a variant with intrusive items, wait-free producers and lock-free consumers
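To illustrate the empty-state idea, here is a simplified non-intrusive sketch (not the linked implementation): producers push onto an atomically swapped linked stack, so put() learns for free whether the queue was empty. Note it reports "empty" based on the shared head only, so drain tasks should run on a single-threaded executor to keep the single-consumer guarantee.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicReference;

public class EmptyStateMpsc<T> {
    private static final class Node<T> {
        final T value; final Node<T> next;
        Node(T value, Node<T> next) { this.value = value; this.next = next; }
    }

    private final AtomicReference<Node<T>> head = new AtomicReference<>();
    private final Deque<T> batch = new ArrayDeque<>(); // consumer-local, FIFO after reversal

    /** Producer: lock-free push; returns true iff the shared queue was empty. */
    public boolean put(T value) {
        Node<T> h;
        do {
            h = head.get();
        } while (!head.compareAndSet(h, new Node<>(value, h)));
        return h == null;
    }

    /** Single consumer: next item, or null when nothing is left. */
    public T get() {
        if (batch.isEmpty()) {
            // grab everything pushed so far and restore FIFO order
            for (Node<T> n = head.getAndSet(null); n != null; n = n.next) {
                batch.addFirst(n.value);
            }
        }
        return batch.pollFirst();
    }
}
```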

Sergey.

Valeriy Ovchinnikov (Овчинников Валерий Алексеевич)

Jun 9, 2016, 8:58:52 AM
to mechanical-sympathy
Hi Alexandru,

If I get you right, you need a wait/notify mechanism but without locking.
Then I guess you need the LockSupport#park method family.
I don't know if there exist queues implemented that way, but I'm almost sure they do.
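The core of such a mechanism looks roughly like this (ParkSignal, awaitWork and signal are made-up names; the ordering relies on the consumer registering itself before re-checking the condition, and the producer publishing before signalling):

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

public class ParkSignal {
    private volatile Thread waiter;

    /** Consumer: register ourselves, re-check the condition, then park. */
    public void awaitWork(BooleanSupplier hasWork) {
        waiter = Thread.currentThread();
        try {
            while (!hasWork.getAsBoolean()) {
                LockSupport.park(this); // returns on unpark or spuriously; loop re-checks
            }
        } finally {
            waiter = null;
        }
    }

    /** Producer: wake the consumer after publishing an item. */
    public void signal() {
        Thread t = waiter;
        if (t != null) {
            LockSupport.unpark(t); // the permit persists even if the consumer has not parked yet
        }
    }
}
```

Because unpark grants a permit that a later park consumes immediately, the lost-wakeup race of naive wait/notify does not apply here.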


On Monday, May 16, 2016 at 15:17:22 UTC+3, Alexandru Nedelcu wrote: