TcpServer/TcpClient example

528 views
Skip to first unread message

Alexander Baiko

unread,
Jun 15, 2015, 2:40:56 PM6/15/15
to reactor-...@googlegroups.com

Is there an example that shows how to implement TcpServer/TcpClient in 2.0.3.RELEASE? The code from the reference guide   Asynchronous TCP, UDP and HTTP doesn’t work unfortunately. I had to remove take(10) from the client stream definition to get it to compile, but it still doesn’t send the data over the wire. It looks like there’s something missing. This code:

package com.harris.servicegateway;


import java.util.concurrent.CountDownLatch;

import java.util.concurrent.TimeUnit;


import org.junit.Test;


import reactor.Environment;

import reactor.io.buffer.Buffer;

import reactor.io.codec.json.JsonCodec;

import reactor.io.net.NetStreams;

import reactor.io.net.tcp.TcpClient;

import reactor.io.net.tcp.TcpServer;

import reactor.io.net.tcp.support.SocketUtils;

import reactor.rx.Streams;



public class ServerTest {

@Test

public void serverTest() throws Exception {

Environment.initializeIfEmpty();

CountDownLatch latch = new CountDownLatch(10);


int port = SocketUtils.findAvailableTcpPort();

TcpServer<Buffer, Buffer> server = NetStreams.tcpServer(port);

TcpClient<Buffer, Buffer> client = NetStreams.tcpClient("localhost", port);


final JsonCodec<Pojo, Pojo> codec = new JsonCodec<Pojo, Pojo>(Pojo.class);


//the client/server are prepared

server.start( input ->


        //for each connection echo any incoming data


        //return the write confirm publisher from writeWith

        // >>> close when the write confirm completed


        input.writeWith(


                //read incoming data

                input

                        .decode(codec) //transform Buffer into Pojo

                        .log("serve")

                        .map(codec)    //transform Pojo into Buffer

                        .capacity(5l)  //auto-flush every 5 elements

        )

).await();


client.start( input -> {


        //read 10 replies and close

        input

//                 .take(10)

                .decode(codec)

                .log("receive")

                .consume( data -> latch.countDown() );


        //write data

        input.writeWith(

                Streams.range(1, 10)

                        .map( it -> new Pojo("test" + it) )

                        .log("send")

                        .map(codec)

        );


        //keep-alive, until 10 data have been read

        return Streams.never();


}).await();


latch.await(10, TimeUnit.SECONDS);


client.shutdown().await();

server.shutdown().await();

}


public static class Pojo {

private String name;


private Pojo() {

}


private Pojo(String name) {

this.name = name;

}


public String getName() {

return name;

}


public void setName(String name) {

this.name = name;

}


@Override

public String toString() {

return "Pojo{" +

"name='" + name + '\'' +

'}';

}

}

}


Produces the following output:

14:25:02.185 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework

14:25:02.188 [main] DEBUG i.n.c.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16

14:25:02.195 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Buffer.address: available

14:25:02.195 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available

14:25:02.195 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available

14:25:02.196 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: true

14:25:02.196 [main] DEBUG i.n.util.internal.PlatformDependent - Java version: 8

14:25:02.196 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noUnsafe: false

14:25:02.196 [main] DEBUG i.n.util.internal.PlatformDependent - sun.misc.Unsafe: available

14:25:02.196 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noJavassist: false

14:25:02.197 [main] DEBUG i.n.util.internal.PlatformDependent - Javassist: unavailable

14:25:02.197 [main] DEBUG i.n.util.internal.PlatformDependent - You don't have Javassist in your class path or you don't have enough permission to load dynamically generated classes.  Please check the configuration for better performance.

14:25:02.197 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.tmpdir: /var/folders/9t/j_w7lv7d7hvg14gcpdpfgmm40000gn/T (java.io.tmpdir)

14:25:02.197 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)

14:25:02.197 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false

14:25:02.209 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false

14:25:02.210 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512

14:25:02.227 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 8

14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 8

14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192

14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11

14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216

14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.tinyCacheSize: 512

14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256

14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64

14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768

14:25:02.228 [main] DEBUG i.n.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192

14:25:02.388 [main] DEBUG i.n.util.internal.ThreadLocalRandom - -Dio.netty.initialSeedUniquifier: 0xdea71fd913a21250 (took 5 ms)

14:25:02.402 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: unpooled

14:25:02.402 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 65536

14:25:02.403 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo0 (lo0, 0:0:0:0:0:0:0:1)

14:25:02.403 [main] DEBUG io.netty.util.NetUtil - /proc/sys/net/core/somaxconn: 128 (non-existent)

