The project looks really interesting

84 views
Skip to first unread message

Rajiv Kurian

unread,
Feb 1, 2014, 2:11:50 AM2/1/14
to la...@googlegroups.com
I bounced off your post on the mechanical sympathy list.

I read the page at http://landz.github.io/ to figure out the aim for the project. Would it be correct to say landz aims at providing more of a library approach without prescribing a style? The users are free to mix and match components like the ring-buffer/queue implementations, general purpose memory allocator, the network and IO facilities to build their own applications?

Excited to see how the project shapes up! Thanks for sharing.


Jin Mingjian

unread,
Feb 1, 2014, 3:12:29 AM2/1/14
to Rajiv Kurian, la...@googlegroups.com

thanks, Rajiv, the link is right. it is my mistake to forget to attach the  url. I am just recovered from a cold, and in a half-vacation state. but i will in partial time.

--
You received this message because you are subscribed to the Google Groups "landz" group.
To unsubscribe from this group and stop receiving emails from it, send an email to landz+un...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Jin Mingjian

unread,
Feb 1, 2014, 3:21:22 AM2/1/14
to Rajiv Kurian, la...@googlegroups.com

yes, the user is free to use any kind of combinations of existed good practices with minimum effort and full control of his choosen.

2014-2-1 下午3:11于 "Rajiv Kurian" <geet...@gmail.com>写道:

Rajiv Kurian

unread,
Feb 1, 2014, 3:32:17 AM2/1/14
to la...@googlegroups.com, Rajiv Kurian
I've been going through the source and there is a lot of stuff to see.

The style of the code seems a very C style dealing in primitives wherever possible instead of objects. I see you are handing out addresses (longs) from alloc instead of byte buffers etc. This seems great for perf, though I wonder how different it is from writing C/C++ straight up ;)

Another thing that I noted was your ring buffer implementation is fulfilling more of a queue interface. It accepts new T objects instead of reusing new ones. How does that compare to the disruptor where you get an old slot and just change the values on an old object? The queue approach seems like it would create more garbage.

Have you measured Java's NIO performance VS your wrapper over the epoll syscalls? Curious to know what in the Java implementation turned you off? Are the JNI overheads justified for the extra perf gain?

I am sure I'll have more questions as I read the source. Thanks again!

Rajiv Kurian

unread,
Feb 1, 2014, 3:33:35 AM2/1/14
to la...@googlegroups.com, Rajiv Kurian


On Saturday, February 1, 2014 12:32:17 AM UTC-8, Rajiv Kurian wrote:
I've been going through the source and there is a lot of stuff to see.

The style of the code seems a very C style dealing in primitives wherever possible instead of objects. I see you are handing out addresses (longs) from alloc instead of byte buffers etc. This seems great for perf, though I wonder how different it is from writing C/C++ straight up ;)

Another thing that I noted was your ring buffer implementation is fulfilling more of a queue interface. It accepts new T objects instead of reusing new ones. How does that compare to the disruptor where you get an old slot and just change the values on an old object? The queue approach seems like it would create more garbage.
meant to say "instead of reusing existing ones"

Jin Mingjian

unread,
Feb 1, 2014, 7:06:36 AM2/1/14
to Rajiv Kurian, la...@googlegroups.com
Hi, Rajiv, you are almost there:) thanks for feedback. In fact, this design is to return control of object to the user. That is, the user decides where the sent object is from. You can cache/pre-allocated your objects. If you want send primitive type value, you can use LongHyperLoop or IntHyperLoop. The slot (of internal buffer) is needed in both Disruptor and HyperLoop(RingBUffer). The Disruptor use a setVaule(Object) to populate the slots. In fact, they are the both basically same idea in core because I investigate the Disruptor carefully.   The Disruptor has other helper class, and may provide a little more functionalities. So, Disruptor has more garbage side objects. 

You can think like this:  You use the Disruptor to transfer the messages from one to another, the message highly possibly needs to be generated on the fly(or say, not likely pre-existed). Then, all the messages in Disruptor is garbage except primitive type values. For primitive type values/messages, landz has specialization version as my mentioned above. LongHyperLoop or IntHyperLoop are garbage free. If you truely like reuse on the fly messages, you can use pre-allocated array or pool. 

Jin 


Jin Mingjian

