2.0.1-RELEASE HttpServer, POST implementation


tjrea...@gmail.com

May 4, 2015, 5:50:48 AM
to reactor-...@googlegroups.com
I am trying to update an existing TcpServer<FullHttpRequest, FullHttpResponse> implementation in 2.0.0-RELEASE to an HttpServer<byte[], byte[]> implementation in 2.0.1-RELEASE.

I noticed that HttpServer<FullHttpRequest, FullHttpResponse> in 2.0.1-RELEASE does not play nicely with a NettyServerSocketOptions pipelineConfigurer. I encountered a host of codec-related issues, hence the move to <byte[], byte[]>. (I am also trying to learn and embrace the Streams paradigm, and finding the learning curve particularly steep.)

Now to my problem: I can't get my head around how to get at the POST data in a POST request in order to do something with it. (I tried "consuming" it, but that is terminal and seems to go nowhere.) I am guessing I have to dispatch it somewhere, but where to, and what will pick it up?

Any "patterns" or methods useful for handling request/response situations (TCP and HTTP) with the new release, where the response goes out the door rather than staying in process, would be greatly appreciated.

Here is what I've done so far:

private void setupServer() throws InterruptedException {
    httpServer = NetStreams.httpServer(server -> server.codec(StandardCodecs.BYTE_ARRAY_CODEC).listen(1201).dispatcher(Environment.sharedDispatcher()));
    httpServer.get("/get/{name}", getHandler());
    httpServer.post("/post", postHandler());
    httpServer.start().awaitSuccess();
}

ReactorChannelHandler<byte[], byte[], HttpChannel<byte[], byte[]>> getHandler() {
    return channel -> {
        channel.headers().entries().forEach(entry -> System.out.println(String.format("header [%s=>%s]", entry.getKey(), entry.getValue())));
        channel.params().entrySet().forEach(entry -> System.out.println(String.format("params [%s=>%s]", entry.getKey(), entry.getValue())));
        StringBuilder resp = new StringBuilder().append("hello ").append(channel.params().get("name"));
        return channel.writeWith(Streams.just(resp.toString().getBytes()));
    };
}

ReactorChannelHandler<byte[], byte[], HttpChannel<byte[], byte[]>> postHandler() {
    return channel -> {
        channel.headers().entries().forEach(entry -> System.out.println(String.format("header [%s=>%s]", entry.getKey(), entry.getValue())));
        // TODO: How do I get to the IN, do something and put that onto the OUT
    };
}

Keep up the great work. Some info on which use cases the different dispatchers fit best (shared, cached, etc.) would also be useful.

Regards

Tim

Stephane Maldini

May 4, 2015, 11:29:03 AM
to tjrea...@gmail.com, reactor-framework
Awesome. I haven't made a public announcement yet, to give myself time to prepare more of these examples somewhere.
Well, the channel is a Stream of incoming data that you can subscribe() to, or call any of its methods on (.consume() etc.), but the best thing is just to look at the suite of examples from the spec and the tests:




--
You received this message because you are subscribed to the Google Groups "reactor-framework" group.
To unsubscribe from this group and stop receiving emails from it, send an email to reactor-framew...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Stephane Maldini | Solutions Architect, CSO EMEA | London | Pivotal

tjrea...@gmail.com

May 4, 2015, 4:07:46 PM
to reactor-...@googlegroups.com, tjrea...@gmail.com
OK, I can't get a POST to return (GETs are fine). I've sent the POST both raw and via an Apache HTTP client. The client just hangs waiting for the response, as if the peer stream is not flushed or closed. The POSTed data is available in the Publisher<Buffer>, but the response never gets to the client. Not sure where I am going wrong, but it is probably in my understanding of the Stream.
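
For anyone reproducing the raw POST, here is a minimal sketch of building the request bytes with plain JDK classes. It is not taken from the test in this thread: the class name, the `build` helper and the `Host` value are assumptions for illustration. The points it illustrates are that Content-Length counts encoded body bytes (not characters) and that the body follows the blank line with no trailing CRLF.

```java
import java.nio.charset.StandardCharsets;

public class RawPostRequest {

    /** Builds a minimal HTTP/1.1 POST; Content-Length is the number of body bytes. */
    public static byte[] build(String path, String host, String body) {
        byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
        String head = "POST " + path + " HTTP/1.1\r\n"
                + "Host: " + host + "\r\n"            // assumed value, required by HTTP/1.1
                + "Content-Length: " + bodyBytes.length + "\r\n"
                + "\r\n";                             // blank line ends the headers
        byte[] headBytes = head.getBytes(StandardCharsets.US_ASCII);

        // Concatenate head and body; nothing is appended after the body.
        byte[] request = new byte[headBytes.length + bodyBytes.length];
        System.arraycopy(headBytes, 0, request, 0, headBytes.length);
        System.arraycopy(bodyBytes, 0, request, headBytes.length, bodyBytes.length);
        return request;
    }

    public static void main(String[] args) {
        byte[] request = build("/post", "localhost:1201", "pete");
        System.out.println(new String(request, StandardCharsets.UTF_8));
    }
}
```

The resulting array can be wrapped with `ByteBuffer.wrap(...)` and written to a `SocketChannel` in the same way as the helpers in the test.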

Any help appreciated. The test can be found below:

Best regards

Tim

package tests;

import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;

import java.io.IOException;
import java.net.SocketAddress;
import java.net.URLEncoder;
import java.nio.ByteBuffer;

import org.apache.http.HttpException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;

import reactor.Environment;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.StandardCodecs;
import reactor.io.net.NetStreams;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.config.ServerSocketOptions;
import reactor.io.net.http.HttpChannel;
import reactor.io.net.http.HttpServer;
import reactor.io.net.impl.netty.NettyServerSocketOptions;
import reactor.rx.Streams;

public class SimpleHttpRequestResponse {
    private HttpServer<Buffer, Buffer> httpServer;

    @Before
    public void setup() throws InterruptedException {
        Environment.initializeIfEmpty().assignErrorJournal();
        setupServer();
    }

    private ServerSocketOptions serverSocketOptions() {
        return new NettyServerSocketOptions().pipelineConfigurer(pipeline -> pipeline.addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(16 * 1024 * 1024)));

    }

    private void setupServer() throws InterruptedException {
        httpServer = NetStreams.httpServer(server -> server.codec(StandardCodecs.PASS_THROUGH_CODEC).listen(1201).dispatcher(Environment.sharedDispatcher()));

        httpServer.get("/get/{name}", getHandler());
        httpServer.post("/post", postHandler());
        httpServer.start().awaitSuccess();
    }

    ReactorChannelHandler<Buffer, Buffer, HttpChannel<Buffer, Buffer>> getHandler() {
        return channel -> {
            channel.headers().entries().forEach(entry1 -> System.out.println(String.format("header [%s=>%s]", entry1.getKey(), entry1.getValue())));
            channel.params().entrySet().forEach(entry2 -> System.out.println(String.format("params [%s=>%s]", entry2.getKey(), entry2.getValue())));
            StringBuilder response = new StringBuilder().append("hello ").append(channel.params().get("name"));
            System.out.println(String.format("%s from thread %s", response.toString(), Thread.currentThread()));
            return channel.writeWith(Streams.just(Buffer.wrap(response.toString())));
        };
    }

    ReactorChannelHandler<Buffer, Buffer, HttpChannel<Buffer, Buffer>> postHandler() {

        return channel -> {
            channel.headers().entries().forEach(entry -> System.out.println(String.format("header [%s=>%s]", entry.getKey(), entry.getValue())));
            return channel.writeWith(Streams.wrap(channel).log("received").flatMap(new Function<Buffer, Publisher<Buffer>>() {
                @Override
                public Publisher<Buffer> apply(Buffer data) {
                    final StringBuilder response = new StringBuilder().append("hello ").append(new String(data.asBytes()));
                    System.out.println(String.format("%s from thread %s", response.toString(), Thread.currentThread()));
                    return Streams.just(Buffer.wrap(response.toString()));
                }
            }));
        };
    }

    @After
    public void teardown() {
        httpServer.shutdown().get();
    }

    @Test
    public void tryBoth() throws InterruptedException, IOException, HttpException {
        get("/get/joe", httpServer.getListenAddress());
        post("/post", URLEncoder.encode("pete", "UTF8"), httpServer.getListenAddress());
    }

    private void get(String path, SocketAddress address) {
        try {
            StringBuilder request = new StringBuilder().append(String.format("GET %s HTTP/1.1\r\n", path)).append("Connection: Keep-Alive\r\n").append("\r\n");
            java.nio.channels.SocketChannel channel = java.nio.channels.SocketChannel.open(address);
            System.out.println(String.format("get: request >> [%s]", request.toString()));
            channel.write(Buffer.wrap(request.toString()).byteBuffer());
            ByteBuffer buf = ByteBuffer.allocate(4 * 1024);
            while (channel.read(buf) > -1)
                ;
            String response = new String(buf.array());
            System.out.println(String.format("get: << Response: %s", response));
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void post(String path, String data, SocketAddress address) {
        try {
            StringBuilder request = new StringBuilder().append(String.format("POST %s HTTP/1.1\r\n", path)).append("Connection: Keep-Alive\r\n");
            request.append(String.format("Content-Length: %s\r\n", data.length())).append("\r\n").append(data).append("\r\n");
            java.nio.channels.SocketChannel channel = java.nio.channels.SocketChannel.open(address);
            System.out.println(String.format("post: request >> [%s]", request.toString()));
            channel.write(Buffer.wrap(request.toString()).byteBuffer());
            ByteBuffer buf = ByteBuffer.allocate(4 * 1024);
            while (channel.read(buf) > -1)
                ;
            String response = new String(buf.array());
            System.out.println(String.format("post: << Response: %s", response));
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Stephane Maldini

May 4, 2015, 5:42:37 PM
to tjrea...@gmail.com, reactor-framework
Thanks for this, I will try to add that to the test suite. Any chance you can set reactor.io.net to DEBUG in your logger (it must be bridged to slf4j to display packets)?

Stephane Maldini

May 4, 2015, 5:52:55 PM
to tjrea...@gmail.com, reactor-framework
BTW this is something I haven't really thought of:


    private ServerSocketOptions serverSocketOptions() {
        return new NettyServerSocketOptions().pipelineConfigurer(pipeline -> pipeline.addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(16 * 1024 * 1024)));
    }


The HttpServer already adds the codec, but I haven't tried it with the aggregator.

Streams.wrap(channel) -> channel (it's a Stream already).

In fact I would try this:


    ReactorChannelHandler<Buffer, Buffer, HttpChannel<Buffer, Buffer>> postHandler() {
        return channel -> {
            channel.headers().entries().forEach(entry -> System.out.println(String.format("header [%s=>%s]", entry.getKey(), entry.getValue())));
           
            /* writeWith flushes the reply on complete; take(1) completes after 1 incoming data signal.
               writeWith returns a Stream<Void> which completes once the flush is successful.
               That stream (returned from the channel handler) is used to signal when to close.
               Therefore the flow is: request -> take 1 data -> map and write -> complete -> flush -> close */
            return channel.writeWith(
                channel
                    .log("received")
                    .take(1) // how many times we want to consume this post
                    .map(input -> Buffer.wrap("hello" + input.asString())) // wrap: the handler writes Buffer, not String
                    .log("replying")
            );
        };
    }
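
The flow described in the comment above (take 1 -> complete -> flush -> close) can be illustrated with a toy model built on the JDK's `java.util.concurrent.Flow` interfaces. This is only a sketch and not Reactor's `TakeAction`: `TakeOne`, `Recorder` and `emitOneChunkAndStayOpen` are hypothetical names, and it assumes the request stream of a keep-alive connection emits data without ever signalling completion by itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class TakeOneSketch {

    /** Passes the first element through, then cancels upstream and completes downstream. */
    public static final class TakeOne<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> downstream;
        private Flow.Subscription upstream;
        private boolean done;

        public TakeOne(Flow.Subscriber<? super T> downstream) { this.downstream = downstream; }

        @Override public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(s);
        }
        @Override public void onNext(T item) {
            if (done) return;
            done = true;
            downstream.onNext(item);
            upstream.cancel();       // no more data wanted from the request
            downstream.onComplete(); // the signal a writer would wait for before flushing
        }
        @Override public void onError(Throwable t) { if (!done) { done = true; downstream.onError(t); } }
        @Override public void onComplete() { if (!done) { done = true; downstream.onComplete(); } }
    }

    /** Records every signal so we can see whether "complete" ever arrives. */
    public static final class Recorder implements Flow.Subscriber<String> {
        public final List<String> events = new ArrayList<>();
        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { events.add("next:" + item); }
        @Override public void onError(Throwable t) { events.add("error"); }
        @Override public void onComplete() { events.add("complete"); }
    }

    /** Simulated request body: emits one chunk and then stays open (never completes). */
    public static void emitOneChunkAndStayOpen(Flow.Subscriber<String> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { /* demand is ignored in this toy source */ }
            @Override public void cancel() { /* nothing to release */ }
        });
        subscriber.onNext("pete");
    }

    public static void main(String[] args) {
        Recorder open = new Recorder();
        emitOneChunkAndStayOpen(open);
        System.out.println("without take(1): " + open.events);  // [next:pete]

        Recorder taken = new Recorder();
        emitOneChunkAndStayOpen(new TakeOne<>(taken));
        System.out.println("with take(1):    " + taken.events); // [next:pete, complete]
    }
}
```

Without the take step the subscriber never sees a completion signal, which in this model is what a writer would be waiting for before it flushes and closes.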

tjrea...@gmail.com

May 5, 2015, 3:01:50 AM
to reactor-...@googlegroups.com, tjrea...@gmail.com
Many thanks Stephane, 

I will try all this today and let you know. BTW, adding the aggregator causes issues, so I left it out of the spec in the example (it was just hanging around from my previous TcpServer implementation of an HTTP server).

I also tried replacing Streams.wrap(channel) with Streams.wrap(bufferStream), as per the SmokeTest, but that produced the same symptom.

Best regards

Tim

tjrea...@gmail.com

May 5, 2015, 5:48:04 AM
to reactor-...@googlegroups.com, tjrea...@gmail.com
I have tried several ways of handling a POST. One, based on the SmokeTests, does a channel.writeWith on a stream backed by a processor. This handler works intermittently: it always works at least once, but only 1-N times when a POST is sent in a loop, typically hanging on the 3rd iteration. The other handlers always block, way down in the Netty stuff. I will try to debug a little further; the test class is below.

Also, I see this with DEBUG on and a successful response:
05 May 2015 10:39:07 DEBUG NettyHttpServer - [id: 0xc4038aa5, /127.0.0.1:46702 => /127.0.1.1:1201] WRITE(5B)
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 0d 0a 0d 0a                                  |0....           |
+--------+-------------------------------------------------+----------------+
05 May 2015 10:39:07 DEBUG NettyHttpServer - [id: 0xc4038aa5, /127.0.0.1:46702 => /127.0.1.1:1201] FLUSH
05 May 2015 10:39:07 DEBUG NettyHttpServer - [id: 0xc4038aa5, /127.0.0.1:46702 => /127.0.1.1:1201] FLUSH
05 May 2015 10:39:07 DEBUG NettyHttpServer - [id: 0xc4038aa5, /127.0.0.1:46702 => /127.0.1.1:1201] CLOSE()
05 May 2015 10:39:07 DEBUG NettyHttpServer - [id: 0xc4038aa5, /127.0.0.1:46702 => /127.0.1.1:1201] CLOSE()
05 May 2015 10:39:07 DEBUG NettyChannelHandlerBridge - Cancel connection
05 May 2015 10:39:07 DEBUG NettyChannelHandlerBridge - Cancel connection
05 May 2015 10:39:07 DEBUG NettyHttpServer - [id: 0xc4038aa5, /127.0.0.1:46702 :> /127.0.1.1:1201] INACTIVE
05 May 2015 10:39:07 DEBUG NettyHttpServer - [id: 0xc4038aa5, /127.0.0.1:46702 :> /127.0.1.1:1201] INACTIVE
05 May 2015 10:39:07 DEBUG NettyHttpServer - [id: 0xc4038aa5, /127.0.0.1:46702 :> /127.0.1.1:1201] UNREGISTERED
05 May 2015 10:39:07 DEBUG NettyHttpServer - [id: 0xc4038aa5, /127.0.0.1:46702 :> /127.0.1.1:1201] UNREGISTERED
======================================================================================================
post: << Response: HTTP/1.1 200 OKTransfer-Encoding: chunkedbhello sam-10

But these are the last lines in the logs when a request hangs:

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 54 72 61 6e 73 66 65 72 2d 45 6e 63 6f 64 69 |.Transfer-Encodi|
|00000020| 6e 67 3a 20 63 68 75 6e 6b 65 64 0d 0a 0d 0a    |ng: chunked.... |
+--------+-------------------------------------------------+----------------+
05 May 2015 10:39:07 DEBUG NettyHttpServer - [id: 0x0bbaf944, /127.0.0.1:46703 => /127.0.1.1:1201] FLUSH
05 May 2015 10:39:07 DEBUG NettyHttpServer - [id: 0x0bbaf944, /127.0.0.1:46703 => /127.0.1.1:1201] FLUSH
05 May 2015 10:39:07  INFO stream-post - subscribe: TakeAction-{{dispatcher=immediate, max-capacity=infinite}}{take=1, counted=0}
05 May 2015 10:39:07  INFO processor - onNext: java.nio.DirectByteBuffer[pos=0 lim=5 cap=5]
05 May 2015 10:39:07  INFO stream-post - onSubscribe: reactor.core.processor.RingBufferWorkProcessor$RingBufferSubscription@1ada716d
buffer [sam-2] from thread Thread[RingBufferProcessor-1,5,main]
05 May 2015 10:39:07  INFO stream-post - request: 1
05 May 2015 10:39:07  INFO processor - request: 9223372036854775807


Hope this helps;

Best regards

Tim

package tests;

import java.io.IOException;
import java.net.SocketAddress;
import java.net.URLEncoder;
import java.nio.ByteBuffer;

import org.apache.http.HttpException;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;

import reactor.Environment;
import reactor.core.processor.RingBufferProcessor;
import reactor.core.processor.RingBufferWorkProcessor;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.StandardCodecs;
import reactor.io.net.NetStreams;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.http.HttpChannel;
import reactor.io.net.http.HttpServer;
import reactor.rx.Stream;
import reactor.rx.Streams;

public class SimpleHttpRequestResponse {
    private HttpServer<Buffer, Buffer> httpServer;

    private Processor<Buffer, Buffer> processor;
    private RingBufferWorkProcessor<Buffer> workProcessor;
    private Stream<Buffer> bufferStream;

    @BeforeSuite
    public void setup() throws InterruptedException {
        System.out.println("before suite");
        Environment.initializeIfEmpty().assignErrorJournal();

        processor = RingBufferProcessor.create(false);
        workProcessor = RingBufferWorkProcessor.create(false);
        bufferStream = Streams.wrap(processor).log("processor").flatMap(b -> {
            System.out.println(String.format("buffer [%s] from thread %s", new String(b.asBytes()), Thread.currentThread()));
            final StringBuilder response = new StringBuilder().append("hello ").append(new String(b.asBytes()));
            return Streams.just(Buffer.wrap(response.toString()));
        }).process(workProcessor);

        setupServer();
    }

    @AfterSuite
    public void teardown() {
        System.out.println("after suite");
        httpServer.shutdown().get();
    }

    private void setupServer() throws InterruptedException {
        httpServer = NetStreams.httpServer(server -> server.codec(StandardCodecs.PASS_THROUGH_CODEC).listen(1201).dispatcher(Environment.sharedDispatcher()));
        httpServer.get("/get/{name}", getHandler());
        httpServer.post("/stream-post", succeedsAtLeastOnceHandler());
        httpServer.post("/channel-post", channelPostHandler());
        httpServer.post("/channel-post-blocks", alwaysBlocksHandler());
        httpServer.start().awaitSuccess();
    }

    ReactorChannelHandler<Buffer, Buffer, HttpChannel<Buffer, Buffer>> getHandler() {
        return channel -> {
            channel.headers().entries().forEach(entry1 -> System.out.println(String.format("header [%s=>%s]", entry1.getKey(), entry1.getValue())));
            channel.params().entrySet().forEach(entry2 -> System.out.println(String.format("params [%s=>%s]", entry2.getKey(), entry2.getValue())));
            StringBuilder response = new StringBuilder().append("hello ").append(channel.params().get("name"));
            System.out.println(String.format("%s from thread %s", response.toString(), Thread.currentThread()));
            return channel.writeWith(Streams.just(Buffer.wrap(response.toString())));
        };
    }

    ReactorChannelHandler<Buffer, Buffer, HttpChannel<Buffer, Buffer>> succeedsAtLeastOnceHandler() {
        return channel -> {
            channel.consume(b -> processor.onNext(b));
            return channel.writeWith(bufferStream.log("stream-post").take(1).flatMap(new Function<Buffer, Publisher<Buffer>>() {
                @Override
                public Publisher<Buffer> apply(Buffer data) {
                    System.out.println(String.format("stream-post received [%s] from thread %s", new String(data.asBytes()), Thread.currentThread()));
                    return Streams.just(data);
                }
            }));
        };
    }

    ReactorChannelHandler<Buffer, Buffer, HttpChannel<Buffer, Buffer>> channelPostHandler() {
        return channel -> {
            return channel.writeWith(channel.log("channel-post").take(1).flatMap(data -> {
                System.out.println(String.format("channel-post received [%s] from thread %s", new String(data.asBytes()), Thread.currentThread()));
                final StringBuilder response = new StringBuilder().append("hello ").append(new String(data.asBytes()));
                return Streams.just(Buffer.wrap(response.toString()));
            }));
        };
    }

    ReactorChannelHandler<Buffer, Buffer, HttpChannel<Buffer, Buffer>> alwaysBlocksHandler() {
        return channel -> {
            return channel.writeWith(channel.log("channel-post").take(1).map(input -> Buffer.wrap("hello" + input.asString())).log("channel-post-replying"));
        };
    }

    @Test(enabled = false)
    public void testGetInLoop() throws InterruptedException, IOException, HttpException {
        for (int i = 0; i < 10; i++) {
            System.out.println("======================================================================================================");
            get(String.format("/get/joe-%d", i), httpServer.getListenAddress());
        }
    }

    @Test(enabled = false)
    public void testPostWithProcessor() throws InterruptedException, IOException, HttpException {
        /* This post always returns */
        System.out.println("======================================================================================================");
        post("/stream-post", URLEncoder.encode("pete", "UTF8"), httpServer.getListenAddress());
    }

    @Test
    public void testPostInLoop() throws InterruptedException, IOException, HttpException {
        /* These posts will eventually block after 1-N attempts */
        for (int i = 0; i < 5; i++) {
            System.out.println("======================================================================================================");
            post("/stream-post", URLEncoder.encode(String.format("sam-%d", i), "UTF8"), httpServer.getListenAddress());
        }
    }

    @Test(enabled = false)
    public void testChannelHandler() throws InterruptedException, IOException, HttpException {
        System.out.println("======================================================================================================");
        post("/channel-post", URLEncoder.encode("charlie", "UTF8"), httpServer.getListenAddress());
    }

    @Test(enabled = false)
    public void testChannelPostAlwaysBlocks() throws InterruptedException, IOException, HttpException {
        /* Post on this handler always blocks and never returns */
        System.out.println("======================================================================================================");
        post("/channel-post-blocks", URLEncoder.encode("john", "UTF8"), httpServer.getListenAddress());
    }

    private void get(String path, SocketAddress address) {
        try {
            StringBuilder request = new StringBuilder().append(String.format("GET %s HTTP/1.1\r\n", path)).append("Connection: Keep-Alive\r\n").append("\r\n");
            java.nio.channels.SocketChannel channel = java.nio.channels.SocketChannel.open(address);
            System.out.println(String.format("get: request >> [%s]", request.toString()));
            channel.write(Buffer.wrap(request.toString()).byteBuffer());
            ByteBuffer buf = ByteBuffer.allocate(4 * 1024);
            while (channel.read(buf) > -1)
                ;
            String response = new String(buf.array());
            System.out.println("======================================================================================================");
            System.out.println(String.format("get: << Response: %s", response.replaceAll("\r", "").replaceAll("\n", "")));
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void post(String path, String data, SocketAddress address) {
        try {
            StringBuilder request = new StringBuilder().append(String.format("POST %s HTTP/1.1\r\n", path)).append("Connection: Keep-Alive\r\n");
            request.append(String.format("Content-Length: %s\r\n", data.length())).append("\r\n").append(data).append("\r\n");
            java.nio.channels.SocketChannel channel = java.nio.channels.SocketChannel.open(address);
            System.out.println(String.format("post: request >> [%s]", request.toString()));
            channel.write(Buffer.wrap(request.toString()).byteBuffer());
            ByteBuffer buf = ByteBuffer.allocate(4 * 1024);
            while (channel.read(buf) > -1)
                ;
            String response = new String(buf.array()).replaceAll("\r", "").replaceAll("\n", "");
            System.out.println("======================================================================================================");
            System.out.println(String.format("post: << Response: %s", response));
            channel.close();
            Assert.assertTrue(response.contains("200") && response.contains(data));
        } catch (IOException e) {
            // assumed: same error handling as get() above
            e.printStackTrace();
        }
    }
}

tjrea...@gmail.com

May 5, 2015, 9:44:32 AM
to reactor-...@googlegroups.com, tjrea...@gmail.com
So, my two cents' worth. It seems that

05 May 2015 14:19:12 DEBUG NettyHttpServer - [id: 0x5d1a423b, /127.0.0.1:50459 => /127.0.1.1:1201] CLOSE()

and

05 May 2015 14:19:12 DEBUG NettyChannelHandlerBridge - Cancel connection

and anything else that happens beforehand to finalise response processing, are only invoked when you pass a Streams.just(buffer) to channel.writeWith().

What I am trying to say is that the channel write lifecycle does not seem to complete with anything other than a Streams.just(buffer).

Will try and narrow it down, but in the meantime will have to go back to my 2.0.0-RELEASE HTTP implementation.

Best regards

Tim
...

tjrea...@gmail.com

May 5, 2015, 10:30:30 AM
to reactor-...@googlegroups.com, tjrea...@gmail.com
The problem manifests in NettyChannelHandlerBridge.

With channel.writeWith(Streams.just(Buffer.wrap(response.toString()))), NettyChannelHandlerBridge.write(ctx, msg, promise) eventually receives an EmptyLastHttpContent, which invokes super.write(ctx, msg, promise) and completes the writing.

With channel.writeWith(channel.log("channel-post").take(1).map(input -> Buffer.wrap("hello " + input.asString()))), an EmptyLastHttpContent is never sent and super.write(ctx, msg, promise) is never invoked.
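
To make the missing piece concrete: with Transfer-Encoding: chunked, the body only ends when a zero-length last chunk is written, and that is the 5-byte WRITE (30 0d 0a 0d 0a) in the successful log earlier in the thread. The sketch below shows the framing with plain JDK code; the class and method names are hypothetical and this is not Netty's encoder.

```java
import java.nio.charset.StandardCharsets;

public class ChunkedEncodingSketch {

    /** The zero-length "last chunk" that tells the client the body is finished. */
    public static final String LAST_CHUNK = "0\r\n\r\n";

    /** Frames one chunk: payload size in hex, CRLF, the payload, CRLF. */
    public static String chunk(String data) {
        int size = data.getBytes(StandardCharsets.UTF_8).length;
        return Integer.toHexString(size) + "\r\n" + data + "\r\n";
    }

    public static void main(String[] args) {
        String body = chunk("hello sam-1") + LAST_CHUNK;
        // Strip CR/LF the same way the post() helper in the test does before printing.
        System.out.println(body.replaceAll("\r", "").replaceAll("\n", "")); // prints "bhello sam-10"
    }
}
```

The printed `bhello sam-10` matches the tail of the successful response line in the log: `b` is the hex size of the 11-byte payload and the final `0` is the last chunk. In the hanging case only the headers are written, so a client has no way of knowing the body is complete.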

Hope this makes sense;

Best regards

Tim



Stephane Maldini

May 7, 2015, 6:51:00 AM
to tjrea...@gmail.com, reactor-framework
Sorry, don't feel let down. I'm actually trying to come up with a guide (I'm iterating over this one: https://drone.io/github.com/reactor/reactor/files/build/reference/pdf/reactor-reference.pdf) and this will certainly be part of it. As soon as that's ready, I've finished my customer duties, and I've fired off an announcement for 2.0.1.RELEASE, I'll give you an update. Don't hold your breath, but that probably means I'll come back to the ML question next week.


tjrea...@gmail.com

May 7, 2015, 7:21:25 AM
to reactor-...@googlegroups.com, tjrea...@gmail.com
No worries, gone back to what I had implemented with 2.0.0-RELEASE

Best regards

Tim

tjrea...@gmail.com

May 26, 2015, 3:39:55 AM
to reactor-...@googlegroups.com
HttpServer post is still broken in 2.0.2-RELEASE.

Best regards

Tim

Stephane Maldini

May 26, 2015, 5:14:09 AM
to tjrea...@gmail.com, reactor-framework
Hey Tim
any chance you can connect to https://gitter.im/reactor/reactor to sort that out ?

Cheers

Stephane Maldini

May 26, 2015, 5:18:35 AM
to tjrea...@gmail.com, reactor-framework
Meanwhile what's the latest version of your test code ?

Stephane Maldini

May 26, 2015, 6:18:16 AM
to tjrea...@gmail.com, reactor-framework
Added your test in the suite here

And pushed a fix. If you can, try 2.1.0.BUILD-SNAPSHOT; if it's OK, it will make it into 2.0.3.

tjrea...@gmail.com

May 26, 2015, 7:42:33 AM
to reactor-...@googlegroups.com, tjrea...@gmail.com
Hey Stephane,

Cool thanks. I will hop onto that later today.

Best regards

Tim

tjrea...@gmail.com

May 27, 2015, 3:05:25 AM
to reactor-...@googlegroups.com
Nice Stephane,

The test you included was the last test I had done and it now works with 2.1.0.BUILD-SNAPSHOT, so sorted.

The documentation is also looking good, but I was just wondering about processing "patterns" for servers implemented in this manner.

Previously I was handing off the actual work to be done to an EventBus.sendAndReceive with selectors that handled the same incoming server Uri. Is this still valid or perhaps unnecessary?

I was wondering if I should simply handle the work to be done in the ReactorChannelHandler assigned to the incoming Uri, or farm it out to a RingBufferWorkProcessor or such in the handler?

Basically I am just trying to get to an efficient Request / Reply paradigm to maximise throughput.
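
As a point of comparison for the hand-off option, here is a minimal request/reply sketch using only JDK classes. It does not use the Reactor EventBus or RingBufferWorkProcessor; the `WORKERS` pool and `handle` method are hypothetical stand-ins for "farm the work out and reply when it is done". Whether this beats doing the work inline in the handler depends on how expensive the work is, which the thread does not settle.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RequestReplySketch {

    // Hypothetical worker pool standing in for a work processor or bus consumer.
    // Daemon threads so the JVM can exit without an explicit shutdown.
    private static final ExecutorService WORKERS = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "worker");
        t.setDaemon(true);
        return t;
    });

    /** The "handler": hands the request payload to a worker and returns a future reply. */
    public static CompletableFuture<String> handle(String requestBody) {
        return CompletableFuture.supplyAsync(() -> "hello " + requestBody, WORKERS);
    }

    public static void main(String[] args) {
        // The caller (the I/O side) is free until the reply is ready; join() is only for the demo.
        System.out.println(handle("pete").join()); // prints "hello pete"
    }
}
```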

Best regards

Tim