Stream-Multiplexer with netty

301 views
Skip to first unread message

u6f6o

unread,
Apr 18, 2017, 4:48:29 AM4/18/17
to Netty discussions
Hi, 

I am currently working on a little service that listens to a streaming service, transforms the messages to some other format, applies some business logic and distributes the results among a bunch of connected websocket clients. 

So far I created two netty Bootstraps, one that listens to the streaming service for incoming messages and one that handles the connected websocket clients: 

public class TickSource {

private static final byte FS = 28;

public void run() throws Exception {
String host = "localhost";
int port = 20077;

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(81920, Unpooled.wrappedBuffer(new byte[]{FS})));
ch.pipeline().addLast(new TickSourceHandler());
}
});
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
and 
public class TickDistributor {

public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
final ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(final SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new HttpRequestDecoder(),
new HttpObjectAggregator(65536),
new HttpResponseEncoder(),
new WebSocketServerProtocolHandler("/ticks"),
new TickDistributorHandler()
);
}
}
);
Channel ch = sb.bind(8080).sync().channel();
ch.closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

So each component on its own works nicely. The problem is, I don't have a clue how to connect them both in a nice netty-way. I thought about storing the the ServerChannels in the TickSourceHandler and write a message received from the streaming service to each of them individually, not sure though, if this is achievable in that way or if there is a nicer way to do it with netty mechanics. 

Reading through Netty In Action I also read about LocalChannels and bootstrapping new client connections in the same EventLoop if you have a 1->1 relation, but I guess it's not suitable in my case with a relation of 1->n? 

I'd be happy about any hint!

Thx, 
u6f6o

Rogan Dawes

unread,
Apr 18, 2017, 5:11:24 AM4/18/17
to Netty discussions
Take a look at the various proxy options?

I have also implemented a mux with netty in the USaBUSe code here:


Perhaps it will give you some ideas.

Also look at ./src/main/java/io/netty/example/securechat/SecureChatServerHandler.java, as it contains an example of distributing messages between a number of channels.

Rogan


--
You received this message because you are subscribed to the Google Groups "Netty discussions" group.
To unsubscribe from this group and stop receiving emails from it, send an email to netty+un...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/netty/4cad9bbf-52b6-4b0f-8705-d57902453c07%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

u6f6o

unread,
Apr 18, 2017, 8:53:17 AM4/18/17
to Netty discussions, ro...@dawes.za.net
Thx for the hint! I got a first prototype ready (code can be found below). I am a bit unsure about some one aspects though: Both the Bootstrap and ServerBootstrap work on the same worker group. Is it okay doing it like this or would it make more sense to let the client have its own worker group? 

In case any no-go can be found in my code (expect the separate thread to wait on closeFuture), I'd be happy about hints/comments. 

public class StreamingServer {

private static final int PORT = Integer.parseInt(System.getProperty("port", "8992"));
    private static final byte FS = 28;

    public static void main(String[] args) throws Exception {
SelfSignedCertificate ssc = new SelfSignedCertificate();
SslContext sslCtx = SslContextBuilder.forServer(
ssc.certificate(),
ssc.privateKey()
).build();

ChannelGroup connectedClients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

String host = "localhost";
int port = 20077;

        try {
Bootstrap source = new Bootstrap();
source.group(workerGroup);
source.channel(NioSocketChannel.class);
source.option(ChannelOption.SO_KEEPALIVE, true);
source.handler(new ChannelInitializer<SocketChannel>() {

@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(81920, Unpooled.wrappedBuffer(new byte[]{FS})));
                    ch.pipeline().addLast(new TickSourceHandler(connectedClients));
}
});
ChannelFuture f = source.connect(host, port).sync();
new Thread(){
@Override
public void run() {
try {
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();

ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new StreamingServerInitializer(
sslCtx,
connectedClients));

server.bind(PORT).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

Rogan Dawes

unread,
Apr 19, 2017, 2:15:27 AM4/19/17
to Netty discussions, ro...@dawes.za.net
Your Thread doesn't really appear to be doing anything, other than waiting for the closeFuture() to sync(), and then exits. That serves no purpose at all, that I can see.

Your two bootstraps are also not "related" to each other. i.e. they are not connected to each other or communicating with each other in any way.

If you take a look at the various *Proxy* examples in the netty tree, you will see that after a connection is received, an Initializer is used to set up the various handlers on the pipeline. Most typically, one of the handlers will construct the client Bootstrap, establish the outbound connection, store a reference to the Channel somewhere (possibly by passing it as a constructor parameter of another handler), and then remove itself from the pipeline, so that additional outbound connections are not made.

I hope this helps.

Rogan
Reply all
Reply to author
Forward
0 new messages