unread,
Feb 1, 2014, 7:43:58 AM2/1/14
to Rajiv Kurian, la...@googlegroups.com
there is a simple benchmark for syscall in some side of socket here;

...
 test_sys_socket: spawn 50000 socket costs 46,085,626 nanos with last sockfd 50025
 test_net_socket: spawn 50000 socket costs 54,567,699 nanos with last sock 100025
 test_xnio_socket: spawn 50000 socket costs 46,118,654 nanos with last sock 150026
...

 znr syscall is a slightly faster than xnio(jni based), and  20% faster than jdk shipped nio's code(20% is a little large in my thought, ~5% may be contributed by methodhandle, then the rest 15% due to what?)

As you said "epoll syscall"...If you mean syscall, then the above is enough. For epoll itself, there are many kind of usages. And I am so landz's usage is much different to NIO's. So, direct comparison is not meanful. But, finally, I will add more benchmarks and I am considering to join the one 3rd benchmark after landz's self-hosted. But before, I am self-confident with the result:)

Jin

Rajiv Kurian

unread,
Feb 1, 2014, 1:25:31 PM2/1/14
to la...@googlegroups.com, Rajiv Kurian


On Saturday, February 1, 2014 4:06:36 AM UTC-8, Jin Mingjian wrote:
Hi, Rajiv, you are almost there:) thanks for feedback. In fact, this design is to return control of object to the user. That is, the user decides where the sent object is from. You can cache/pre-allocated your objects. If you want send primitive type value, you can use LongHyperLoop or IntHyperLoop. The slot (of internal buffer) is needed in both Disruptor and HyperLoop(RingBUffer). The Disruptor use a setVaule(Object) to populate the slots. In fact, they are the both basically same idea in core because I investigate the Disruptor carefully.   The Disruptor has other helper class, and may provide a little more functionalities. So, Disruptor has more garbage side objects.
 
 

You can think like this:  You use the Disruptor to transfer the messages from one to another, the message highly possibly needs to be generated on the fly(or say, not likely pre-existed). Then, all the messages in Disruptor is garbage except primitive type values. For primitive type values/messages, landz has specialization version as my mentioned above. LongHyperLoop or IntHyperLoop are garbage free. If you truely like reuse on the fly messages, you can use pre-allocated array or pool. 

The disruptor will work without any allocation as long as the classes themselves are composed of primitives or other classes that are composed of a static number of primitives. For example if you had a class like:
class SampleEvent {
  long a,b,c; 
  Container d;

}

class Container {
  int a, b, c;
  byte msgId;
}

The problem arises when you have events with a dynamically sized members. For example: ByteBuffer, String etc. And in that case you have to implement your own pooling. The get method provides a perfect interface to reclaim objects back on the producer side and put them back on the pool. This means your pool can be single threaded and completely on the producer side, where it should be.

You say that users can implement their own pooling as desired, but the only interface I see to get objects back from the GenericHyperLoop is receive, tryReceive etc. These methods update the cursor and need to be called on the consumer thread, which means that if you want to put these objects back on a pool, your pool needs to be multi-threaded with getObject called on the producer thread and releaseObject called on the consumer thread. In the Disruptor when the consumer says that they have processed an entry this information will eventually reach the producer through the cursor info. The producer can use this information to reclaim objects on it's own thread without an additional multi-threaded pool implementation. Example: https://github.com/RajivKurian/Java-NIO-example/blob/master/src/main/java/com/rajiv/rbutils/ResourceCollectorRingBuffer.java

Maybe I am missing something though.

Jin Mingjian

unread,
Feb 1, 2014, 9:13:37 PM2/1/14
to Rajiv Kurian, la...@googlegroups.com
I get your pure-primitives-usage. You are right. The current GenericHyperLoop does not keep transferred objects. It could be add a object-retained version by add a factory constructor argument like Disruptor.

There are several workaround in Landz:

1. use ThreadLocalPool (https://github.com/landz/z-stack/blob/master/landz.kernel/src/z/util/concurrent/ThreadLocalPool.java) for the wrappers thread-safe sharing. Because, in your case, the Event and Container are as a wrapper to primitive playload. But this cause: 1. the storage size to be linear with thread number. 2. add an indirection penalty. not very good.

2. other usage is use LongHyperLoop to transfer the pointer(address) of playload on offheap. This is zero-garbage, But it is still to add a indirection penalty, although It is useful in its cases.

3. use consumer-to-productor channels, ok, just joke:)

