Netty and an upstream SOCKS proxy


Rogan Dawes

Nov 18, 2015, 5:43:15 AM
to Netty discussions
Hi folks,

I'm trying to implement a SOCKS proxy that allows you to intercept and manipulate arbitrary TCP protocols. I have the basics working: I can accept a connection, decode the SOCKS request, perform SSL decoding where appropriate, and then open a direct socket connection to the requested endpoint.

What I would like to support, though, is the ability to specify another SOCKS server for upstream connections to be routed through.

I have tried looking at the example code, but saw nothing for SOCKS clients. I managed to find the test cases, which gave me some idea of how to use Socks5ProxyHandler. However, I am running into difficulty combining that with the SOCKS server's Promise<Channel>, which notifies the SOCKS server's client that the connection has been made.

Is there an example that shows how to actually use the Socks5ProxyHandler class correctly?

My code looks like this (mostly copied from the example SocksServerConnectHandler, and heavily commented for my own understanding):


@ChannelHandler.Sharable
public final class SocksServerConnectHandler extends SimpleChannelInboundHandler<SocksMessage> {

    public static final AttributeKey<String> DESTINATION_HOST = AttributeKey.valueOf("destination_host");
    public static final AttributeKey<Integer> DESTINATION_PORT = AttributeKey.valueOf("destination_port");

    private final Bootstrap b = new Bootstrap();

    private final DefaultSocks4CommandResponse socks4Success = new DefaultSocks4CommandResponse(
            Socks4CommandStatus.SUCCESS);

    private final DefaultSocks4CommandResponse socks4Failure = new DefaultSocks4CommandResponse(
            Socks4CommandStatus.REJECTED_OR_FAILED);

    private final InetSocketAddress upstreamProxyAddress = new InetSocketAddress("127.0.0.1", 1081);

    @Override
    public void messageReceived(final ChannelHandlerContext ctx, final SocksMessage message) throws Exception {

        final AbstractSocksMessage success, failure;
        Attribute<String> dst = ctx.attr(DESTINATION_HOST);
        Attribute<Integer> port = ctx.attr(DESTINATION_PORT);

        if (message instanceof Socks4CommandRequest) {
            Socks4CommandRequest request = (Socks4CommandRequest) message;
            success = socks4Success;
            failure = socks4Failure;
            dst.set(request.dstAddr());
            port.set(request.dstPort());
        } else if (message instanceof Socks5CommandRequest) {
            Socks5CommandRequest request = (Socks5CommandRequest) message;
            success = new DefaultSocks5CommandResponse(Socks5CommandStatus.SUCCESS, request.dstAddrType());
            failure = new DefaultSocks5CommandResponse(Socks5CommandStatus.FAILURE, request.dstAddrType());
            dst.set(request.dstAddr());
            port.set(request.dstPort());
        } else {
            ctx.close();
            return;
        }

        final Promise<Channel> promise = ctx.executor().newPromise();

        // set up a method to notify the inbound channel when the outbound
        // channel is connected. When the promise is kept, the outbound
        // channel is either ready, or in error. Depending on which, tell
        // the inbound channel what has happened.
        promise.addListener(new SocksConnectionResponseSender(ctx, success, failure));

        // Now set up the client event loop
        final Channel inboundChannel = ctx.channel();
        b.group(inboundChannel.eventLoop()).channel(NioSocketChannel.class)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000).option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new UpstreamChannelInitializer(promise, upstreamProxyAddress));

        b.connect(dst.get(), port.get()).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    // Connection established, the handler will
                    // send a success message using the "promise"
                } else {
                    // Close the connection if the connection
                    // attempt has failed.
                    ctx.channel().writeAndFlush(failure).addListener(ChannelFutureListener.CLOSE);
                }
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    private static class SocksConnectionResponseSender implements FutureListener<Channel> {

        private ChannelHandlerContext ctx;
        private AbstractSocksMessage success, failure;

        public SocksConnectionResponseSender(ChannelHandlerContext ctx, AbstractSocksMessage success,
                AbstractSocksMessage failure) {
            this.ctx = ctx;
            this.success = success;
            this.failure = failure;
        }

        @Override
        public void operationComplete(final Future<Channel> future) throws Exception {
            final Channel outboundChannel = future.getNow();
            // if the outbound channel was successfully established
            // send a socks SUCCESS message to the client
            if (future.isSuccess()) {
                ChannelFuture responseFuture = ctx.channel().writeAndFlush(success);
                // once the success message has been delivered, remove
                // this handler,
                // and set up a Relay Handler on each channel to copy
                // data back and forth
                responseFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) {
                        ctx.pipeline().remove(SocksServerConnectHandler.class);
                        InterceptHandler.INSTANCE.addChannels(ctx.channel(), outboundChannel);
                        outboundChannel.pipeline().addLast("intercepthandler", InterceptHandler.INSTANCE);
                        ctx.pipeline().addLast("intercepthandler", InterceptHandler.INSTANCE);
                    }
                });
            } else {
                // send a failure message, and close the connection
                ctx.channel().writeAndFlush(failure).addListener(ChannelFutureListener.CLOSE);
            }
        }
    }

    private static class UpstreamChannelInitializer extends ChannelInitializer<SocketChannel> {

        private Promise<Channel> promise;
        private InetSocketAddress upstream;

        public UpstreamChannelInitializer(Promise<Channel> promise, InetSocketAddress upstream) {
            this.promise = promise;
            this.upstream = upstream;
        }

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            // Code works fine as a direct connection, but fails if
            // we add this below
            p.addLast("socks5proxyhandler", new Socks5ProxyHandler(upstream));
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.pipeline().remove(this);
            promise.setSuccess(ctx.channel());
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
            promise.setFailure(throwable);
        }
    }
}



Rogan Dawes

Nov 19, 2015, 4:11:31 AM
to Netty discussions
So I figured out what my problem was (well, at least, I was able to get it to work, which is not entirely the same thing!):

I changed:

 p.addLast("socks5proxyhandler", new Socks5ProxyHandler(upstream));


to 

 p.addFirst("socks5proxyhandler", new Socks5ProxyHandler(upstream));


Which worked.

So, this leads to my next question, which is:

What is the recommended way of passing configuration information into the handlers?

It seems to me that you don't really want each handler to have to pass configuration information to subsequent handlers, because that gets messy pretty quickly.

For example, I'd like my UpstreamChannelInitializer class to be able to make connections through either an HTTP(S) CONNECT proxy or a SOCKS (4, 4a, 5) proxy, or to make a direct connection if no proxy is configured. What is the best way to pass that sort of configuration data to a ChannelInitializer/Handler?

Thanks!

Rogan
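
For the proxy-selection case described above, one possible shape (a sketch under assumptions, not an established Netty pattern) is to collect the upstream settings into a single immutable value object and pass that one object to the initializer's constructor. The names UpstreamProxyConfig, ProxyType and handlerName are hypothetical and not part of Netty. The sketch uses only the JDK. The strings it returns are the class names of Netty's proxy handlers in io.netty.handler.proxy, which an initializer could instantiate for each case.

```java
import java.net.InetSocketAddress;
import java.util.Objects;

/** Hypothetical value object describing the upstream route; not part of Netty. */
final class UpstreamProxyConfig {

    enum ProxyType { DIRECT, HTTP_CONNECT, SOCKS4, SOCKS5 }

    private final ProxyType type;
    private final InetSocketAddress address; // null only for DIRECT

    private UpstreamProxyConfig(ProxyType type, InetSocketAddress address) {
        this.type = type;
        this.address = address;
    }

    /** No upstream proxy: connect straight to the requested destination. */
    static UpstreamProxyConfig direct() {
        return new UpstreamProxyConfig(ProxyType.DIRECT, null);
    }

    /** Route connections through the given upstream proxy. */
    static UpstreamProxyConfig via(ProxyType type, String host, int port) {
        Objects.requireNonNull(type, "type");
        Objects.requireNonNull(host, "host");
        if (type == ProxyType.DIRECT) {
            throw new IllegalArgumentException("use direct() for a direct connection");
        }
        // Unresolved, so no DNS lookup happens while building the config.
        return new UpstreamProxyConfig(type, InetSocketAddress.createUnresolved(host, port));
    }

    ProxyType type() { return type; }

    InetSocketAddress address() { return address; }

    /** Name of the Netty handler class an initializer could add for this config. */
    String handlerName() {
        switch (type) {
            case HTTP_CONNECT: return "io.netty.handler.proxy.HttpProxyHandler";
            case SOCKS4:       return "io.netty.handler.proxy.Socks4ProxyHandler";
            case SOCKS5:       return "io.netty.handler.proxy.Socks5ProxyHandler";
            default:           return "(none, direct connection)";
        }
    }

    public static void main(String[] args) {
        UpstreamProxyConfig cfg = via(ProxyType.SOCKS5, "127.0.0.1", 1081);
        System.out.println(cfg.type() + " -> " + cfg.handlerName());
    }
}
```

With something like this, UpstreamChannelInitializer would take one UpstreamProxyConfig argument in place of the bare InetSocketAddress and switch on type() inside initChannel, so adding another proxy type does not change any constructor signatures.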

Norman Maurer

Nov 19, 2015, 10:42:25 AM
to ne...@googlegroups.com
It really depends on how you want to handle it.

You could use Channel.attr(…) and store it there, or use ChannelHandlerContext.fireUserEventTriggered(…) to pass it through the pipeline.
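
A minimal sketch of the Channel.attr(…) option, assuming Netty 4.1 on the classpath. The class UpstreamAttrExample, its two helper methods, and the key name "upstream_proxy" are hypothetical. EmbeddedChannel is used only so the snippet runs without a network. In a real server the value could also be attached when bootstrapping, via Bootstrap.attr(key, value) or ServerBootstrap.childAttr(key, value).

```java
import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.AttributeKey;

import java.net.InetSocketAddress;

final class UpstreamAttrExample {

    // Hypothetical key name; it only has to be unique within the application.
    static final AttributeKey<InetSocketAddress> UPSTREAM_PROXY =
            AttributeKey.valueOf("upstream_proxy");

    /** Stores the upstream proxy address on the channel. */
    static void configure(Channel ch, InetSocketAddress upstream) {
        ch.attr(UPSTREAM_PROXY).set(upstream);
    }

    /** Reads it back; a handler would call this with ctx.channel(). Null if never set. */
    static InetSocketAddress upstreamOf(Channel ch) {
        return ch.attr(UPSTREAM_PROXY).get();
    }

    public static void main(String[] args) {
        // Stand-in for the accepted client channel.
        EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
        configure(ch, InetSocketAddress.createUnresolved("127.0.0.1", 1081));
        System.out.println("upstream port: " + upstreamOf(ch).getPort());
        ch.finish();
    }
}
```

Because the attribute lives on the channel rather than in any one handler, later handlers can read it without earlier handlers passing it along explicitly.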





Rogan Dawes

Nov 19, 2015, 10:50:16 AM
to ne...@googlegroups.com

How would storing it in the channel attribute work? Should I configure the initial channel handler to set the attribute?

I'm still trying to wrap my head around the Netty "approach", so please forgive all the questions.

Rogan
