Clearing a ring buffer (resetting the cursor)


Stephanie Galbo

Jan 22, 2015, 9:33:06 AM
to lmax-di...@googlegroups.com
Hi, I'm new to the disruptor pattern but so far I really like it. It has greatly improved throughput and latency on my current project.

I need to be able to clear the contents of the ring buffer (just like how you would clear a blocking queue), and I thought I found a way but it doesn't seem to work all the time. 

        disruptor.getRingBuffer().resetTo(-1);

Is this correct? I assumed the cursor points to where the last event was written, so resetting it to -1 should empty the buffer, but I'm getting inconsistent results. I test the buffer size using this:


       DISRUPTOR_SIZE - disruptor.getRingBuffer().remainingCapacity();

where DISRUPTOR_SIZE is the value I passed in to the disruptor when the object was created. Before the resetTo() is called, the calculated buffer size is usually a non-zero value, and after the resetTo it always goes to 0, making me think that the buffer is being cleared. 

Michael Barker

Jan 22, 2015, 3:01:31 PM
to lmax-di...@googlegroups.com
Hi,

The resetTo() method is probably not the best thing to use.  From the Javadoc "...but it is worth noting that it can cause a data race and should only be used in controlled circumstances.  E.g. during initialisation".  It is a bit of a hack that I squeezed in for a slightly weird use case that we have where we need to start the disruptor at a specific sequence number after construction, but before any events flow through the system.  Calling it at any other time could cause an issue, hence the inconsistent results that you are seeing.  I'm going to deprecate that method as it shouldn't really be used.

The closest thing to doing a clear would be to halt all of the consumer threads, then update their sequence values to be the same as the ring buffer's cursor value, then restart them.  I'm going to put this in an enhancement and see if there is a safe and efficient way to do it.
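
As a rough illustration of why moving the consumer's sequence up to the cursor behaves like a clear, here is a toy model. It does not use the Disruptor library: ToyRing and its methods are hypothetical stand-ins, and it assumes a single consumer that is stopped while the sequence is moved.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy model only: ToyRing is a hypothetical stand-in, not a Disruptor class.
final class ToyRing {
    final int bufferSize;
    final AtomicLong cursor = new AtomicLong(-1);           // last published sequence
    final AtomicLong consumerSequence = new AtomicLong(-1); // last consumed sequence

    ToyRing(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Publishes one event (the capacity check is omitted for brevity).
    void publish() {
        cursor.incrementAndGet();
    }

    // Number of published events the consumer has not yet processed.
    long backlog() {
        return cursor.get() - consumerSequence.get();
    }

    long remainingCapacity() {
        return bufferSize - backlog();
    }

    // Assumption: only called while the consumer is halted; otherwise this races with it.
    void clearWhileConsumerHalted() {
        consumerSequence.set(cursor.get());
    }
}
```

The cursor itself is never moved backwards here; only the consumer's position changes, so producers carry on from where they were.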

I'd be interested in hearing your use case for clear(), to see if there are any approaches that might fit your needs.

Mike.



Sam Barker

Jan 22, 2015, 3:22:01 PM
to lmax-di...@googlegroups.com
To ensure we retain messages for as short a time as possible, our event handler nulls the reference to the message once it has been consumed from the ring buffer. That is, we set the ring buffer entry's reference to the data to null. This means that whenever the consumer catches up with the producer, the ring buffer is effectively empty.

If you have multiple consumers you could gate an additional EventHandler on the sequences of your consumers which nulled the reference once they had all dealt with the entry.
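
A minimal sketch of the single-consumer case, with no dependency on the Disruptor library. Handler is a hypothetical stand-in with the same callback shape as EventHandler, MessageEvent is an assumed event type, and adding to a list stands in for the real processing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in with the same callback shape as the Disruptor's EventHandler.
interface Handler<T> {
    void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}

// Assumed event type: a reusable ring buffer slot holding a reference to the message.
final class MessageEvent {
    private Object payload;

    void set(Object payload) {
        this.payload = payload;
    }

    Object get() {
        return payload;
    }

    void clear() {
        payload = null;
    }
}

final class SendThenClearHandler implements Handler<MessageEvent> {
    final List<Object> sent = new ArrayList<>();

    @Override
    public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) {
        try {
            sent.add(event.get()); // stands in for the real processing
        } finally {
            event.clear();         // drop the reference so the slot no longer retains the message
        }
    }
}
```

With several consumers, the clearing step would move into its own handler that runs only after all of the others have processed the entry.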

Sam


Stephanie Galbo

Jan 26, 2015, 3:27:21 PM
to lmax-di...@googlegroups.com
Thanks for the replies.

I have multiple producers and a single consumer which just grabs data and sends out to a protocol endpoint. If the protocol connection goes down and/or the message fails to send for some other reason, I want to drop the message and all other queued messages completely because we have some more advanced resend application logic upstream. Previously we were able to just clear the blocking queue so that the resend component could handle the buffering and resend times. 

Michael Barker

Jan 26, 2015, 4:59:53 PM
to lmax-di...@googlegroups.com
Hi Stephanie,

For the situation that you mention, there are a couple of other options that may work.  The solutions mentioned below will be much more efficient than the full implementation of clear that I mentioned in the previous email.

1) Just swallow the exception/error that you get from the protocol.  This works if the client that you are using to send with can throw an exception quickly when the connection is unavailable.  This is not always possible, e.g. with TCP/IP there can be long delays before a send fails, especially if the remote endpoint goes away without the local machine seeing a FIN or RST packet.

@Override
public void onEvent(ValueEvent event, long sequence, boolean endOfBatch) throws Exception
{
    try
    {
        send(event);
    }
    catch (ProtocolException e)
    {
        // handle exception
    }
}

Within handle exception, either log a message or record a counter*, but don't re-throw the exception.  This will allow the EventHandler to skip over events until the connection becomes available again.

* In high performance systems, which may be dealing with exceptions that occur at very frequent intervals (e.g. once per message), I tend to avoid logging and instead use a counter that I can monitor externally with something like JMX.  Most logging systems tend to be fairly inefficient, and logging large numbers of exceptions can actually cause the system to fall into an even worse state.
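
A minimal sketch of the counter approach, assuming a hypothetical Transport interface that throws IOException on failure (it stands in for send() and ProtocolException above). The JMX registration is left out; getFailureCount() is the value a monitoring bean would expose.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

final class CountingSender {
    // Hypothetical transport abstraction; stands in for the real protocol client.
    interface Transport {
        void send(String message) throws IOException;
    }

    private final Transport transport;
    private final AtomicLong failures = new AtomicLong();

    CountingSender(Transport transport) {
        this.transport = transport;
    }

    // Same parameter shape as the onEvent callback above.
    void onEvent(String message, long sequence, boolean endOfBatch) {
        try {
            transport.send(message);
        } catch (IOException e) {
            // Count instead of logging, and do not re-throw: the event is simply dropped.
            failures.incrementAndGet();
        }
    }

    // Safe to read from a monitoring thread while the handler thread keeps running.
    long getFailureCount() {
        return failures.get();
    }
}
```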

2) The event handler could detect the error, then use a sequence number to discard old events.  The Cursored interface is implemented by the RingBuffer class, so on construction you would need to inject an instance of the RingBuffer into the EventHandler.

E.g.

public class SkippingEventHandler implements EventHandler<ValueEvent>
{
    private final Cursored cursored;
    private long skipOverSequence = -1;
    
    public SkippingEventHandler(Cursored cursored)
    {
        this.cursored = cursored;
    }
    
    @Override
    public void onEvent(ValueEvent event, long sequence, boolean endOfBatch) throws Exception
    {
        if (sequence <= skipOverSequence)
        {
            return;
        }
        
        try
        {
            send(event);
        }
        catch (ProtocolException e)
        {
            // Remember the current cursor so that everything already queued gets skipped.
            skipOverSequence = cursored.getCursor();
            // handle exception
        }
    }
}

3)  If you wanted to manage the skipping over of events from a separate thread (I'm not sure which thread you are calling clear() from in your current implementation) then you could add a volatile field.  You would call RingBuffer.getCursor() and pass it into the skipEvents method.

public class SkippingEventHandler implements EventHandler<ValueEvent>
{
    private volatile long skipOverSequence = -1;

    @Override
    public void onEvent(ValueEvent event, long sequence, boolean endOfBatch) throws Exception
    {
        if (sequence <= skipOverSequence)
        {
            return;
        }

        try
        {
            send(event);
        }
        catch (ProtocolException e)
        {
            // handle exception (log or count it); skipping is triggered externally via skipEvents()
        }
    }

    public void skipEvents(long sequenceToSkipOver)
    {
        this.skipOverSequence = sequenceToSkipOver;
    }
}
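
To show how the externally driven version behaves, here is a small self-contained sketch that does not use the Disruptor library. ExternallySkippingHandler is a hypothetical simplification of the handler above: recording the sequence stands in for send(event), and events are plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplification of the handler above, with no Disruptor dependency.
final class ExternallySkippingHandler {
    // volatile so that a write from the controlling thread is visible to the handler thread
    private volatile long skipOverSequence = -1;

    // Only touched by the handler thread in this sketch.
    final List<Long> sentSequences = new ArrayList<>();

    void onEvent(String event, long sequence, boolean endOfBatch) {
        if (sequence <= skipOverSequence) {
            return; // published before the "clear" was requested, so drop it
        }
        sentSequences.add(sequence); // stands in for send(event)
    }

    void skipEvents(long sequenceToSkipOver) {
        this.skipOverSequence = sequenceToSkipOver;
    }
}
```

With a real ring buffer the controlling thread would call something like handler.skipEvents(ringBuffer.getCursor()). Events at or below that sequence are dropped as the handler reaches them, and anything published afterwards is sent as normal.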

Mike.