do you like to try the object-retained version implementation? I can add for you today:) or do you have interests to hack a version for landz?:)

very thanks, 

Jin




Jin Mingjian

unread,
Feb 1, 2014, 9:23:15 PM2/1/14
to Rajiv Kurian, la...@googlegroups.com
sorry, addtional constructor arg, is not enough, but we can do a new class to allow ti access the insider of the slot.

Jin Mingjian

unread,
Feb 2, 2014, 1:09:33 AM2/2/14
to Rajiv Kurian, la...@googlegroups.com
in the lunch, I recall I rule out  this because to consume the slot in ringbuffer demonstrate the framework behavior. One design idea of HyperLoop as a simplifed version of Disruptor's Ringbuffer, it does not care/know/control the object it transfers. Because there is no maigic: disruptor need the framework way to know the consumer have consumed the slot(this consuming process is not atmoic).

You can use one handler with the ringbuffer? or I may miss something in Ringbuffer. 

The HyperLoop, originally is used to transfer offheap raw address. So, the generic version is invented later for completeness.

In some special case, you can guarantee the order of pre-allocated wrappers from your logic, then it is still ok. This is the test logic. But I know it is a pain.

So, keep use Disruptor's RingBuffer before we reach more comfortable state:) very thanks!

Jin



Rajiv Kurian

unread,
Feb 2, 2014, 1:29:30 AM2/2/14
to la...@googlegroups.com, Rajiv Kurian


On Saturday, February 1, 2014 10:09:33 PM UTC-8, Jin Mingjian wrote:
in the lunch, I recall I rule out  this because to consume the slot in ringbuffer demonstrate the framework behavior. One design idea of HyperLoop as a simplifed version of Disruptor's Ringbuffer, it does not care/know/control the object it transfers. Because there is no maigic: disruptor need the framework way to know the consumer have consumed the slot(this consuming process is not atmoic).
Yup no magic at all. The nice thing is that the knowledge that something has been consumed is noted just by incrementing the consumer cursor. Even the HyperLoop has to do this. But separating the act of updating the consumer cursor and getting the event lets consumers have an intermediate processing stage that lets the producer thread automatically reclaim dynamic resources and add it to a pool.

You can use one handler with the ringbuffer? or I may miss something in Ringbuffer. 
Hmm - you can have multiple consumers. They all go through the entire array instead of claiming one slot at a time though. 

The HyperLoop, originally is used to transfer offheap raw address. So, the generic version is invented later for completeness.
Yeah that makes sense. But again when you need to reclaim these addresses, the disruptor's model of separating the getEvent and updateCursor will help the producer reclaim these buffers back without any multi-threaded pooling implementation. Of course this is not always desired especially if you want to hold onto the buffer beyond the scope of initial processing. In that case though you can just have a multi-threaded pool implementation. The point being this model lets you be flexible.

In some special case, you can guarantee the order of pre-allocated wrappers from your logic, then it is still ok. This is the test logic. But I know it is a pain.

So, keep use Disruptor's RingBuffer before we reach more comfortable state:) very thanks!
Thank you Jin. I am really excited about the Landz project. If you don't mind me asking, how long have you been working on this? 

Jin Mingjian

unread,
Feb 2, 2014, 3:35:41 AM2/2/14
to Rajiv Kurian, la...@googlegroups.com
Hi, Rajiv, 

it's really appreciated you like to give the ideas out to discuss. Maybe I am not careful with Disruptor as you:) 

For "You can use one handler with the ringbuffer?"

I mean this:


this is modified from disruptor to use a pure Ringbuffer usage(but still many noises):

for this example work, I need a handler, and a batchEventProcessor:

 private final ValueAdditionEventHandlerForDisruptor handler = new ValueAdditionEventHandlerForDisruptor();
 private final BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handler);
    
The batchEventProcessor, if I am right, is the true executor or handler.

The question is, where to execute the handler?

