final EventPoller<DataEvent> poller = disruptor.getRingBuffer().newPoller();
disruptor.getRingBuffer().addGatingSequences(poller.getSequence());
// would like to chain the poller here
disruptor.handleEventsWith(aHandler).then(bHandler, cHandler).then(collator);
Thread pollerThread = new Thread() {
    @Override
    public void run() {
        try {
            System.out.println("--------------------------> thread started");
            while (!isInterrupted()) {
                DataEvent eventCopy = (DataEvent) getNextValue(poller);
                // getNextValue returns null when poll() had no event to hand over
                if (eventCopy != null) {
                    System.out.println("------ eventCopy; " + eventCopy.counter);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
};
pollerThread.start();
private static Object getNextValue(EventPoller<DataEvent> poller) throws Exception {
    final Object[] out = new Object[1];
    poller.poll(
        new EventPoller.Handler<DataEvent>() {
            @Override
            public boolean onEvent(DataEvent event, long sequence, boolean endOfBatch) throws Exception {
                out[0] = event.copyOfData();
                // Return false so that only one event is processed at a time.
                return false;
            }
        });
    return out[0];
}
...
public static class CollatingPollingHandler implements EventPoller.Handler<DataEvent> {
    @Override
    public boolean onEvent(DataEvent event, long sequence, boolean endOfBatch) throws Exception {
--
You received this message because you are subscribed to the Google Groups "Disruptor" group.
To unsubscribe from this group and stop receiving emails from it, send an email to lmax-disrupto...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
If I create a ring buffer of size 8 and send 10 messages, the 9th waits until all of the first 8 have finished before it gets a sequence. I would have expected a slot to be released as soon as an event has been processed. It hangs on ringBuffer.get(sequence). I think I am not releasing the event after processing as required. The code of the consumer can be found here:
I also have an application where the disruptor event handlers are getting a lot of CPU time. Can it be related to the SleepingWaitStrategy?
On 8 Dec 2016, at 23:39, Michael Barker <mik...@gmail.com> wrote:
This is because the Disruptor releases the resources at the end of the batch. This avoids a write (and a likely cache miss) on the sequence until the batch completes. If you need to release resources early, you can use the SequenceReportingEventHandler interface instead. This will pass the Sequence into your handler so that you can update it as frequently as you like.
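To make the batch-end behaviour described above concrete, here is a minimal toy model in plain Java. It is not Disruptor code: the class GatingModel and both of its methods are hypothetical names made up for this illustration, and it assumes that events 0..7 reached the consumer as a single batch. It only shows the arithmetic of why, with a buffer of 8, the 9th message (sequence 8) cannot be claimed until the consumer's sequence has moved past -1.

```java
// Toy model of ring-buffer gating. NOT Disruptor code; all names are hypothetical.
class GatingModel {

    /** The producer may claim `next` only if doing so does not lap the slowest consumer. */
    static boolean canClaim(long next, int bufferSize, long consumerSequence) {
        long wrapPoint = next - bufferSize;
        return wrapPoint <= consumerSequence;
    }

    /**
     * Sequence the producer can see while the consumer is inside a batch.
     * With batch-end release it stays at the previous batch end until the
     * whole batch is done; with per-event reporting it follows each event.
     */
    static long visibleSequence(long lastProcessed, long previousBatchEnd,
                                long batchEnd, boolean reportEachEvent) {
        if (reportEachEvent || lastProcessed == batchEnd) {
            return lastProcessed;
        }
        return previousBatchEnd;
    }

    public static void main(String[] args) {
        int bufferSize = 8;
        long previousBatchEnd = -1L; // nothing consumed yet
        long batchEnd = 7L;          // assumption: events 0..7 arrived as one batch
        long lastProcessed = 2L;     // the handler has finished events 0, 1 and 2

        long atBatchEnd = visibleSequence(lastProcessed, previousBatchEnd, batchEnd, false);
        long perEvent = visibleSequence(lastProcessed, previousBatchEnd, batchEnd, true);

        // Sequence 8 is the 9th message.
        System.out.println("batch-end release: 9th message can be claimed = "
                + canClaim(8, bufferSize, atBatchEnd));
        System.out.println("per-event report:  9th message can be claimed = "
                + canClaim(8, bufferSize, perEvent));
    }
}
```

In the real library, a handler that implements SequenceReportingEventHandler receives the Sequence through setSequenceCallback(Sequence) and can call set(sequence) on it after each event, which corresponds to the per-event case in this sketch.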
> I also have an application where the disruptor event handlers are getting a lot of CPU time. Can it be related to the SleepingWaitStrategy?
Possibly. The amount of CPU used by the SleepingWaitStrategy depends on the OS and hardware. A profiler should be able to provide some insight.