RingBufferProcessor - releasing objects held in the RingBuffer

Gesly George

Mar 7, 2016, 7:19:29 PM
to reactor-framework
Hi Stephane,
I'm trying to understand how objects held in the ring buffer get released (garbage collected) after they are consumed. Are the objects/values held by the MutableSignal in the RingBuffer eligible for garbage collection once they have been consumed, or only after MutableSignal.value is replaced by a newly published value? From the code, it looks like consumed values become eligible for GC only after a newly published value replaces the existing value in the MutableSignal. Is there a way to make consumed values eligible for GC as soon as they have been consumed? In almost all cases, I'm using a Stream wrapper around a RingBufferProcessor and attaching consumers to the stream.
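The retention described above can be modeled in plain Java. This is a minimal, hypothetical sketch, not Reactor's code: HolderSlot stands in for a reusable entry such as MutableSignal, and RetainingRingBuffer is an illustrative single-threaded ring buffer. Slots are preallocated once, and consuming only advances a sequence number, so a consumed value stays reachable from its slot until a later publish overwrites that slot.

```java
import java.util.function.Consumer;

/** Hypothetical reusable holder, standing in for an entry such as MutableSignal. */
final class HolderSlot<T> {
    T value; // stays referenced until a producer overwrites it
}

/**
 * Hypothetical single-threaded ring buffer sketch (not Reactor's implementation):
 * slots are preallocated once and the consumer only advances a sequence.
 */
final class RetainingRingBuffer<T> {
    private final HolderSlot<T>[] slots;
    private final int mask;
    private long producerSeq = 0; // next sequence to publish
    private long consumerSeq = 0; // next sequence to consume

    @SuppressWarnings("unchecked")
    RetainingRingBuffer(int sizePowerOfTwo) {
        if (Integer.bitCount(sizePowerOfTwo) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = (HolderSlot<T>[]) new HolderSlot[sizePowerOfTwo];
        for (int i = 0; i < slots.length; i++) {
            slots[i] = new HolderSlot<>(); // allocated once, reused forever
        }
        mask = sizePowerOfTwo - 1;
    }

    boolean publish(T value) {
        if (producerSeq - consumerSeq == slots.length) {
            return false; // full: the slot about to be reused has not been consumed yet
        }
        slots[(int) (producerSeq & mask)].value = value; // drops the previous reference
        producerSeq++;
        return true;
    }

    boolean consume(Consumer<? super T> consumer) {
        if (consumerSeq == producerSeq) {
            return false; // empty
        }
        consumer.accept(slots[(int) (consumerSeq & mask)].value);
        consumerSeq++; // the slot still references the value after this
        return true;
    }

    /** Inspection hook for the demo: what does slot i currently reference? */
    T peekSlot(int i) {
        return slots[i & mask].value;
    }
}
```

In this model, a value consumed during a burst stays reachable for as long as the stream is idle, and is only released when the producer laps back around to its slot.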

Could you please confirm my finding?

Thanks,
Gesly

Gesly George

Mar 7, 2016, 7:27:41 PM
to reactor-framework
Additionally, what happens to filtered objects/values?

Stephane Maldini

Mar 8, 2016, 5:31:29 AM
to Gesly George, reactor-framework
It's a good point, and something we might actually improve by RC1 (by having dispatchOn/publishOn support a plain ExecutorService, as part of the reactor-core and reactor-stream merge).
I've actually started experimenting with a naive shutdown process that marks the buffer as "completed" by wrapping the producer index ahead while keeping one consumer index behind it by the size of the buffer (which is the usual mechanism at play in a ring buffer).

In the case of a SchedulerGroup (a RingBufferProcessor shared across multiple Streams/Fluxes via dispatchOn), the problem is much less obvious, as it is with continuous streams of data, but I reckon it can lead to other issues.

Filtered values are simply dropped downstream; it's then up to the ring buffer queue to receive more.


--
Stephane Maldini | Reactor Project Lead, Spring Engineering | London | Pivotal

Gesly George

Mar 8, 2016, 11:48:53 AM
to reactor-framework, gesly....@gmail.com
Comments below.


On Tuesday, March 8, 2016 at 2:31:29 AM UTC-8, smaldini wrote:
> It's a good point, and something we might actually improve by RC1 (by having dispatchOn/publishOn support a plain ExecutorService, as part of the reactor-core and reactor-stream merge).
I don't think I understand how supporting ExecutorService would help here.
 
> I've actually started experimenting with a naive shutdown process that marks the buffer as "completed" by wrapping the producer index ahead while keeping one consumer index behind it by the size of the buffer (which is the usual mechanism at play in a ring buffer).
>
> In the case of a SchedulerGroup (a RingBufferProcessor shared across multiple Streams/Fluxes via dispatchOn), the problem is much less obvious, as it is with continuous streams of data, but I reckon it can lead to other issues.
>
> Filtered values are simply dropped downstream; it's then up to the ring buffer queue to receive more.

Yes, if it's a 100% non-stop continuous stream, the issue is not obvious at all. But consider this scenario: let's say the stream receives no new data for a couple of seconds. If all the previously consumed objects were eligible for GC, they could have been collected while the stream was inactive. Here, however, they only get collected once the stream becomes active again. In general, it's better to make objects available to the GC as soon as you are done with them. Is there some way to null out MutableSignal.value after the last consumer has consumed it?

Stephane Maldini

Mar 8, 2016, 12:28:41 PM
to Gesly George, reactor-framework

> > It's a good point, and something we might actually improve by RC1 (by having dispatchOn/publishOn support a plain ExecutorService, as part of the reactor-core and reactor-stream merge).
> I don't think I understand how supporting ExecutorService would help here.

The point is that dispatchOn/publishOn accept only one kind of input parameter: a SchedulerGroup (Dispatcher) based on an event-loop processor such as TopicProcessor (RingBufferProcessor) or WorkQueueProcessor. The event loop busy-spins with minimal latency off the ring buffer, which is in effect consumed as a queue, but its slots are not dereferenced (nulled out) because doing so might introduce memory barriers and performance issues.

By supporting an ExecutorService directly, such as a ForkJoinPool or a plain thread-pool ExecutorService, we would rely partially on its queue, which works as you expect: it dereferences the data as you consume it. Think flux.dispatchOn(someExecutorService) in addition to flux.dispatchOn(ringBufferProcessor/schedulerGroup).
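The contrast can be illustrated with nothing but java.util.concurrent: values are handed to a single-thread ThreadPoolExecutor backed by a LinkedBlockingQueue. ExecutorHandoff and dispatchAll are hypothetical names, and this is not the flux.dispatchOn(someExecutorService) overload, which was only being proposed in this thread. The point it shows is that LinkedBlockingQueue unlinks each node and clears its item when a worker takes it, so once a task has run, neither the queue nor the task keeps the value alive.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecutorHandoff {
    /**
     * Hypothetical helper: hands each value to a single worker thread through a
     * plain ExecutorService and returns how many tasks the queue still holds
     * once everything has run.
     */
    static <T> int dispatchAll(List<T> values, List<T> sink) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        for (T v : values) {
            executor.execute(() -> sink.add(v)); // the task captures v until it runs
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // Each take() removed its node and nulled the item, so consumed tasks
        // (and the values they captured) are no longer referenced by the queue.
        return executor.getQueue().size();
    }
}
```

The trade-off Stephane mentions is visible here: every handoff allocates a task and a queue node, whereas the ring buffer reuses its slots but keeps their last values reachable.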

> Yes, if it's a 100% non-stop continuous stream, the issue is not obvious at all. But consider this scenario: let's say the stream receives no new data for a couple of seconds. If all the previously consumed objects were eligible for GC, they could have been collected while the stream was inactive. Here, however, they only get collected once the stream becomes active again. In general, it's better to make objects available to the GC as soon as you are done with them. Is there some way to null out MutableSignal.value after the last consumer has consumed it?

No, there is currently no alternative other than using Java reflection to unsafely access the RingBuffer property and dereference the value directly from there: get() the MutableTask, then set its data to null (currently, Slot.data = null).
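The reflective workaround amounts to writing null into a private field. A minimal sketch follows, with loud assumptions: SlotLike is a hypothetical stand-in for Reactor's internal slot, the field name "data" is taken from the message above but may not match any given Reactor version, and reaching the RingBuffer instance itself is left out. SlotCleaner.clearField is an illustrative helper, not a Reactor API.

```java
import java.lang.reflect.Field;

/** Hypothetical stand-in for a ring buffer's internal slot; Reactor's real class and field names may differ. */
final class SlotLike {
    private Object data;

    SlotLike(Object data) {
        this.data = data;
    }

    Object peek() {
        return data;
    }
}

final class SlotCleaner {
    /**
     * Nulls a private field on a slot via reflection so its value becomes
     * eligible for GC. Only safe once every consumer has read the slot.
     */
    static void clearField(Object slot, String fieldName) {
        try {
            Field f = slot.getClass().getDeclaredField(fieldName);
            f.setAccessible(true); // bypasses encapsulation: brittle across library versions
            f.set(slot, null);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            throw new IllegalStateException("unexpected slot layout: " + fieldName, e);
        }
    }
}
```

Ordering matters if this is attempted on a live buffer: the clear has to happen before the last consumer advances its sequence. If it happened afterwards, the producer could already have reused the slot, and the late write would wipe a freshly published value. That extra write per slot is also the cost Stephane alludes to when he explains why the event loop does not dereference slots.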

Gesly George

Mar 8, 2016, 8:37:44 PM
to reactor-framework, gesly....@gmail.com
Got it now, thanks. I look forward to the updated 2.5 documentation. Some examples for transitioning from 2.x to 2.5 would be useful, especially since some of the terminology has changed.

Gesly George

Aug 8, 2016, 11:20:51 PM
to reactor-framework, gesly....@gmail.com
Stephane,
I've been thinking more about object creation and garbage, and I'm particularly concerned that publishing events to a RingBufferPublisher with reactor involves object creation. This differs from native use of the RingBuffer implementation, where one of the goals is to reduce the amount of garbage created. It matters especially in systems that process a large number of messages at a high per-second rate: in such systems, using reactor puts a lot of pressure on the garbage collector.

What if there were a way to provide an EventTranslator (from Disruptor) so that values could be copied into the MutableSignal instead of passing object references? If this could be introduced in some way (it could even be byte[] for low-level use cases), it could significantly reduce GC pressure.
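The translator idea can be sketched in self-contained Java. OneArgTranslator is a hypothetical interface shaped like Disruptor's EventTranslatorOneArg (translateTo(event, sequence, arg)); BytesEvent, PreallocatedRing and Translators are illustrative names, and the 64-byte payload capacity is an arbitrary assumption. Event objects are allocated once up front, and publishing copies bytes into an existing event instead of storing a new reference. For brevity the ring has no consumer gating, so it will overwrite unread entries.

```java
import java.util.function.Supplier;

/** Hypothetical translator, shaped like Disruptor's EventTranslatorOneArg. */
interface OneArgTranslator<E, A> {
    void translateTo(E event, long sequence, A arg);
}

/** Reusable event whose contents are copied in, not replaced. */
final class BytesEvent {
    final byte[] payload = new byte[64]; // capacity is an arbitrary assumption
    int length;
}

/** Minimal single-producer sketch: no consumer gating, so unread entries can be overwritten. */
final class PreallocatedRing<E> {
    private final Object[] entries;
    private final int mask;
    private long next = 0;

    PreallocatedRing(int sizePowerOfTwo, Supplier<E> factory) {
        if (Integer.bitCount(sizePowerOfTwo) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        entries = new Object[sizePowerOfTwo];
        for (int i = 0; i < entries.length; i++) {
            entries[i] = factory.get(); // every event object is allocated up front
        }
        mask = sizePowerOfTwo - 1;
    }

    /** Claims the next sequence and lets the translator copy data into the existing event. */
    <A> long publish(OneArgTranslator<E, A> translator, A arg) {
        long seq = next++;
        translator.translateTo(get(seq), seq, arg);
        return seq;
    }

    @SuppressWarnings("unchecked")
    E get(long seq) {
        return (E) entries[(int) (seq & mask)];
    }
}

final class Translators {
    /** Created once and stored in a static field, so publishing allocates nothing for it. */
    static final OneArgTranslator<BytesEvent, byte[]> COPY_BYTES = (event, seq, src) -> {
        int n = Math.min(src.length, event.payload.length);
        System.arraycopy(src, 0, event.payload, 0, n); // copy by value; caller may reuse src
        event.length = n;
    };
}
```

Because the data is copied, the caller can reuse its source buffer immediately, and nothing published through the ring becomes garbage. The cost is a fixed per-slot capacity and a copy on every publish.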

By the way, in 3.0, is reactor-core no longer dependent on the original LMAX Disruptor implementation?

Look forward to hearing your thoughts on this.

Thanks,
Gesly