How to listen to one streaming source and publish to multiple websocket clients


u6f6o

Apr 20, 2017, 1:51:10 PM
to Netty discussions
Hi, 

I am currently working on a small trade data streaming service. It listens to a service that streams financial tick data and, after applying some business logic, should publish this information to all websocket clients connected to my server.

So far I added a Bootstrap that listens to the streaming service like this: 

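import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

public class TickSource {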

    private static final byte FS = 28;

    public void run() throws Exception {
        String host = "localhost";
        int port = 20077;

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(81920, Unpooled.wrappedBuffer(new byte[]{FS})));
                    ch.pipeline().addLast(new TickSourceHandler());
                }
            });
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

and another ServerBootstrap that publishes the information to the connected clients: 

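import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;

public class TickDistributor {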

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            final ServerBootstrap sb = new ServerBootstrap();
            sb.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(final SocketChannel ch) throws Exception {
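                            ch.pipeline().addLast(
                                    new HttpRequestDecoder(),
                                    // The encoder must sit before the aggregator so that
                                    // responses written by the aggregator pass through it.
                                    new HttpResponseEncoder(),
                                    new HttpObjectAggregator(65536),
                                    new WebSocketServerProtocolHandler("/ticks"),
                                    new TickDistributorHandler()
                            );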
                        }
                    }
            );
            Channel ch = sb.bind(8080).sync().channel();
            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

My problem is that I do not know how to connect these two bootstraps with each other. Having read Netty in Action, I am aware that a Bootstrap can share an EventLoop with another channel, so that the same thread handles all operations, but in my case I have a 1->n relation: one producer and many consumers.

I thought about storing the channels of the connected clients in some collection and forwarding every new message from the Bootstrap client to them, but I am not sure whether that is a good approach or whether it can be done in a way that suits Netty better.
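
To make the collection idea concrete, here is a minimal sketch in plain Java with no Netty types. The class `TickFanOut` and its methods `register`, `unregister` and `publish` are hypothetical names used only for illustration, and each connected client is modelled as a `Consumer<String>` rather than a real channel. It is a sketch of the pattern under those assumptions, not a finished implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

/** Hypothetical fan-out registry: one producer, many subscribers. */
public class TickFanOut {

    // Thread-safe set; iteration works on a snapshot, so clients may
    // join or leave while a broadcast is in progress.
    private final Set<Consumer<String>> clients = new CopyOnWriteArraySet<>();

    /** Would be called when a client connects. */
    public void register(Consumer<String> client) {
        clients.add(client);
    }

    /** Would be called when a client disconnects. */
    public void unregister(Consumer<String> client) {
        clients.remove(client);
    }

    /** Called by the source side for every tick; returns the number of clients reached. */
    public int publish(String tick) {
        int delivered = 0;
        for (Consumer<String> client : clients) {
            client.accept(tick);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        TickFanOut fanOut = new TickFanOut();
        List<String> receivedByA = new ArrayList<>();
        List<String> receivedByB = new ArrayList<>();
        Consumer<String> clientA = receivedByA::add;
        Consumer<String> clientB = receivedByB::add;

        fanOut.register(clientA);
        fanOut.register(clientB);
        fanOut.publish("tick-1");   // reaches both clients

        fanOut.unregister(clientB);
        fanOut.publish("tick-2");   // reaches only client A

        System.out.println("A: " + receivedByA + ", B: " + receivedByB);
    }
}
```

In a real server, the handler on the websocket side would presumably do the registering when a client connects, and the handler on the source side would call `publish`. Netty's `io.netty.channel.group.ChannelGroup` appears to cover the same ground for real channels, since calling `writeAndFlush` on a group writes to every channel in it.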

I also read about local channels, which are meant for bootstraps running in the same VM, but I am not sure whether they are a good fit here.

Thx, 
u6f6o 