14:25:02.412 [reactor-tcp-select-2] INFO  r.i.n.impl.netty.tcp.NettyTcpServer - BIND /127.0.0.1:37421

14:25:02.423 [reactor-tcp-io-14] DEBUG r.i.n.impl.netty.tcp.NettyTcpClient - [id: 0x0508cc86] REGISTERED

14:25:02.424 [reactor-tcp-io-14] DEBUG r.i.n.impl.netty.tcp.NettyTcpClient - [id: 0x0508cc86] CONNECT(localhost/127.0.0.1:37421, null)

14:25:02.426 [reactor-tcp-io-6] DEBUG r.i.n.impl.netty.tcp.NettyTcpServer - CONNECT [id: 0x8dac81c2, /127.0.0.1:56543 => /127.0.0.1:37421]

14:25:02.426 [reactor-tcp-io-6] DEBUG r.i.n.impl.netty.tcp.NettyTcpServer - [id: 0x8dac81c2, /127.0.0.1:56543 => /127.0.0.1:37421] REGISTERED

14:25:02.426 [reactor-tcp-io-6] DEBUG r.i.n.impl.netty.tcp.NettyTcpServer - [id: 0x8dac81c2, /127.0.0.1:56543 => /127.0.0.1:37421] ACTIVE

14:25:02.427 [reactor-tcp-io-14] DEBUG r.i.n.impl.netty.tcp.NettyTcpClient - [id: 0x0508cc86, /127.0.0.1:56543 => localhost/127.0.0.1:37421] ACTIVE

14:25:02.432 [reactor-tcp-io-6] INFO  serve - subscribe: MapAction

14:25:02.432 [reactor-tcp-io-14] INFO  receive - subscribe: ConsumerAction

14:25:02.433 [reactor-tcp-io-14] DEBUG r.i.n.impl.netty.tcp.NettyTcpClient - [id: 0x0508cc86, /127.0.0.1:56543 => localhost/127.0.0.1:37421] USER_EVENT: reactor.io.net.impl.netty.NettyChannelHandlerBridge$ChannelInputSubscriberEvent@10c07f35

14:25:02.433 [reactor-tcp-io-6] DEBUG r.i.n.impl.netty.tcp.NettyTcpServer - [id: 0x8dac81c2, /127.0.0.1:56543 => /127.0.0.1:37421] USER_EVENT: reactor.io.net.impl.netty.NettyChannelHandlerBridge$ChannelInputSubscriberEvent@595d24d2

14:25:02.434 [reactor-tcp-io-14] INFO  receive - onSubscribe: reactor.rx.stream.io.DecoderStream$2$1@43f67bd5

14:25:02.434 [reactor-tcp-io-6] INFO  serve - onSubscribe: reactor.rx.stream.io.DecoderStream$2$1@3d3d3d59

14:25:02.434 [reactor-tcp-io-6] INFO  serve - request: 5

14:25:02.435 [reactor-tcp-io-14] INFO  receive - request: 9223372036854775807

14:25:12.447 [reactor-tcp-io-6] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetectionLevel: simple

14:25:12.451 [reactor-tcp-io-6] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacity.default: 262144

14:25:12.459 [reactor-tcp-io-6] DEBUG r.i.n.i.n.NettyChannelHandlerBridge - Cancel connection

14:25:12.459 [reactor-tcp-io-6] INFO  serve - cancel

14:25:12.460 [reactor-tcp-io-6] DEBUG r.i.n.impl.netty.tcp.NettyTcpServer - [id: 0x8dac81c2, /127.0.0.1:56543 :> /127.0.0.1:37421] INACTIVE

14:25:12.460 [reactor-tcp-io-6] DEBUG r.i.n.impl.netty.tcp.NettyTcpServer - [id: 0x8dac81c2, /127.0.0.1:56543 :> /127.0.0.1:37421] UNREGISTERED

14:25:12.537 [reactor-tcp-io-14] DEBUG r.i.n.impl.netty.tcp.NettyTcpClient - [id: 0x0508cc86, /127.0.0.1:56543 :> localhost/127.0.0.1:37421] INACTIVE

14:25:12.537 [reactor-tcp-io-14] INFO  receive - complete

14:25:12.537 [reactor-tcp-io-14] INFO  receive - cancel

14:25:12.537 [reactor-tcp-io-14] DEBUG r.i.n.impl.netty.tcp.NettyTcpClient - [id: 0x0508cc86, /127.0.0.1:56543 :> localhost/127.0.0.1:37421] UNREGISTERED


For some reason the client never sends the data to the server.

Reply all
Reply to author
Forward
0 new messages