The code can jump to the following:

    @Override public void run() { if (!running.compareAndSet(false, true)) { throw new IllegalStateException("Thread is already running"); } sequenceBarrier.clearAlert(); notifyStart(); boolean processedSequence = true; long cachedAvailableSequence = Long.MIN_VALUE; long nextSequence = sequence.get(); T event = null; while (true) { try { // if previous sequence was processed - fetch the next sequence and set // that we have successfully processed the previous sequence // typically, this will be true // this prevents the sequence getting too far forward if an exception // is thrown from the WorkHandler if (processedSequence) { processedSequence = false; do { nextSequence = workSequence.get() + 1L; sequence.set(nextSequence - 1L); } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence)); } if (cachedAvailableSequence >= nextSequence) { event = ringBuffer.get(nextSequence); workHandler.onEvent(event); processedSequence = true; } else { cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); } } catch (final AlertException ex) { if (!running.get()) { break; } } catch (final Throwable ex) { // handle, mark as processed, unless the exception handler threw an exception exceptionHandler.handleEventException(ex, nextSequence, event); processedSequence = true; } } notifyShutdown(); running.set(false); }

this disruptor to get thing done. Not sure you like this?

but for the first statement: if (!running.compareAndSet(false, true))

it is not non-full-fence any more as you know.


Can we do better with HyperLoop?  As I metioned above to pre-allocated a array. From the head or tail or (every index?) of array, we can add a volatile field to indicate whether we have processed one wrapper. They become similar solutions. But we short the critical path. So, the latency may be reduced. (need to measure^_^) In both way, there may be good practices.

whether the throughput improved or not is another story. If you read all the landz's that first blog, then you got what I said.

But again, I admit your case is rational. It is not good to let developer to dig out everything they need. The project should help. I will keep this in mind to see when I give your a better facilities.

very thanks,
Jin






























Jin Mingjian

unread,
Feb 2, 2014, 4:18:56 AM2/2/14
to Rajiv Kurian, la...@googlegroups.com
for your last question, it costs about 4 months full time, if not count the first 20 days "free time", after I quit from my job. 

The reason, I release a little earlier is because I told my wife "I will finish that in this year" in last year. But I really hope landz can be self-hosted by z stack before release. You may know even Netty.io is hosted via github pages, not netty by themself...

The main cost is not the development time itself(70%-80%?). In fact, it is the time to review the existed works. There are two reasons for this:

1.  If we want to convince others with that we are better, we must provide "why".

2. if we can not be better, why we just use they? but this premise is the licenses are matched.

The bad thing was, most I reviewed dispointed me. This is another topic:)

You may find in the codes of landz is KISS and API prefer "composable"(or composition in design pattern). The developers of framework usually think they are the best. But this is not always true. For example, the bloating of NIO, the socket stack in linux kernel is simple for use. But NIO adds many layers high-level abstractions and encapsulations. 

Is these  abstractions and encapsulations right? yes or no. but this is not important. The important is that, for that want to work under low level, they can not get the private APIs, or even they get by reflection or now methodhandle, the encapsulation is not right to use.

The Landz is design from scratch, and very careful for this engineering flaw. Such as: 

1. buffer API provide 2 layer: Buffers allow you raw operation offheap area in fastest speed without any bound chehcking. If you are smart enough, this is best. If you are not safe about this, then a higher Buffer/ByteBuffer API is provided, which has the checking to guarantee your every operation. And even in Buffer/ByteBuffer API, you can disable the with contract API if you are not-safe first but ok later.

2. The net side API, there are three layers: raw Syscall (for talking to kernel direct), sockets/epolls encapsulations for using without offHeap knowledge, higher event poll in the net module for as the best practices understood by the author of net module(me now^_^). So, if you do not agree with me, you can still create another net module but still on the top of low plumbing of landz. Or if you do not agree with offheap to talk with kernel, you still use your own way.(but note: offheap is just the only choose as I know.)

Ok, there is more works and articles will be available. Can you has interesting to write something?;) thanks again,

Jin




Rajiv Kurian

unread,
Feb 2, 2014, 1:05:56 PM2/2/14
to la...@googlegroups.com, Rajiv Kurian


On Sunday, February 2, 2014 12:35:41 AM UTC-8, Jin Mingjian wrote:
Hi, Rajiv, 

it's really appreciated you like to give the ideas out to discuss. Maybe I am not careful with Disruptor as you:) 

For "You can use one handler with the ringbuffer?"

I mean this:


this is modified from disruptor to use a pure Ringbuffer usage(but still many noises):

for this example work, I need a handler, and a batchEventProcessor:

 private final ValueAdditionEventHandlerForDisruptor handler = new ValueAdditionEventHandlerForDisruptor();
 private final BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<ValueEvent>(ringBuffer, sequenceBarrier, handler);
    
The batchEventProcessor, if I am right, is the true executor or handler.

The question is, where to execute the handler?

The code can jump to the following:

    @Override public void run() { if (!running.compareAndSet(false, true)) { throw new IllegalStateException("Thread is already running"); } sequenceBarrier.clearAlert(); notifyStart(); boolean processedSequence = true; long cachedAvailableSequence = Long.MIN_VALUE; long nextSequence = sequence.get(); T event = null; while (true) { try { // if previous sequence was processed - fetch the next sequence and set // that we have successfully processed the previous sequence // typically, this will be true // this prevents the sequence getting too far forward if an exception // is thrown from the WorkHandler if (processedSequence) { processedSequence = false; do { nextSequence = workSequence.get() + 1L; sequence.set(nextSequence - 1L); } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence)); } if (cachedAvailableSequence >= nextSequence) { event = ringBuffer.get(nextSequence); workHandler.onEvent(event); processedSequence = true; } else { cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence); } } catch (final AlertException ex) { if (!running.get()) { break; } } catch (final Throwable ex) { // handle, mark as processed, unless the exception handler threw an exception exceptionHandler.handleEventException(ex, nextSequence, event); processedSequence = true; } } notifyShutdown(); running.set(false); }

this disruptor to get thing done. Not sure you like this?

but for the first statement: if (!running.compareAndSet(false, true))

it is not non-full-fence any more as you know.


Can we do better with HyperLoop?  As I metioned above to pre-allocated a array. From the head or tail or (every index?) of array, we can add a volatile field to indicate whether we have processed one wrapper. They become similar solutions. But we short the critical path. So, the latency may be reduced. (need to measure^_^) In both way, there may be good practices.


I'm not too sure I understand your question :) 

whether the throughput improved or not is another story. If you read all the landz's that first blog, then you got what I said.

But again, I admit your case is rational. It is not good to let developer to dig out everything they need. The project should help. I will keep this in mind to see when I give your a better facilities.
Sweet! Yeah my point is that piggybacking off the information that an event has been processed by a consumer lets an implementation do more things. That information is already there in HyperLoop, it's a matter of exposing it to the developer. 

Rajiv Kurian

unread,
Feb 2, 2014, 1:41:07 PM2/2/14
to la...@googlegroups.com, Rajiv Kurian


On Sunday, February 2, 2014 1:18:56 AM UTC-8, Jin Mingjian wrote:
for your last question, it costs about 4 months full time, if not count the first 20 days "free time", after I quit from my job. 
Thats awesome. I've wanted to take a break and build something awesome too :) Congratulations for following up and working through it! 

The reason, I release a little earlier is because I told my wife "I will finish that in this year" in last year. But I really hope landz can be self-hosted by z stack before release. You may know even Netty.io is hosted via github pages, not netty by themself...

The main cost is not the development time itself(70%-80%?). In fact, it is the time to review the existed works. There are two reasons for this:

1.  If we want to convince others with that we are better, we must provide "why".

2. if we can not be better, why we just use they? but this premise is the licenses are matched.

The bad thing was, most I reviewed dispointed me. This is another topic:)
I definitely look forward to more articles on what you found and how you came to make certain design decisions.  

You may find in the codes of landz is KISS and API prefer "composable"(or composition in design pattern). The developers of framework usually think they are the best. But this is not always true. For example, the bloating of NIO, the socket stack in linux kernel is simple for use. But NIO adds many layers high-level abstractions and encapsulations.
Yeah the composable APIs are the easiest to work with. 
 

Is these  abstractions and encapsulations right? yes or no. but this is not important. The important is that, for that want to work under low level, they can not get the private APIs, or even they get by reflection or now methodhandle, the encapsulation is not right to use.

The Landz is design from scratch, and very careful for this engineering flaw. Such as: 

1. buffer API provide 2 layer: Buffers allow you raw operation offheap area in fastest speed without any bound chehcking. If you are smart enough, this is best. If you are not safe about this, then a higher Buffer/ByteBuffer API is provided, which has the checking to guarantee your every operation. And even in Buffer/ByteBuffer API, you can disable the with contract API if you are not-safe first but ok later.
This style is really great. I've always been a fan of libraries exposing their innards so that the user can decide what level they want to operate on. If a library is a HTTP library, it should expose the HTTP parser as a function that takes a pointer to a buffer + size without making any assumptions on how the buffer was generated etc. It can then provide a networking layer and even a threading layer as a convenience on top of this basic functionality. But users should be free to make decisions on what networking layer to use, and what threading layer to use etc and still be able to profit from the HTTP parser functionality.

2. The net side API, there are three layers: raw Syscall (for talking to kernel direct), sockets/epolls encapsulations for using without offHeap knowledge, higher event poll in the net module for as the best practices understood by the author of net module(me now^_^). So, if you do not agree with me, you can still create another net module but still on the top of low plumbing of landz. Or if you do not agree with offheap to talk with kernel, you still use your own way.(but note: offheap is just the only choose as I know.)
That's really awesome. Building on one's own abstractions lets a developer choose which level they want to operate on. 

Ok, there is more works and articles will be available. Can you has interesting to write something?;) thanks again,
I've explored a bit of the code for now. Once I get around to writing something even trivial with the different pieces, I'll try to write something. There is a lot of surface area, but the code is very well organized and easy to read, especially thanks to the timely comments. So thank you for that. My bit of a pessimistic thought though is that writing in this style is more tedious in Java than C/C++. Unsafe is more tedious than just using C++. A lot of the utility functions like findNextAlignedPointer etc are already present in C++. You also have posix_memalign etc. Syscalls don't have to go through JNI, though this might be moot given how expensive syscalls already are. Having structs makes it less verbose to work with raw memory too. But I get the utility of the package. Java developers using your library can be blissfully unaware of the underlying complexities and still profit from the carefully architected code.

The one topic I am curious to get your thoughts on is building an ecosystem of libraries around the core. Mundane things like a library processing the Twitter API will have to be built upon your Buffer, ByteBuffer API and will need to be specialized quite a bit. Not a big deal but it seems like a new ecosystem will need to be built around the APIs.

Rajiv Kurian

unread,
Feb 2, 2014, 1:52:52 PM2/2/14
to la...@googlegroups.com, Rajiv Kurian
Also wanted to say that I like the specializations with long. It gets around the indirection you'd need with wrapper objects which is great.

You said you wanted to use it to transfer buffers. Won't you need a long(address) + an int(size) to transfer a workable buffer? Unless the size is the same for all of them.

Jin Mingjian

unread,
Feb 2, 2014, 9:12:50 PM2/2/14
to Rajiv Kurian, la...@googlegroups.com

yes. but not necessary. zmalloc is 8 byte boundary aligned. so a allocated data chunk at least 8 bytes. you can design your data protocol to add the size header in the first of chunk. Or even, you may use the high 16bit and low 3bit of memory address to encode this info, if you know current 64abi only uses the low 48bits of full 64bits(dirty trick ).

Jin

Jin Mingjian

unread,
Feb 2, 2014, 9:19:48 PM2/2/14
to Rajiv Kurian, la...@googlegroups.com

the best of the memory allocator is that it is great like new keywords. it provides thread safe because you operate on different object\chunk. I plan to use it to drive async io pool full out of order. This is not possible for pool because it may want high throughput.

please wait hours for one unanswered question. very thank!

Jin

2014-2-3 上午2:52于 "Rajiv Kurian" <geet...@gmail.com>写道:

Jin Mingjian

unread,
Feb 3, 2014, 9:47:01 AM2/3/14
to Rajiv Kurian, la...@googlegroups.com
sorry for unclear about this example and question: I see your ResourceCollectorRingBuffer.java does not use any handler/processor which are shown in the Disrutpor's own test, so I am curious about how exactly you use that RingBuffer. If not use  handler/processor in Disruptor, I am interesting to how you guarantee the safety of consuming the slot/value.

I do not go though your project. It seems in a your spare time project? If you are not urgent, I can see if I can help in some future because there are still many high priority for landz:) thanks again!

For ecosystem problem, I have not considered it now:) I consider firstly to solid the existed works and some small new features which is necessary in the backend side. I hope to if the potential mate like to join the game:) 

