Is there an example that shows how to implement TcpServer/TcpClient in 2.0.3.RELEASE? The code from the reference guide Asynchronous TCP, UDP and HTTP doesn’t work unfortunately. I had to remove take(10) from the client stream definition to get it to compile, but it still doesn’t send the data over the wire. It looks like there’s something missing. This code:
package com.harris.servicegateway;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import reactor.Environment;
import reactor.io.buffer.Buffer;
import reactor.io.codec.json.JsonCodec;
import reactor.io.net.NetStreams;
import reactor.io.net.tcp.TcpClient;
import reactor.io.net.tcp.TcpServer;
import reactor.io.net.tcp.support.SocketUtils;
import reactor.rx.Streams;
public class ServerTest {
@Test
public void serverTest() throws Exception {
Environment.initializeIfEmpty();
CountDownLatch latch = new CountDownLatch(10);
int port = SocketUtils.findAvailableTcpPort();
TcpServer<Buffer, Buffer> server = NetStreams.tcpServer(port);
TcpClient<Buffer, Buffer> client = NetStreams.tcpClient("localhost", port);
final JsonCodec<Pojo, Pojo> codec = new JsonCodec<Pojo, Pojo>(Pojo.class);
//the client/server are prepared
server.start( input ->
//for each connection echo any incoming data
//return the write confirm publisher from writeWith
// >>> close when the write confirm completed
input.writeWith(
//read incoming data
input
.decode(codec) //transform Buffer into Pojo
.log("serve")
.map(codec) //transform Pojo into Buffer
.capacity(5l) //auto-flush every 5 elements
)
).await();
client.start( input -> {
//read 10 replies and close
input
// .take(10)
.decode(codec)
.log("receive")
.consume( data -> latch.countDown() );
//write data
input.writeWith(
Streams.range(1, 10)
.map( it -> new Pojo("test" + it) )
.log("send")
.map(codec)
);
//keep-alive, until 10 data have been read
return Streams.never();
}).await();
latch.await(10, TimeUnit.SECONDS);
client.shutdown().await();
server.shutdown().await();
}
public static class Pojo {
private String name;
private Pojo() {
}
private Pojo(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Pojo{" +
"name='" + name + '\'' +
'}';
}
}
}
Produces the following output:
14:25:02.185 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
14:25:02.188 [main] DEBUG i.n.c.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
14:25:02.195 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
14:25:02.195 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
14:25:02.195 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
14:25:02.196 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: true
14:25:02.196 [main] DEBUG i.n.util.internal.PlatformDependent - Java version: 8
14:25:02.196 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noUnsafe: false
14:25:02.196 [main] DEBUG i.n.util.internal.PlatformDependent - sun.misc.Unsafe: available
14:25:02.196 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noJavassist: false
14:25:02.197 [main] DEBUG i.n.util.internal.PlatformDependent - Javassist: unavailable
14:25:02.197 [main] DEBUG i.n.util.internal.PlatformDependent - You don't have Javassist in your class path or you don't have enough permission to load dynamically generated classes. Please check the configuration for better performance.
14:25:02.197 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.tmpdir: /var/folders/9t/j_w7lv7d7hvg14gcpdpfgmm40000gn/T (java.io.tmpdir)
14:25:02.197 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
14:25:02.197 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
14:25:02.209 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false
14:25:02.210 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512
14:25:02.227 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 8
14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 8
14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512
14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
14:25:02.388 [main] DEBUG i.n.util.internal.ThreadLocalRandom - -Dio.netty.initialSeedUniquifier: 0xdea71fd913a21250 (took 5 ms)
14:25:02.402 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: unpooled
14:25:02.402 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 65536
14:25:02.403 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo0 (lo0, 0:0:0:0:0:0:0:1)
14:25:02.403 [main] DEBUG io.netty.util.NetUtil - /proc/sys/net/core/somaxconn: 128 (non-existent)
14:25:02.412 [reactor-tcp-select-2] INFO r.i.n.impl.netty.tcp.NettyTcpServer - BIND /127.0.0.1:37421
14:25:02.423 [reactor-tcp-io-14] DEBUG r.i.n.impl.netty.tcp.NettyTcpClient - [id: 0x0508cc86] REGISTERED
14:25:02.424 [reactor-tcp-io-14] DEBUG r.i.n.impl.netty.tcp.NettyTcpClient - [id: 0x0508cc86] CONNECT(localhost/127.0.0.1:37421, null)
14:25:02.426 [reactor-tcp-io-6] DEBUG r.i.n.impl.netty.tcp.NettyTcpServer - CONNECT [id: 0x8dac81c2, /127.0.0.1:56543 => /127.0.0.1:37421]
14:25:02.426 [reactor-tcp-io-6] DEBUG r.i.n.impl.netty.tcp.NettyTcpServer - [id: 0x8dac81c2, /127.0.0.1:56543 => /127.0.0.1:37421] REGISTERED
14:25:02.426 [reactor-tcp-io-6] DEBUG r.i.n.impl.netty.tcp.NettyTcpServer - [id: 0x8dac81c2, /127.0.0.1:56543 => /127.0.0.1:37421] ACTIVE
14:25:02.427 [reactor-tcp-io-14] DEBUG r.i.n.impl.netty.tcp.NettyTcpClient - [id: 0x0508cc86, /127.0.0.1:56543 => localhost/127.0.0.1:37421] ACTIVE
14:25:02.432 [reactor-tcp-io-6] INFO serve - subscribe: MapAction
14:25:02.432 [reactor-tcp-io-14] INFO receive - subscribe: ConsumerAction
14:25:02.433 [reactor-tcp-io-14] DEBUG r.i.n.impl.netty.tcp.NettyTcpClient - [id: 0x0508cc86, /127.0.0.1:56543 => localhost/127.0.0.1:37421] USER_EVENT: reactor.io.net.impl.netty.NettyChannelHandlerBridge$ChannelInputSubscriberEvent@10c07f35
14:25:02.433 [reactor-tcp-io-6] DEBUG r.i.n.impl.netty.tcp.NettyTcpServer - [id: 0x8dac81c2, /127.0.0.1:56543 => /127.0.0.1:37421] USER_EVENT: reactor.io.net.impl.netty.NettyChannelHandlerBridge$ChannelInputSubscriberEvent@595d24d2
14:25:02.434 [reactor-tcp-io-14] INFO receive - onSubscribe: reactor.rx.stream.io.DecoderStream$2$1@43f67bd5
14:25:02.434 [reactor-tcp-io-6] INFO serve - onSubscribe: reactor.rx.stream.io.DecoderStream$2$1@3d3d3d59
14:25:02.434 [reactor-tcp-io-6] INFO serve - request: 5
14:25:02.435 [reactor-tcp-io-14] INFO receive - request: 9223372036854775807
14:25:12.447 [reactor-tcp-io-6] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetectionLevel: simple
14:25:12.451 [reactor-tcp-io-6] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacity.default: 262144
14:25:12.459 [reactor-tcp-io-6] DEBUG r.i.n.i.n.NettyChannelHandlerBridge - Cancel connection
14:25:12.459 [reactor-tcp-io-6] INFO serve - cancel
14:25:12.460 [reactor-tcp-io-6] DEBUG r.i.n.impl.netty.tcp.NettyTcpServer - [id: 0x8dac81c2, /127.0.0.1:56543 :> /127.0.0.1:37421] INACTIVE
14:25:12.460 [reactor-tcp-io-6] DEBUG r.i.n.impl.netty.tcp.NettyTcpServer - [id: 0x8dac81c2, /127.0.0.1:56543 :> /127.0.0.1:37421] UNREGISTERED
14:25:12.537 [reactor-tcp-io-14] DEBUG r.i.n.impl.netty.tcp.NettyTcpClient - [id: 0x0508cc86, /127.0.0.1:56543 :> localhost/127.0.0.1:37421] INACTIVE
14:25:12.537 [reactor-tcp-io-14] INFO receive - complete
14:25:12.537 [reactor-tcp-io-14] INFO receive - cancel
14:25:12.537 [reactor-tcp-io-14] DEBUG r.i.n.impl.netty.tcp.NettyTcpClient - [id: 0x0508cc86, /127.0.0.1:56543 :> localhost/127.0.0.1:37421] UNREGISTERED
For some reason the client never sends the data to the server.