@ChannelHandler.Sharable
public final class SocksServerConnectHandler extends SimpleChannelInboundHandler<SocksMessage> {
public static final AttributeKey<String> DESTINATION_HOST = AttributeKey.valueOf("destination_host");
public static final AttributeKey<Integer> DESTINATION_PORT = AttributeKey.valueOf("destination_port");
private final Bootstrap b = new Bootstrap();
private final DefaultSocks4CommandResponse socks4Success = new DefaultSocks4CommandResponse(
Socks4CommandStatus.SUCCESS);
private final DefaultSocks4CommandResponse socks4Failure = new DefaultSocks4CommandResponse(
Socks4CommandStatus.REJECTED_OR_FAILED);
private final InetSocketAddress upstreamProxyAddress = new InetSocketAddress("127.0.0.1", 1081);
@Override
public void messageReceived(final ChannelHandlerContext ctx, final SocksMessage message) throws Exception {
final AbstractSocksMessage success, failure;
Attribute<String> dst = ctx.attr(DESTINATION_HOST);
Attribute<Integer> port = ctx.attr(DESTINATION_PORT);
if (message instanceof Socks4CommandRequest) {
Socks4CommandRequest request = (Socks4CommandRequest) message;
success = socks4Success;
failure = socks4Failure;
dst.set(request.dstAddr());
port.set(request.dstPort());
} else if (message instanceof Socks5CommandRequest) {
Socks5CommandRequest request = (Socks5CommandRequest) message;
success = new DefaultSocks5CommandResponse(Socks5CommandStatus.SUCCESS, request.dstAddrType());
failure = new DefaultSocks5CommandResponse(Socks5CommandStatus.FAILURE, request.dstAddrType());
dst.set(request.dstAddr());
port.set(request.dstPort());
} else {
ctx.close();
return;
}
final Promise<Channel> promise = ctx.executor().newPromise();
// set up a method to notify the inbound channel when the outbound
// channel is connected. When the promise is kept, the outbound
// channel is either ready, or in error. Depending on which, tell
// the inbound channel what has happened.
promise.addListener(new SocksConnectionResponseSender(ctx, success, failure));
// Now set up the client event loop
final Channel inboundChannel = ctx.channel();
b.group(inboundChannel.eventLoop()).channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000).option(ChannelOption.SO_KEEPALIVE, true)
.handler(new UpstreamChannelInitializer(promise, upstreamProxyAddress));
b.connect(dst.get(), port.get()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// Connection established, the handler will
// send a success message using the "promise"
} else {
// Close the connection if the connection
// attempt has failed.
ctx.channel().writeAndFlush(failure).addListener(ChannelFutureListener.CLOSE);
}
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
private static class SocksConnectionResponseSender implements FutureListener<Channel> {
private ChannelHandlerContext ctx;
private AbstractSocksMessage success, failure;
public SocksConnectionResponseSender(ChannelHandlerContext ctx, AbstractSocksMessage success,
AbstractSocksMessage failure) {
this.ctx = ctx;
this.success = success;
this.failure = failure;
}
@Override
public void operationComplete(final Future<Channel> future) throws Exception {
final Channel outboundChannel = future.getNow();
// if the outbound channel was successfully established
// send a socks SUCCESS message to the client
if (future.isSuccess()) {
ChannelFuture responseFuture = ctx.channel().writeAndFlush(success);
// once the success message has been delivered, remove
// this handler,
// and set up a Relay Handler on each channel to copy
// data back and forth
responseFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) {
ctx.pipeline().remove(SocksServerConnectHandler.class);
InterceptHandler.INSTANCE.addChannels(ctx.channel(), outboundChannel);
outboundChannel.pipeline().addLast("intercepthandler", InterceptHandler.INSTANCE);
ctx.pipeline().addLast("intercepthandler", InterceptHandler.INSTANCE);
}
});
} else {
// send a failure message, and close the connection
ctx.channel().writeAndFlush(failure).addListener(ChannelFutureListener.CLOSE);
}
}
}
private static class UpstreamChannelInitializer extends ChannelInitializer<SocketChannel> {
private Promise<Channel> promise;
private InetSocketAddress upstream;
public UpstreamChannelInitializer(Promise<Channel> promise, InetSocketAddress upstream) {
this.promise = promise;
this.upstream = upstream;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//Code works fine as a direct connection, but fails if
//we add this below
p.addLast("socks5proxyhandler", new Socks5ProxyHandler(upstream));
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.pipeline().remove(this);
promise.setSuccess(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
promise.setFailure(throwable);
}
}
}
p.addLast("socks5proxyhandler", new Socks5ProxyHandler(upstream));
p.addFirst("socks5proxyhandler", new Socks5ProxyHandler(upstream));
private final InetSocketAddress upstreamProxyAddress = newInetSocketAddress("127.0.0.1", 1081);
@Overridepublic void messageReceived(final ChannelHandlerContext ctx, final SocksMessagemessage) throws Exception {
public SocksConnectionResponseSender(ChannelHandlerContext ctx,AbstractSocksMessage success,
private static class UpstreamChannelInitializer extendsChannelInitializer<SocketChannel> {
private Promise<Channel> promise;
private InetSocketAddress upstream;
public UpstreamChannelInitializer(Promise<Channel> promise, InetSocketAddressupstream) {
this.promise = promise;
this.upstream = upstream;
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
//Code works fine as a direct connection, but fails if
//we add this belowp.addLast("socks5proxyhandler", new Socks5ProxyHandler(upstream));
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.pipeline().remove(this);
promise.setSuccess(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
promise.setFailure(throwable);
}
}
}
--
You received this message because you are subscribed to the Google Groups "Netty discussions" group.
To unsubscribe from this group and stop receiving emails from it, send an email to netty+un...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/netty/0ed248ff-f866-4308-a2a2-796fada815b8%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
How would storing it in the channel attr work? Should I configure the initial channel handler to set the attribute?
I'm still trying to wrap my head around the Netty "approach", so please forgive all the questions.
Rogan