Jin

Rajiv Kurian

unread,
Feb 3, 2014, 12:05:33 PM2/3/14
to la...@googlegroups.com, Rajiv Kurian
Hi Jin:

Yes you are right, the handler/processor are in a separate Scala file. ResourceCollectorRingBuffer is just an abstract class. It's in The project is also a proof of concept and so the code isn't very tight. I haven't even written any tests so it could be incorrect. But the concept is right IMHO so a much tighter implementation can be coded. You pass in the ring buffer and sequence barrier to the ResourceCollectorRingBuffer. You can pass the same ring buffer to the consumer and it operates just like a normal Disruptor project. The assumption is as soon as the consumer marks a slot as processed it doesn't need the resource anymore. The abstraction exposes some of the producer methods from the ring buffer. This is an implementation (sorry for the Scala source, but it should be readable).

class MemoryCollectorRingBuffer(maxSize: Int, minSize: Int, ringBuffer: RingBuffer[NetworkEvent],
                                sequenceBarrier: SequenceBarrier)
  extends ResourceCollectorRingBuffer[NetworkEvent, ByteBuffer](ringBuffer, sequenceBarrier) {

  private[this] val allocator = Allocators.getNewAllocator(maxSize, minSize) // This could be zmalloc

  def release(buffer: ByteBuffer) {
    assert(buffer != null)
    allocator.free(buffer)
  }

// Just a method to convert your event object to the underlying dynamically sized resource you are pooling.
  def convert(event: NetworkEvent): ByteBuffer = {
    event.buffer
  }

  def getBuffer(size: Int): ByteBuffer = {
    if (size > maxSize || size < minSize) {
      null
    } else {
// First try to allocate from the allocator.
      var buf = allocator.allocate(size)
      if (buf == null) {
// If it doesn't work then try to recycle all the events processed in this cycle so far.
        recycle()
// See if the allocation works after recycling since we should have gotten back some memory.
        buf = allocator.allocate(size)
      } else {
        buf
      }
      buf.clear()
// The allocation could still fail. We might handle it by allocating more memory or just let the application deal with it.
      buf
    }
  }
}

