Thanks, Rajiv, the link is right. It was my mistake to forget to attach the URL. I have just recovered from a cold and am in a half-vacation state, but I will work on this part-time.
Yes, the user is free to use any combination of existing good practices, with minimum effort and full control over their choices.
I've been going through the source and there is a lot of stuff to see. The style of the code seems very C-like, dealing in primitives wherever possible instead of objects. I see you are handing out addresses (longs) from alloc instead of byte buffers etc. This seems great for perf, though I wonder how different it is from writing C/C++ straight up ;)

Another thing I noted was that your ring buffer implementation fulfills more of a queue interface. It accepts new T objects instead of reusing old ones. How does that compare to the Disruptor, where you get an old slot and just change the values on the old object? The queue approach seems like it would create more garbage.
Hi, Rajiv, you are almost there:) Thanks for the feedback. In fact, this design is meant to return control of the object to the user. That is, the user decides where the sent object comes from: you can cache/pre-allocate your objects. If you want to send primitive-type values, you can use LongHyperLoop or IntHyperLoop. The slot (of the internal buffer) is needed in both the Disruptor and HyperLoop (RingBuffer). The Disruptor uses a setValue(Object) to populate the slots. They are basically the same idea at the core; I investigated the Disruptor carefully. The Disruptor has other helper classes and may provide a little more functionality, so the Disruptor has more garbage side objects.
You can think of it like this: when you use the Disruptor to transfer messages from one place to another, the message very likely needs to be generated on the fly (that is, it is not likely pre-existing). Then all the messages in the Disruptor are garbage, except primitive-type values. For primitive-type values/messages, landz has the specialized versions I mentioned above: LongHyperLoop and IntHyperLoop are garbage-free. If you truly want to reuse on-the-fly messages, you can use a pre-allocated array or a pool.
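To make the slot-reuse contrast concrete, a minimal sketch in the Disruptor 3.x style (ValueEvent and the long payload here are just illustrative; the claim/get/publish calls are the real API):

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;

public class SlotReuseSketch {
    // The event is pre-allocated once per slot; publishers only mutate it.
    static final class ValueEvent {
        long value;
    }

    public static void main(String[] args) {
        RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer(
            new EventFactory<ValueEvent>() {
                public ValueEvent newInstance() { return new ValueEvent(); }
            },
            1024, new YieldingWaitStrategy());

        long seq = ringBuffer.next();          // claim a pre-allocated slot
        try {
            ringBuffer.get(seq).value = 42L;   // overwrite the primitive field in place
        } finally {
            ringBuffer.publish(seq);           // make it visible to consumers
        }
    }
}

No per-message object is created here, which is the garbage-free property under discussion.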
At lunch, I recalled why I ruled this out: consuming the slot in the ring buffer demonstrates the framework's behavior. One design idea of HyperLoop, as a simplified version of the Disruptor's RingBuffer, is that it does not care about/know/control the object it transfers. There is no magic: the Disruptor needs the framework's way of knowing when the consumer has consumed the slot (this consuming process is not atomic).
Can you use just one handler with the RingBuffer? Or maybe I am missing something in the RingBuffer.
The HyperLoop was originally used to transfer off-heap raw addresses. The generic version was invented later, for completeness.
In some special cases, you can guarantee the order of pre-allocated wrappers from your own logic, and then it is still OK. This is the test logic. But I know it is a pain. So, keep using the Disruptor's RingBuffer until we reach a more comfortable state:) Thanks very much!
private final ValueAdditionEventHandlerForDisruptor handler =
    new ValueAdditionEventHandlerForDisruptor();
private final BatchEventProcessor<ValueEvent> batchEventProcessor =
    new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handler);
Hi, Rajiv,
It's really appreciated that you are willing to put your ideas out for discussion. Maybe I am not as careful with the Disruptor as you:)

For "You can use one handler with the ringbuffer?" I mean this. The following is modified from the Disruptor to use a pure RingBuffer (but still with a lot of noise). For this example to work, I need a handler and a BatchEventProcessor:

private final ValueAdditionEventHandlerForDisruptor handler =
    new ValueAdditionEventHandlerForDisruptor();
private final BatchEventProcessor<ValueEvent> batchEventProcessor =
    new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handler);

The BatchEventProcessor, if I am right, is the true executor or handler. The question is: where does the handler get executed? The code can jump to the following:

@Override
public void run() {
    if (!running.compareAndSet(false, true)) {
        throw new IllegalStateException("Thread is already running");
    }
    sequenceBarrier.clearAlert();
    notifyStart();

    boolean processedSequence = true;
    long cachedAvailableSequence = Long.MIN_VALUE;
    long nextSequence = sequence.get();
    T event = null;
    while (true) {
        try {
            // if previous sequence was processed - fetch the next sequence and set
            // that we have successfully processed the previous sequence
            // typically, this will be true
            // this prevents the sequence getting too far forward if an exception
            // is thrown from the WorkHandler
            if (processedSequence) {
                processedSequence = false;
                do {
                    nextSequence = workSequence.get() + 1L;
                    sequence.set(nextSequence - 1L);
                } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
            }

            if (cachedAvailableSequence >= nextSequence) {
                event = ringBuffer.get(nextSequence);
                workHandler.onEvent(event);
                processedSequence = true;
            } else {
                cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
            }
        } catch (final AlertException ex) {
            if (!running.get()) {
                break;
            }
        } catch (final Throwable ex) {
            // handle, mark as processed, unless the exception handler threw an exception
            exceptionHandler.handleEventException(ex, nextSequence, event);
            processedSequence = true;
        }
    }

    notifyShutdown();
    running.set(false);
}

This is how the Disruptor gets things done. Not sure you like this? But note the first statement:

if (!running.compareAndSet(false, true))

It is no longer non-full-fence (a CAS implies a full fence), as you know.
Can we do better with HyperLoop? As I mentioned above, pre-allocate an array. At the head or tail (or at every index?) of the array, we can add a volatile field to indicate whether one wrapper has been processed. The two become similar solutions, but we shorten the critical path, so the latency may be reduced (need to measure^_^). There may be good practices either way.
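A rough sketch of this wrapper-array idea (the names SlotPool and Wrapper are just for illustration, not landz's actual API; the slot index itself could be handed to the consumer as a primitive, e.g. via an IntHyperLoop):

final class SlotPool {
    static final class Wrapper {
        long payload;                      // reusable message body
        volatile boolean consumed = true;  // true => producer may refill this slot
    }

    private final Wrapper[] slots;

    SlotPool(int size) {                   // size must be a power of two
        slots = new Wrapper[size];
        for (int i = 0; i < size; i++) {
            slots[i] = new Wrapper();
        }
    }

    // Producer: spin until the slot has been consumed, then publish a new payload.
    void put(long index, long value) {
        Wrapper w = slots[(int) (index & (slots.length - 1))];
        while (!w.consumed) { /* busy-spin; could yield or back off */ }
        w.payload = value;
        w.consumed = false;                // volatile write publishes the payload
    }

    // Consumer: called once the index has been handed over; read, then release.
    long take(long index) {
        Wrapper w = slots[(int) (index & (slots.length - 1))];
        long v = w.payload;
        w.consumed = true;                 // volatile write releases the slot for reuse
        return v;
    }
}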
Whether the throughput improves or not is another story. If you read landz's first blog post in full, you'll see what I mean. But again, I admit your case is rational. It is not good to make developers dig out everything they need; the project should help. I will keep this in mind and see when I can give you better facilities.
For your last question: it cost about 4 months full time after I quit my job, not counting the first 20 days of "free time".
The reason I released a little early is that last year I told my wife, "I will finish that this year." But I really hoped landz could be self-hosted by the z stack before release. You may know that even Netty.io is hosted via GitHub Pages, not by Netty themselves... The main cost is not the development time itself (70%-80%?). In fact, it is the time spent reviewing existing works. There are two reasons for this:
1. If we want to convince others that we are better, we must provide the "why".
2. If we cannot be better, why not just use them? (The premise being that the licenses are compatible.)
The bad thing was, most of what I reviewed disappointed me. But that is another topic:)
You may find that the code of landz is KISS and the API prefers "composable" (or composition, in design-pattern terms). Framework developers usually think they know best, but this is not always true. For example, the bloat of NIO: the socket stack in the Linux kernel is simple to use, but NIO adds many layers of high-level abstractions and encapsulations.
Are these abstractions and encapsulations right? Yes or no; it is not important. What is important is that those who want to work at the low level cannot get at the private APIs, and even if they do, by reflection or now by MethodHandle, the encapsulation is not right for that use. Landz is designed from scratch and is very careful about this engineering flaw. For example:
1. The buffer API provides two layers: Buffers allows raw operations on an off-heap area at the fastest speed, without any bounds checking. If you are smart enough, this is best. If you do not feel safe with that, a higher-level Buffer/ByteBuffer API is provided, in which checking guards your every operation. And even in the Buffer/ByteBuffer API, you can disable the contract API if you are unsure at first but confident later (see the sketch after this list).
2. On the net side, there are three layers: raw Syscall (for talking to the kernel directly); socket/epoll encapsulations for use without off-heap knowledge; and a higher-level event loop in the net module, embodying the best practices as understood by the author of the net module (me for now^_^). So, if you do not agree with me, you can still create another net module on top of the low-level plumbing of landz. Or if you do not agree with using off-heap to talk with the kernel, you can still use your own way. (But note: off-heap is the only choice, as far as I know.)
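To illustrate point 1 above, a hypothetical two-layer buffer sketch (the class and method names are mine, not landz's real API): the raw layer trusts the caller completely, while the checked layer wraps every operation in an explicit contract check.

import java.lang.reflect.Field;
import sun.misc.Unsafe;

final class RawBuffer {
    static final Unsafe U = unsafe();

    static Unsafe unsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
    }

    // Fast path: no bounds checking at all; the caller must stay in range.
    static void putLong(long address, long offset, long value) {
        U.putLong(address + offset, value);
    }
}

final class CheckedBuffer {
    private final long address;
    private final long size;

    CheckedBuffer(long address, long size) {
        this.address = address;
        this.size = size;
    }

    // Safe path: every operation is guarded by an explicit contract check.
    void putLong(long offset, long value) {
        if (offset < 0 || offset + 8 > size) {
            throw new IndexOutOfBoundsException("offset " + offset);
        }
        RawBuffer.putLong(address, offset, value);
    }
}

The checked layer is built on top of the raw one, so a user who outgrows the checks can drop down a layer without fighting the encapsulation.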
OK, more works and articles will become available. Would you be interested in writing something?;) Thanks again,
Yes, but it is not necessary. zmalloc is 8-byte boundary aligned, so an allocated data chunk is at least 8 bytes. You can design your data protocol to put a size header at the start of the chunk. Or you may even use the high 16 bits and the low 3 bits of the memory address to encode this info, since the current 64-bit ABI only uses the low 48 bits of the full 64 bits (a dirty trick).
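To make the trick concrete, a minimal sketch, assuming an x86-64 ABI where only the low 48 address bits are significant and chunks are 8-byte aligned (so the low 3 bits are zero); the method names are just for illustration:

final class TaggedAddress {
    // Pack 16 bits into bits 48-63 and 3 bits into bits 0-2 of an address.
    static long encode(long address, int high16, int low3) {
        return ((long) (high16 & 0xFFFF) << 48) | address | (low3 & 0x7L);
    }

    // Mask off the tag bits to recover the original 8-byte-aligned address.
    // The tagged value must be untagged like this before dereferencing.
    static long address(long tagged) {
        return tagged & 0x0000_FFFF_FFFF_FFF8L;
    }

    static int high16(long tagged) {
        return (int) (tagged >>> 48);
    }

    static int low3(long tagged) {
        return (int) (tagged & 0x7L);
    }
}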
Jin
The best thing about the memory allocator is that it is as pleasant as the new keyword: it provides thread safety because you operate on different objects/chunks. I plan to use it to drive an async I/O pool fully out of order. This is not possible for a pool, because a pool may want high throughput.
Please allow a few hours for the one unanswered question. Thanks very much!
Jin
class MemoryCollectorRingBuffer(maxSize: Int, minSize: Int,
                                ringBuffer: RingBuffer[NetworkEvent],
                                sequenceBarrier: SequenceBarrier)
  extends ResourceCollectorRingBuffer[NetworkEvent, ByteBuffer](ringBuffer, sequenceBarrier) {

  private[this] val allocator = Allocators.getNewAllocator(maxSize, minSize) // This could be zmalloc

  def release(buffer: ByteBuffer) {
    assert(buffer != null)
    allocator.free(buffer)
  }

  // Just a method to convert your event object to the underlying dynamically sized resource you are pooling.
  def convert(event: NetworkEvent): ByteBuffer = {
    event.buffer
  }

  def getBuffer(size: Int): ByteBuffer = {
    if (size > maxSize || size < minSize) {
      null
    } else {
      // First try to allocate from the allocator.
      var buf = allocator.allocate(size)
      if (buf == null) {
        // If it doesn't work, recycle all the events processed in this cycle so far.
        recycle()
        // See if the allocation works after recycling, since we should have gotten back some memory.
        buf = allocator.allocate(size)
      }
      // The allocation could still fail. We might handle it by allocating more memory
      // or just let the application deal with it.
      if (buf != null) {
        buf.clear()
      }
      buf
    }
  }
}

The consumer is just a typical BatchProcessor. We pass it the underlying ring buffer so it can work as expected. It doesn't need any special logic, since the producer automatically collects buffers back when the consumer marks an event processed:

class NetworkEventProcessor(ringBufferNetwork: RingBuffer[NetworkEvent],
                            val sequenceBarrier: SequenceBarrier) extends Runnable {

  val sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE)
  private[this] var readBuffer: ByteBuffer = null

  override def run() {
    // No running state is kept track of, for simplicity.
    var nextSequence = sequence.get() + 1L
    while (true) {
      var availableSequence = sequenceBarrier.waitFor(nextSequence)
      if (nextSequence > availableSequence) {
        Thread.`yield`
      }
      while (nextSequence <= availableSequence) {
        handleEvent(ringBufferNetwork.get(nextSequence), nextSequence)
        nextSequence += 1
      }
      // This will ultimately result in the producer reclaiming the resources from the right slots.
      sequence.set(availableSequence)
    }
  }

  // Process as usual.
  def handleEvent(event: NetworkEvent, sequence: Long) {
    readBuffer = event.buffer // Should we duplicate to avoid false sharing?
    while (readBuffer.hasRemaining()) {
      event.socketChannel.write(readBuffer)
    }
  }
}

A typical use for the producer is something like this:

// Allocate a buffer to read into.
val readBuffer = memoryCollectingRingBuffer.getBuffer(length)
// Do something with the buffer.
socketChannel.read(readBuffer)

val index = memoryCollectingRingBuffer.next()
val event = memoryCollectingRingBuffer.get(index)
event.buffer = payload
event.socketChannel = socketChannel
memoryCollectingRingBuffer.publish(index)

As you can see, the publisher logic is also very similar to the normal Disruptor logic. The producer just calls the methods on the ResourceCollectorRingBuffer instead of calling them directly on the Disruptor RingBuffer. By keeping the resource allocator with the ring buffer, we can easily keep track of the producer and consumer cursors. Since the allocator is single-threaded, the logic is very simple:
i) When you complete an entire cycle of the ring buffer without freeing any of the resources, you must collect all the resources from every slot back so they don't leak. Otherwise they will be overwritten by the producer and leak.
ii) When you try to allocate and fail, check for all the events that have been marked processed by the consumer in this cycle. Reclaim all of them and try allocating the resource again.
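Here is a hedged Java rendering of that recycle step; the names (consumerSequence, lastRecycled, convert, release) mirror my Scala sketch above and, like the rest, it is an untested assumption rather than a verified implementation:

import com.lmax.disruptor.Sequence;

abstract class ResourceCollector<E, R> {
    protected final Sequence consumerSequence;  // the consumer's progress cursor
    protected long lastRecycled = -1L;          // where the previous recycle stopped

    ResourceCollector(Sequence consumerSequence) {
        this.consumerSequence = consumerSequence;
    }

    abstract E slot(long sequence);             // ringBuffer.get(sequence)
    abstract R convert(E event);                // event -> pooled resource
    abstract void release(R resource);          // return memory to the allocator
    abstract void clearResource(E event);       // null the slot to avoid a double free

    // The single-threaded producer calls this when an allocation fails (case ii)
    // or when a full ring cycle completes without any frees (case i).
    void recycle() {
        long consumed = consumerSequence.get(); // highest slot the consumer finished
        for (long seq = lastRecycled + 1; seq <= consumed; seq++) {
            E event = slot(seq);
            R resource = convert(event);
            if (resource != null) {
                release(resource);
                clearResource(event);
            }
        }
        lastRecycled = consumed;                // cursor for detecting the next full cycle
    }
}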
You have to update the cursor where the last recycle took place so that you can detect the next full cycle properly.
Does that make sense?
Thanks,
Rajiv
var availableSequence = sequenceBarrier.waitFor(nextSequence)
if (nextSequence > availableSequence) {
  Thread.`yield`
}
Hi, Rajiv,

It seems you mimic the logic of com.lmax.disruptor.WorkProcessor in your NetworkEventProcessor? Do you consider what happens if an exception occurs in handleEvent? Why not re-use the Disruptor's WorkProcessor? Every statement in the Disruptor has its meaning.

Other notes:

var availableSequence = sequenceBarrier.waitFor(nextSequence)
if (nextSequence > availableSequence) {
  Thread.`yield`
}

sequenceBarrier.waitFor should guarantee that nextSequence is available, so the three lines after it are redundant.
The problem with a ring buffer is that a slow consumer blocks everyone, unless you allow event loss. Are you sure about the socketChannel.write in handleEvent? OK, I am just pointing out some suspects:)

Jin
On Tue, Feb 4, 2014 at 1:05 AM, Rajiv Kurian <geet...@gmail.com> wrote:
Hi Jin:

Yes, you are right; the handler/processor are in a separate Scala file. ResourceCollectorRingBuffer is just an abstract class. The project is also a proof of concept, so the code isn't very tight. I haven't even written any tests, so it could be incorrect. But the concept is right IMHO, so a much tighter implementation can be coded. You pass the ring buffer and sequence barrier to the ResourceCollectorRingBuffer. You can pass the same ring buffer to the consumer, and it operates just like a normal Disruptor project. The assumption is that as soon as the consumer marks a slot as processed, it doesn't need the resource anymore. The abstraction exposes some of the producer methods from the ring buffer. The MemoryCollectorRingBuffer above is an implementation (sorry for the Scala source, but it should be readable).