package com.wolterskluwer.blobs.handlers;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Handler;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.streams.Pump;
import io.vertx.ext.web.RoutingContext;
/**
 * Vert.x-Web handler that relays an incoming HTTP request to an upstream host and streams the
 * upstream response back to the caller.
 *
 * <p>The incoming request is paused until the upstream request head has been sent; after that,
 * the request body and the response body are copied with {@link Pump}s. The method, path and
 * body framing are relayed upstream; the status code and status message are relayed back.
 *
 * <p>NOTE(review): no other headers are forwarded in either direction, and the query string is
 * dropped because {@code path()} is used rather than {@code uri()} - confirm this is intended.
 * NOTE(review): the upstream host is the literal {@code "proxy_host"} with no explicit port -
 * looks like a placeholder; verify before deploying.
 */
public class ProxyHandler implements Handler<RoutingContext> {

    private final HttpClient httpClient;

    /**
     * Creates a handler whose HTTP client is obtained from the verticle's Vert.x instance.
     *
     * @param ctx verticle whose {@code getVertx()} supplies the HTTP client
     */
    public ProxyHandler(AbstractVerticle ctx) {
        this.httpClient = ctx.getVertx().createHttpClient();
    }

    /**
     * Relays the routed request upstream and pipes the upstream response into the routed
     * response. Any failure on either side ends the caller's response (with status 500 when the
     * head has not been written yet).
     *
     * @param routingContext context carrying the request to relay and the response to fill
     */
    @Override
    public void handle(RoutingContext routingContext) {
        HttpServerRequest request = routingContext.request();
        // Hold the body back until the upstream request head has been sent.
        request.pause();
        HttpServerResponse response = routingContext.response();
        response.setChunked(true);

        // Failure path used until the upstream head has been sent: finish the response and make
        // sure the request is not left paused.
        Handler<Throwable> basicRequestHandler = t -> {
            try {
                failResponse(response);
            } finally {
                request.resume();
            }
        };
        request.exceptionHandler(basicRequestHandler);

        HttpClientRequest proxyRequest = httpClient.request(request.method(), "proxy_host",
                request.path(), proxyResponse -> {
                    response.setStatusCode(proxyResponse.statusCode());
                    response.setStatusMessage(proxyResponse.statusMessage());
                    proxyResponse.endHandler(v -> response.end());
                    Pump responsePump = Pump.pump(proxyResponse, response).start();

                    Handler<Throwable> responseHandler = t -> {
                        responsePump.stop();
                        failResponse(response);
                        // Discard whatever is left of the upstream response: swallow the
                        // payload and the end event, and make sure it is not left paused.
                        proxyResponse.handler(b -> {
                        });
                        proxyResponse.endHandler(v -> {
                        });
                        proxyResponse.resume();
                    };
                    response.exceptionHandler(responseHandler);
                    proxyResponse.exceptionHandler(responseHandler);
                });

        // The failure handler must be in place before anything is sent upstream.
        proxyRequest.exceptionHandler(basicRequestHandler);
        // The body pump writes to the proxy request, so its framing has to be declared before
        // the head goes out.
        copyBodyFraming(request, proxyRequest);

        proxyRequest.sendHead(version -> {
            // Upstream head has been sent: wire up the body relay.
            request.endHandler(v -> proxyRequest.end());
            Pump requestPump = Pump.pump(request, proxyRequest).start();

            // From here on a failure must also stop the pump and close the upstream request.
            Handler<Throwable> proxyConnectedHandler = t -> {
                try {
                    requestPump.stop();
                    try {
                        // The upstream may receive an incomplete body here.
                        proxyRequest.end();
                    } catch (IllegalStateException ignored) {
                        // The request end handler above may already have ended the proxy
                        // request; a repeated end() is rejected and there is nothing left
                        // to close.
                    }
                } finally {
                    basicRequestHandler.handle(t);
                }
            };
            request.exceptionHandler(proxyConnectedHandler);
            proxyRequest.exceptionHandler(proxyConnectedHandler);

            // Resume only once every handler is installed.
            request.resume();
        });
    }

    /**
     * Ends the response as well as its current state allows: does nothing when it is closed,
     * answers with status 500 when the head has not been written, otherwise just ends it if it
     * has not ended yet.
     */
    private static void failResponse(HttpServerResponse response) {
        if (response.closed()) {
            return;
        }
        if (!response.headWritten()) {
            response.setStatusCode(500).end();
        } else if (!response.ended()) {
            response.end();
        }
    }

    /**
     * Mirrors the incoming request's body framing onto the proxy request: chunked encoding when
     * the caller sent a {@code Transfer-Encoding} header, otherwise the caller's
     * {@code Content-Length} when present. Requests carrying neither header are left untouched.
     */
    private static void copyBodyFraming(HttpServerRequest from, HttpClientRequest to) {
        if (from.getHeader("Transfer-Encoding") != null) {
            to.setChunked(true);
            return;
        }
        String contentLength = from.getHeader("Content-Length");
        if (contentLength != null) {
            to.putHeader("Content-Length", contentLength);
        }
    }
}