The consumer is just a typical BatchProcessor. We pass to it the underlying ring buffer so it can work as expected. It doesn't need any special logic since the producer automatically collects buffers back when it marks an event processed;
class NetworkEventProcessor(ringBufferNetwork: RingBuffer[NetworkEvent],
                            val sequenceBarrier: SequenceBarrier) extends Runnable {
  val sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE)
  private[this] var readBuffer: ByteBuffer = null

  override def run() {
// No running state is kept track of for simplicity here.
    var nextSequence = sequence.get() + 1L

    while (true) {
      var availableSequence = sequenceBarrier.waitFor(nextSequence)
      if (nextSequence > availableSequence) {
        Thread.`yield`
      }
      while (nextSequence <= availableSequence) {
        handleEvent(ringBufferNetwork.get(nextSequence), nextSequence)
        nextSequence += 1
      }
// This will result in the producer reclaiming the resources from the right slots ultimately.
      sequence.set(availableSequence)
    }
  }
// Process as usual.
  def handleEvent(event: NetworkEvent, sequence: Long) {
    readBuffer = event.buffer // Should we duplicate to avoid false sharing?

    while (readBuffer.hasRemaining()) {
      event.socketChannel.write(readBuffer)
    }
  }
}

A typical use for the producer is something like this:

// Allocate a buffer to read into.
val readBuffer = memoryCollectingRingBuffer.getBuffer(length);
// Do something with the buffer.
socketChannel.read(buffer)
val index = memoryCollectingRingBuffer.next()
val event = memoryCollectingRingBuffer.get(index)
event.buffer = payload
event.socketChannel = socketChannel
memoryCollectingRingBuffer.publish(index)

