Making my first TcpServer using reactor


Laurent T.

Dec 16, 2013, 11:27:45 AM
to reactor-...@googlegroups.com
Hi,

I've just started using Reactor today and I've been trying to set up my first TcpServer.

My goal is to manage an HttpServer using reactor.
Here's the simple Java class I made for my tests:
https://gist.github.com/Crystark/216b1e92a1c0f4e8969e

(I based the httpServer code on this gist: https://gist.github.com/jbrisbin/7043968)

The main method is set up on the basic TcpServer (the example shown in the wiki), but as you can see I've defined another method to try the HTTP server.
Neither seems to work the way I thought it would.

When I execute either one, I get the following output:
2013-12-16 17:24:19 INFO  Main:59 - Start!
2013-12-16 17:24:19 INFO  Main:61 - Done!

And the process is dead...

I'm sure I'm just missing something, but I can't find what.
Can anyone point me in the right direction?

Thanks a lot!
Regards

Laurent.

Jon Brisbin

Dec 16, 2013, 11:38:48 AM
to Laurent T., reactor-...@googlegroups.com
The server's start() method doesn't block, since Reactor is fundamentally non-blocking. To keep the server alive long enough to play with it, you'll have to put in some sort of while(true) { Thread.sleep(5000); } loop, or a CountDownLatch that blocks for N requests, etc. Otherwise your method simply completes and the server goes away.
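A minimal sketch of that idea in plain Java (the commented-out server calls are hypothetical stand-ins for any non-blocking server; this is not Reactor's API): a CountDownLatch parks the main thread without burning CPU, instead of a sleep loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class KeepAlive {

    // Park the calling thread until the latch is released or the timeout
    // elapses; returns true if the latch actually reached zero.
    public static boolean awaitShutdown(CountDownLatch latch, long timeoutMillis)
            throws InterruptedException {
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // server.start(); // hypothetical: returns immediately, non-blocking
        CountDownLatch shutdown = new CountDownLatch(1);
        // Release the latch when the JVM is asked to terminate (e.g. Ctrl-C).
        Runtime.getRuntime().addShutdownHook(new Thread(shutdown::countDown));
        // Without a wait here, main() would return and the process would die.
        awaitShutdown(shutdown, 100); // short timeout so this demo exits
    }
}
```

In a real server you would call awaitShutdown with no timeout (or a very long one), so the process stays up until the hook fires.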


Thanks!

Jon Brisbin | Reactor Project Lead

--
You received this message because you are subscribed to the Google Groups "reactor-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to reactor-framew...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Jon Brisbin

Dec 16, 2013, 2:23:53 PM
to Laurent T., reactor-...@googlegroups.com
I've also updated the gist [1] with a slightly different variation on creating an HTTP server using Reactor. It uses the Stream API and transforms a request into a response, which is asynchronously written to the connection.

If the highest possible throughput is more of a consideration than ease of use, I would probably go with using connection.consume() and conn.send() directly (rather than using the connection.in() Stream).



Thanks!

Jon Brisbin | Reactor Project Lead

Laurent T.

Dec 17, 2013, 3:54:09 AM
to reactor-...@googlegroups.com, Laurent T.
Hi!

Thanks for your answer, and indeed this works far better:

server.start();
while (true) {
    Thread.sleep(5000L);
}

I also used the new version of your gist. Thanks for updating it. I don't yet understand everything you're doing in it, but I hope to soon :)
Thanks again for your help!

Laurent T.

Jan 17, 2014, 12:08:26 PM
to reactor-...@googlegroups.com, Laurent T.
Hi,

I'm following up on this thread as we're trying to go further than just a simple "Hello World" and are having some trouble understanding how all this works.

First, here's what we're looking to achieve in the end:
- A request comes in with its payload
- The payload is transformed
- The transformed payload is sent to various processes asynchronously
- The various results are combined to build our HttpResponse.
- The response is sent back to the user.

Performance is really important for us, and we may even choose to abandon some results if they take too much time to come in (like it's possible to do with the poll method of a BlockingQueue).
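The BlockingQueue.poll idea mentioned above can be sketched like this (names are illustrative, not from the thread): wait a bounded time for each result, and fall back to a default if it arrives too late.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedFetch {

    // Wait at most timeoutMillis for a result; if none arrives in time,
    // abandon it and return the fallback so the response isn't delayed.
    public static <T> T pollOrDefault(BlockingQueue<T> results,
                                      long timeoutMillis, T fallback)
            throws InterruptedException {
        T value = results.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        return value != null ? value : fallback;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> results = new LinkedBlockingQueue<>();
        results.offer("partial-result");
        // First call finds a value immediately; second waits 50 ms, then falls back.
        System.out.println(pollOrDefault(results, 50, "timed-out")); // partial-result
        System.out.println(pollOrDefault(results, 50, "timed-out")); // timed-out
    }
}
```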

In our dev environment, we managed to bench a simple do-nothing-just-answer-OK implementation at around 50k QPS.
We then added a Thread.sleep(10) before returning the HttpResponse to simulate the time it would take to process the request.
We went down to 100 QPS, which is what is expected for sequential processing of all the requests.
I was a bit surprised, as I thought requests would magically be parallelized by the TcpServer. I guess I missed something.

Could someone point us in the right direction?
We're having a hard time finding examples of how to use Reactor, so if you have any tutorials or examples we'd be happy to have them.

Thanks,
Laurent

Jon Brisbin

Jan 17, 2014, 12:36:37 PM
to Laurent T., reactor-...@googlegroups.com

On Friday, January 17, 2014 at 11:08 AM, Laurent T. wrote:

I'm following up on this thread as we're trying to go further than just a simple "Hello World" and are having some trouble understanding how all this works.

First, here's what we're looking to achieve in the end:
- A request comes in with its payload
- The payload is transformed
- The transformed payload is sent to various processes asynchronously
- The various results are combined to build our HttpResponse.
- The response is sent back to the user.

Performance is really important for us, and we may even choose to abandon some results if they take too much time to come in (like it's possible to do with the poll method of a BlockingQueue).

You probably want to have a Reactor or Stream that uses a ThreadPoolDispatcher if you're going to be doing blocking calls. That's the only way to get magical parallelism. :) The RingBufferDispatcher, of course, is completely single-threaded and shared across all components using that same Environment, so it's very, very important to be in and out as quickly as possible. If doing a blocking call, you'll want to jump out of the event thread to a thread pool and make sure not to block on anything (e.g. by calling await() on a Promise or get() on a Future) but instead use callbacks.
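The "jump out to a thread pool and use callbacks" pattern can be sketched in plain Java (this is just the shape of the technique, not Reactor's dispatcher API): submit the blocking work to an ExecutorService and hand the result to a callback, rather than blocking the event thread on a Future.get().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class BlockingOffload {

    // Daemon threads so the pool doesn't keep the JVM alive on its own.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Run a blocking call off the event thread; deliver the result via a
    // callback instead of making the caller wait on a Future.get().
    public static <T> Future<?> submit(Callable<T> blockingCall, Consumer<T> callback) {
        return POOL.submit(() -> {
            try {
                callback.accept(blockingCall.call()); // runs on a pool thread
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}
```

The event thread only pays the cost of the submit; the slow call and the response handling happen on the pool thread.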

Some of the additions to the 1.1 snapshot include a window() method on Stream, which sounds like something you might be able to use to set timeouts easily.
 
In our dev environment, we managed to bench a simple do-nothing-just-answer-OK implementation at around 50k QPS.
We then added a Thread.sleep(10) before returning the HttpResponse to simulate the time it would take to process the request.
We went down to 100 QPS, which is what is expected for sequential processing of all the requests.
I was a bit surprised, as I thought requests would magically be parallelized by the TcpServer. I guess I missed something.

If using the default Dispatcher, which is the RingBufferDispatcher, requests are actually completely single-threaded across all uses of that Environment. If you created a TcpClient using the same Environment, for instance, then the RingBufferDispatcher would be shared across server *and* client. You can see how this will affect testing, so that's something to keep in mind.

Could someone point us in the right direction?
We're having a hard time finding examples of how to use Reactor, so if you have any tutorials or examples we'd be happy to have them.

One can never have too many samples. :) I'll try and create one for this situation. It shouldn't take too long.

Laurent T.

Jan 20, 2014, 6:11:24 AM
to reactor-...@googlegroups.com, Laurent T.
Hi Jon,

Thanks to you, we managed to achieve much better results.

You probably want to have a Reactor or Stream that uses a ThreadPoolDispatcher if you're going to be doing blocking calls. That's the only way to get magical parallelism. :) 

I set the following config in default.properties:
reactor.dispatchers.threadPoolExecutor.type = threadPoolExecutor
reactor.dispatchers.threadPoolExecutor.size = 0
reactor.dispatchers.threadPoolExecutor.backlog = 16384
reactor.dispatchers.default = threadPoolExecutor

After doing so I re-ran our initial benchmark: we're still getting 50k QPS with the do-nothing-just-answer-OK test, which is great. I actually thought after reading your answer that we would lose baseline performance switching to a threadPoolDispatcher. Shouldn't that be the case?

Anyway, we then tested the Thread.sleep(10) and the results were much better: 800 QPS. We also did some variants of that test:
- 10ms on 10% of the requests => 8k QPS
- 50ms on 10% of the requests and 1ms on the other 90% => 1300 QPS

We do not yet know how fast we will process the data, but that last one should be our worst-case scenario.
Our goal here is to reach 10k QPS per server. Right now those tests are done on a dev server that has 8 GB of RAM and 8 CPU cores.
Do you have any information on how this should scale depending on the hardware?

 If doing a blocking call, you'll want to jump out of the event thread to a thread pool and make sure not to block on anything (by calling await() on a Promise or get() on a Future) but instead use callbacks.

What do you mean by "jump out"? Is this done by the configuration I set, or should I do something more inside my Consumer or Function?

I'm also wondering what's the best way to achieve the next step. Let's say I want to:
- First, decode POST data to get parameters for the following tasks
- Then, simultaneously query a database and call an API
- Finally, group those two results (database and API) to form an HttpResponse

In the example of the TcpServer you posted, there's a consumer that maps the stream before consuming the HttpResponse that it sends back. You added the following comment:
Use the Stream API rather than consuming messages directly.
What does that really change?
Also, what's the difference between map and consume? I see you can do multiple maps or multiple consumes one after the other.

I'm sorry for all these beginner questions. I really want to understand how all this works so I can make the best choices and not be surprised by any behavior.

About the window() method, I will be glad to test it once 1.1 is out. We're currently using the version deployed to Maven, which is 1.0.

I'm looking forward to seeing your example on this matter.
Thanks again for your help.

Laurent

Jon Brisbin

Jan 20, 2014, 9:49:37 AM
to Laurent T., reactor-...@googlegroups.com

On Monday, January 20, 2014 at 5:11 AM, Laurent T. wrote:

I actually thought after reading your answer that we would lose baseline performance switching to a threadPoolDispatcher. Shouldn't that be the case?

In general the ThreadPoolExecutorDispatcher is less performant at high volume than the RingBufferDispatcher. It produces more garbage that has to be collected, etc… But at 50k/sec I'm not sure you're going to see a huge difference. If it were 5MM/sec you'd see a noticeable difference, though. :)
 
Anyway, we then tested the Thread.sleep(10) and the results were much better: 800 QPS. We also did some variants of that test:
- 10ms on 10% of the requests => 8k QPS
- 50ms on 10% of the requests and 1ms on the other 90% => 1300 QPS

We do not yet know how fast we will process the data, but that last one should be our worst-case scenario.
Our goal here is to reach 10k QPS per server. Right now those tests are done on a dev server that has 8 GB of RAM and 8 CPU cores.
Do you have any information on how this should scale depending on the hardware?

We don't actually have relative performance numbers on different types of hardware. We usually just use the benchmarks as a way to make sure that changes we make don't negatively affect the baseline but we don't use them to determine a more accurate throughput on different types of hardware.
 
 If doing a blocking call, you'll want to jump out of the event thread to a thread pool and make sure not to block on anything (by calling await() on a Promise or get() on a Future) but instead use callbacks.

What do you mean by "jump out"? Is this done by the configuration I set, or should I do something more inside my Consumer or Function?

I just meant that if you're using the RingBufferDispatcher (either explicitly or by using the default), then you don't want to do any blocking at all, since it's a single, shared thread. If you *are* doing blocking IO that's fine, you just need to make sure one of two things happens: either you use the ThreadPoolExecutorDispatcher on the server so that events leave the Netty IO thread and go into your thread pool, or you use the RingBufferDispatcher for some part of your processing but notify another Reactor that *is* using the ThreadPoolExecutorDispatcher (or you submit a job to a standard thread pool). The key is to get the blocking IO done in a thread other than the single RingBuffer thread powering the event stream.

If using the ThreadPoolExecutorDispatcher in the server because it's probably the lowest-friction route, keep in mind that every notify() will submit to the thread pool. That means if you use a Stream and have 5 steps, you'll have (potentially) 5 context switches. That can add up. It's best to be conservative in the use of thread pools, use them as infrequently as possible, and only cross the thread boundary when you absolutely have to, and then only once or twice.

I'm also wondering what's the best way to achieve the next step. Let's say I want to:
- First, decode POST data to get parameters for the following tasks
- Then, simultaneously query a database and call an API
- Finally, group those two results (database and API) to form an HttpResponse

This sounds like a job for the as-yet-unfinished Graph API, which I think would handle this fork/join scenario a little better than Stream does; Stream isn't really designed for that, being more of a linear transformation and stream-processing tool.
 
In the example of the TcpServer you posted, there's a consumer that maps the stream before consuming the HttpResponse that it sends back. You added the following comment:
Use the Stream API rather than consuming messages directly.
What does that really change?

Consuming directly on the connection skips the creation of a Stream but is less flexible. Using the Stream lets you compile a set of reusable transformations in various steps and pipe the output to the output Consumer if you wish, which makes for a very compact handler.
 
Also, what's the difference between map and consume? I see you can do multiple maps or multiple consumes one after the other.

Map is a transformation function in the classic sense. It takes A and returns B. The next map() or consume() after that will take a B, etc. consume() takes an A and doesn't return anything. The next step in the chain after a consume receives as input the same input given to the previous step.
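That distinction can be illustrated with plain java.util.function types (a sketch of the semantics described above, not Reactor's Stream API):

```java
import java.util.function.Consumer;
import java.util.function.Function;

public class MapVsConsume {

    // map: a transformation, A -> B; the next step sees the new value.
    public static Integer mapStep(String input, Function<String, Integer> map) {
        return map.apply(input);
    }

    // consume: a side effect, A -> void; the next step still sees A.
    public static String consumeStep(String input, Consumer<String> consume) {
        consume.accept(input); // side effect only
        return input;          // value flows through unchanged
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        // consume logs the value without changing it, then map transforms it.
        String passedThrough = consumeStep("hello", log::append);
        Integer length = mapStep(passedThrough, String::length);
        System.out.println(log + " -> " + length); // hello -> 5
    }
}
```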

Laurent T.

Jan 21, 2014, 4:40:53 AM
to reactor-...@googlegroups.com, Laurent T.
Thanks for all those details and the accuracy of your answers.
I hope I'm not taking too much of your time, and that this topic will be useful to others.

I just meant that if you're using the RingBufferDispatcher (either explicitly or by using the default), then you don't want to do any blocking at all, since it's a single, shared thread. If you *are* doing blocking IO that's fine, you just need to make sure one of two things happens: either you use the ThreadPoolExecutorDispatcher on the server so that events leave the Netty IO thread and go into your thread pool, or you use the RingBufferDispatcher for some part of your processing but notify another Reactor that *is* using the ThreadPoolExecutorDispatcher (or you submit a job to a standard thread pool). The key is to get the blocking IO done in a thread other than the single RingBuffer thread powering the event stream.

If using the ThreadPoolExecutorDispatcher in the server because it's probably the lowest-friction route, keep in mind that every notify() will submit to the thread pool. That means if you use a Stream and have 5 steps, you'll have (potentially) 5 context switches. That can add up. It's best to be conservative in the use of thread pools, use them as infrequently as possible, and only cross the thread boundary when you absolutely have to, and then only once or twice.

I'm not sure I yet understand the concept of blocking IO vs non-blocking IO and how it should affect my coding, but from what I've been reading it's actually quite interesting. If I understand it a bit, I would say that, for instance, a database query is blocking IO but reading POST data isn't. So let's take the case in which, from time to time, we receive invalid POST data, we can know that just by looking at it, and so we need to answer a 400. In that case, would having a ThreadPoolExecutorDispatcher be less efficient than having a RingBufferDispatcher that answers the 400s right away and relays blocking IOs to another Reactor or Thread, passing the TcpConnection.out() Consumer as a parameter?

By the way, can I create as many Reactors as I want? What's the best way to share them with one another so they can communicate?

Thanks again for your help in understanding Reactor. You're proving invaluable.

Laurent

Jon Brisbin

Jan 21, 2014, 9:46:47 AM
to Laurent T., reactor-...@googlegroups.com

On Tuesday, January 21, 2014 at 3:40 AM, Laurent T. wrote:

Thanks for all those details and the accuracy of your answers.
I hope I'm not taking too much of your time, and that this topic will be useful to others.

No worries! That's what we're here for...
 
I'm not sure I yet understand the concept of blocking IO vs non-blocking IO and how it should affect my coding, but from what I've been reading it's actually quite interesting. If I understand it a bit, I would say that, for instance, a database query is blocking IO but reading POST data isn't. So let's take the case in which, from time to time, we receive invalid POST data, we can know that just by looking at it, and so we need to answer a 400. In that case, would having a ThreadPoolExecutorDispatcher be less efficient than having a RingBufferDispatcher that answers the 400s right away and relays blocking IOs to another Reactor or Thread, passing the TcpConnection.out() Consumer as a parameter?

That's a good way to look at it. But I'd say a database call is not necessarily a blocking call. :) The term "blocking" applies to any IO done (usually over a network) using the traditional networking libraries of the JDK, which send bytes of data to a remote peer and then "block" the calling thread while waiting for a response. If the client sends the bytes to the peer, then immediately goes on and continues to process other tasks, and later picks up the incoming data and matches it up with the request, then it's considered "non-blocking". Most database drivers, HTTP client libraries, and lots of other network functionality use blocking IO. That said, there are some Netty-based clients that don't use blocking IO (like Reactor), so in those cases it's not as critical to worry about which thread you're operating in, since it won't be tied up blocking for a response.
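The two styles can be sketched with CompletableFuture as a stand-in for any async client (nothing Reactor-specific here): the blocking version parks the calling thread until the answer arrives, while the non-blocking version registers a callback and returns immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class BlockingVsNonBlocking {

    // Blocking style: the calling thread waits until the peer answers.
    public static String blockingCall(CompletableFuture<String> pending)
            throws Exception {
        return pending.get(); // thread is tied up here
    }

    // Non-blocking style: register a callback and return immediately;
    // the calling thread is free to go process other tasks.
    public static void nonBlockingCall(CompletableFuture<String> pending,
                                       Consumer<String> callback) {
        pending.thenAccept(callback);
    }
}
```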
 
By the way, can I create as many Reactors as I want?

Reactors are designed to be lightweight enough to create a lot of them, which is what happens when you create a lot of Promises or Streams. There's a natural limit to how many are reasonable to create, but I'd say that being careful about how many you create and allow to be destroyed *per second* is more important than how many you create over time. You'll probably only be affected if you get up into the 500k-1MM events/sec range, though.
 
What's the best way to share them with one another so they can communicate ?

Reactors are thread-safe so you can freely subscribe to and publish events on the same Reactor from multiple event handling threads. 

There's no built-in inter-Reactor communication presently. That's easily achieved by setting up a "select all" or "splat" Selector which matches any key and relays events over some external bus like a RabbitMQ server, Redis, etc… with a corresponding client to pull from the server and relay into a local Reactor. That's a huge topic and a little bit beyond the scope of Reactor core. We'll probably have tools to help with this fairly soon but they'll likely reside in a separate module and not be in core.
 
Thanks again for your help in understanding Reactor. You're proving invaluable.

You're welcome! :) Glad to be of help…

Laurent T.

Jan 27, 2014, 4:24:49 AM
to reactor-...@googlegroups.com, Laurent T.
Hi Jon,

I've been testing things for some time now and I'm running into a problem that's a matter of "what's the best way to do it?" and "isn't there an easier way to do it?"

So here's what I want to achieve:
I want a simple web server that can answer either a 200 OK (everything went fine), a 400 Bad Request (user input in error), or in some cases a 500 (unexpected error due to bad code).
The first thing I want to do is test the URI to see if it's a known route. For instance, /my_controller/my_action is valid but /not_my_controller/my_action is not.

I first started with two things:
- My TcpServer, which is more or less just an entry point
- A Reactor to manage my events

The first thing I tried is to set a consumer on conn.in() that notifies the Reactor with the URI as the key and the connection wrapped in the event: r.notify(req.getUri(), Event.wrap(conn));
I registered the following handler on my Reactor: r.on(Selectors.U("/my_controller/{act}"), myConsumer);

Here's what I observed:
If I call /my_controller/action1, it's OK, I get what I expect: act="action1"
If I call /my_controller/action1/, it's not caught by the selector
If I call /my_controller/action1?param1=value1, I get act="action1?param1=value1", which is not what I expected
Finally, I can't make an "else" consumer on my URI selectors, so I don't seem to be able to catch any uncaught URI and return a 400.
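One workaround for the trailing-slash and query-string surprises, assuming the URI template selector is kept as-is: normalize the URI before using it as the notification key. This is a sketch; the class and method names are mine, not Reactor's.

```java
public class RoutePath {

    // Strip the query string and any trailing slash so that
    // "/my_controller/action1?param1=value1" and "/my_controller/action1/"
    // both route as "/my_controller/action1".
    public static String normalize(String uri) {
        int q = uri.indexOf('?');
        String path = (q >= 0) ? uri.substring(0, q) : uri;
        if (path.length() > 1 && path.endsWith("/")) {
            path = path.substring(0, path.length() - 1);
        }
        return path;
    }

    public static void main(String[] args) {
        System.out.println(normalize("/my_controller/action1?param1=value1")); // /my_controller/action1
        System.out.println(normalize("/my_controller/action1/"));              // /my_controller/action1
    }
}
```

The query string would then be parsed separately in the consumer, and a notify on the normalized path keeps the {act} variable clean.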

Concerning that last point, I added to my conn.in() a filter with a Predicate that matches the expected URIs using a regexp. This may become tedious to maintain, though, as the set of possible actions grows.
The good thing here is that I can set a Composable as an else action. I'm still not sure why I need to go through a Deferred to create that. I'm also not sure what I should be using: a Promise or a Stream?
I was thinking about having something like this:
=> Input request OK => do stuff => prepare 200 OK response => send response
=> Input request BAD => prepare 400 Bad Request response => send response
So I would have some kind of Function that would create a response based on a status and a content, and then use conn.send() or conn.out().accept() to send it back.
By the way, I'm also unsure of the best way to keep a reference to my TcpConnection all through the process so I can use it at the end. Should I build a Consumer or a Promise using that connection when a request comes in?

Concerning the U selector that does not behave as I would expect, I'm not sure I can get around this without making my own selector. I was thinking about using a regexp selector, but I may end up with a fairly complex RE and I would have to process the GET params in a later consumer. I'm not even sure using a selector here is the best thing to do. If I set the above filter, then I may as well make a simple object selector based on a String I would already have parsed.
On the other hand, if I made my own selector, it wouldn't be too hard to make an "opposite" selector to catch the bad requests.

As a bonus question, I'd like to ask what's the best way to keep references to the Reactor(s), Deferred(s) and Composable(s). I think I'm quickly going to have a lot of those so they can call each other, and I'm not sure how I should be referencing them. Should they be passed along in Tuples? Should they be referenced statically? Should I use the Reactor as a central point and make them all accessible through events?

Thanks a lot for your help.

Laurent

Jon Brisbin

Jan 27, 2014, 11:51:58 AM
to Laurent T., reactor-...@googlegroups.com

I'm currently working on a Graph API that should help use cases like this; it has, as part of its core values, the ability to route events through pipelines controlled by Predicates. It will also contain error handling.

What we currently lack (which would make this easier) is HTTP-specific request routing. Although one could conceivably use the UrlTemplateSelector as-is, I think you're finding that it's meant to be used in situations that are fairly straightforward. I had always envisioned an HttpRequestSelector that was aware of all kinds of things about a Request: the presence or absence of request params, cookies, headers, and things like that.

Combining this HttpRequestSelector with a powerful DSL for HTTP-aware routing via a Graph API should solve a lot of these kinds of problems. I'm a little short-handed at the moment, as my intrepid helper is traveling this week and I'm currently trying to put together some changes to the Allocator API and make that solid for a potential 1.1.0.M1 this week. We'll see how that goes first, and then I plan on circling back around to the Graph API and how it might work with HTTP.

I'm sorry I don't have a really good answer for you yet. We have wanted from the beginning to have a good set of useful HTTP abstractions, but we have so far been focusing on the fundamentals of the event-handling system and the boring underpinnings of the core framework. Now that I think I have some good tools in place for using object pools, so we have fewer issues with GC and less overhead in general, I think some of these higher-level concerns can be focused on a little more closely.

If you can, please stay tuned because the next couple weeks should be pretty interesting with 1.1 coming up. I can't guarantee that a full set of HTTP tools will make it into 1.1 but I'll try and make that happen if at all possible.


Thanks!

Jon Brisbin | Reactor Project Lead

Laurent T.

Jan 28, 2014, 4:19:47 AM
to reactor-...@googlegroups.com, Laurent T.
Hi Jon,

I understand. I'm looking forward to that next milestone. Will it be pushed to Maven?

FYI, the decision has been made on our side to stick with Reactor. :)
Using the TcpServer as a straightforward web server (meaning one consumer that does it all), we're achieving the same QPS as we used to with our old web server. It's still really low (3k), and we'll need to add some error handling due to the lack of routing, but we hope to evolve alongside Reactor and use its potential as much as we can.
So please keep me informed about any new release or milestone. I'll be glad to switch to it and provide my feedback.

We'll be starting with a simple TcpServerSpec using a threadPoolExecutor with a size of 0 (auto) and a backlog of 16384 (kind of arbitrary).
We'll be using the NettyHttpServerSocketOptions you defined in your gist.
We'll have one consumer that will: parse the incoming request, send it to our controller (which is blocking), then build the response based on the result from the controller.

If you have any advice on how to improve or fine-tune this simple case, we'll be glad to hear it. Our first goal will be to reach 10k QPS and 10k simultaneous connections.
Meanwhile, we'll be looking forward to your progress.

What would be the best way to follow updates on the Reactor project?

Thanks!
Laurent

Laurent T.

Apr 23, 2014, 8:54:39 AM
to reactor-...@googlegroups.com, Laurent T.
Hi Jon,

I see that you've done quite some work since last time. We're currently moving from the "old" TcpServer to the new NetServer.
I've been reading your example here: http://spring.io/guides/gs/reactor-thumbnailer/
It's really nice to see that kind of guide being published. We're using neither Spring nor Java 8, but it wasn't hard to adapt.

We're currently going to do something very simple, but I had some questions, as we'll need to make this evolve in the future.
First, do you have a way to address the case in which we'd like to answer the HttpRequest immediately if the processing takes more than X millis?
Also, in your thumbnailer example, what happens to a request that does not correspond to any of the filters? How would you do a general "else"?
Also, as your example is bound to Spring, I think I'm missing some of the configuration: how would I specify the size or the backlog of an executor?

Thanks

songya...@gmail.com

Jun 14, 2015, 4:00:37 AM
to reactor-...@googlegroups.com, jbri...@gopivotal.com, lau.t...@gmail.com
Hi Jon,

Do you have a sample for this scenario posted? I did a search online and have not found one.

In my case, the results should not be collected but written to the HttpResponse as they happen. This should be the natural way of efficient streaming, but HttpServletResponse, and most notably Spring's DeferredResult, only allows one setResult() operation, which defeats the very idea of async NIO. Anyway, I'm desperately looking for sample code.

Thanks,