As you can see the publisher logic is also very similar to the normal disruptor logic. The producer just calls the methods on the ResourceCollectorRingBuffer instead of calling them directly on the Disruptor RingBuffer. By keeping the resource allocator with the ring buffer, we can easily keep track of the producer and consumer cursors. Since the allocator is single threaded the logic is very simple:
 i) When you complete an entire cycle of a ring buffer without freeing any of the resources, you must collect all the resources from every slot back so they don't leak. Otherwise they will be overwritten by the producer and leak.
ii) When you try to allocate and fail, check for all the events that have been marked processed by the consumer in this cycle. Reclaim all of them and try allocating the resource again. You have to update the cursor where the last recycle event took place so that you can detect the next full cycle properly.

Does that make sense?

Thanks,
Rajiv

Jin Mingjian

unread,
Feb 4, 2014, 2:10:54 AM2/4/14
to Rajiv Kurian, la...@googlegroups.com
Hi, Rajiv, 

It seems you mimic the logic of com.lmax.disruptor.WorkProcessor in your NetworkEventProcessor? do you consider if exception happened in handleEvent? why not re-use the Disrupt's WorkProcessor? Every statements in Disruptor has it meaning. Others:

      var availableSequence = sequenceBarrier.waitFor(nextSequence)
      if (nextSequence > availableSequence) {
        Thread.`yield`
      }
sequenceBarrier.waitFor should guarantee the nextSequence is available. So the next three lines is redundant.

the problem of ringbuffer is that the slow consumer blocks all unless you allow event losing. Are you safe about socketChannel.write in handleEvent? Ok, I just point out some suspects:)

Jin

Rajiv Kurian

unread,
Feb 4, 2014, 2:19:58 AM2/4/14
to Jin Mingjian, la...@googlegroups.com
Hey Jin,

Like I said this is a simplified example as a proof of concept, not a real implementation. I can use the BatchEventProcessor straight up without writing anything of my own. You can safely ignore the event processing code and use the disruptor WorkProcessor. There is nothing special here. Yup socketChannel.write can cause exceptions and not everything could be written at one go either. All of that is secondary though. No special processing logic is needed. The thing I wanted to show you is that the producer can reclaim resources on its own leading to a single threaded pool implementation. Does that make sense?

On Monday, February 3, 2014, Jin Mingjian <jin...@gmail.com> wrote:
Hi, Rajiv, 

It seems you mimic the logic of com.lmax.disruptor.WorkProcessor in your NetworkEventProcessor? do you consider if exception happened in handleEvent? why not re-use the Disrupt's WorkProcessor? Every statements in Disruptor has it meaning. Others:

      var availableSequence = sequenceBarrier.waitFor(nextSequence)
      if (nextSequence > availableSequence) {
        Thread.`yield`
      }
sequenceBarrier.waitFor should guarantee the nextSequence is available. So the next three lines is redundant.

the problem of ringbuffer is that the slow consumer blocks all unless you allow event losing. Are you safe about socketChannel.write in handleEvent? Ok, I just point out some suspects:)

Jin
On Tue, Feb 4, 2014 at 1:05 AM, Rajiv Kurian <geet...@gmail.com> wrote:
Hi Jin:

Yes you are right, the handler/processor are in a separate Scala file. ResourceCollectorRingBuffer is just an abstract class. It's in The project is also a proof of concept and so the code isn't very tight. I haven't even written any tests so it could be incorrect. But the concept is right IMHO so a much tighter implementation can be coded. You pass in the ring buffer and sequence barrier to the ResourceCollectorRingBuffer. You can pass the same ring buffer to the consumer and it operates just like a normal Disruptor project. The assumption is as soon as the consumer marks a slot as processed it doesn't need the resource anymore. The abstraction exposes some of the producer methods from the ring buffer. This is an implementation (sorry for the Scala source, but it should be readable).

class MemoryCollectorRingBuffer(maxSize: Int, minSize: Int, ringBuffer: RingBuffer[NetworkEvent],
                                sequenceBarrier: SequenceBarrier)
  extends ResourceCollectorRingBuffer[NetworkEvent, ByteBuffer](ringBuffer, sequenceBarrier) {

  private[this] val allocator = Allocators.getNewAllocator(maxSize, minSize) // This could be zmalloc

  def release(buffer: ByteBuffer) {
    assert(buffer != null)
    allocator

--

Jin Mingjian

unread,
Feb 4, 2014, 8:14:03 AM2/4/14
to Rajiv Kurian, la...@googlegroups.com
Rajiv, Make sense:) 

Rajiv Kurian

unread,
Feb 4, 2014, 10:37:32 AM2/4/14
to Jin Mingjian, la...@googlegroups.com
Cool. I'll keep exploring the source. Thank you for the project. You've given me a lot of reading material :)

Jin Mingjian

unread,
Feb 5, 2014, 1:46:45 AM2/5/14
to Rajiv Kurian, la...@googlegroups.com
Rajiv, Keep rolling! There will be more articles in the near future after the official blog being self-hosted. We definitely should voice in wider community.

best regards,
Jin
Reply all
Reply to author
Forward
0 new messages