Similar to this question:
https://groups.google.com/forum/#!topic/akka-user/phtZM_kuy7o
I'm writing akka-http instrumentation for Datadog APM. I'm attempting to instrument the `bindAndHandle` API, which requires modifying the user-supplied akka-streams Flow to inject timing information.
The instrumentation logic needs to run at four points:
1. Before the user request handler runs
2. After the user request handler finishes
3. After the user's response handler finishes
4. During any exceptions thrown out of the user handlers (for error capturing and cleanup)
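To make the four points concrete, here is a minimal plain-Java sketch with no akka involved. It assumes that "request handler finishes" means the handler has returned a pending response and that "response handler finishes" means that response has completed; that reading is my interpretation, and `HookOrderSketch`, `instrument`, and the event labels are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for the four hook points; none of this is akka-http API.
final class HookOrderSketch {

  /** Wraps an async handler and records when each hook point fires. */
  static <Req, Resp> Function<Req, CompletableFuture<Resp>> instrument(
      Function<Req, CompletableFuture<Resp>> handler, List<String> events) {
    return request -> {
      events.add("1:before-request-handler");
      CompletableFuture<Resp> pending;
      try {
        pending = handler.apply(request);
      } catch (RuntimeException ex) {
        // 4. The handler threw before producing a pending response.
        events.add("4:error");
        throw ex;
      }
      // 2. The handler has returned, but the response may not be ready yet.
      events.add("2:request-handler-returned");
      // 3 or 4. Fires once the pending response completes or fails.
      return pending.whenComplete(
          (response, error) -> events.add(error == null ? "3:response-ready" : "4:error"));
    };
  }

  /** Runs one request through a handler whose response is completed later. */
  static List<String> run(boolean fail) {
    List<String> events = new ArrayList<>();
    CompletableFuture<String> deferred = new CompletableFuture<>();
    Function<String, CompletableFuture<String>> handler = req -> deferred;
    instrument(handler, events).apply("GET /");
    if (fail) {
      deferred.completeExceptionally(new IllegalStateException("boom"));
    } else {
      deferred.complete("200 OK");
    }
    return events;
  }
}
```

In this model, point 2 is easy to observe because the wrapper calls the handler directly. In a stream, the equivalent moment is hidden inside the user's Flow, which is the hook I'm missing.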
I've been able to use the `join` API to set up a custom graph that handles the instrumentation logic. I can run my instrumentation code for steps 1, 3, and 4, but I can't find any hooks for step 2 (after the user's request handler has run). Is there an akka-streams API I could leverage to accomplish that?
Here's what I have so far:
// akka.stream.impl.fusing.GraphInterpreter
// Instrumented bytecode produces this logic for bindAndHandle (implementation omitted for brevity)
def bindAndHandle(handler: Flow[HttpRequest, HttpResponse, Any], ...)(implicit fm: Materializer): Future[ServerBinding] = {
  handler = handler.join(new DatadogGraph())
  // akka http impl
}
public static class DatadogGraph
    extends GraphStage<BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest>> {
  private final Inlet<HttpResponse> in1 = Inlet.create("datadog.in1");
  private final Outlet<HttpResponse> out1 = Outlet.create("datadog.toWrapped");
  private final Inlet<HttpRequest> in2 = Inlet.create("datadog.fromWrapped");
  private final Outlet<HttpRequest> out2 = Outlet.create("datadog.out2");
  private final BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape =
      BidiShape.of(in1, out1, in2, out2);

  @Override
  public BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape() {
    return shape;
  }

  @Override
  public GraphStageLogic createLogic(Attributes inheritedAttributes) {
    return new DatadogLogic(shape());
  }
}
/** Stateful logic of the akka http pipeline */
public static class DatadogLogic extends GraphStageLogic {
  public DatadogLogic(
      final BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape) {
    super(shape);

    // Request side: in2 receives the incoming request, out2 feeds the user handler.
    setHandler(
        shape.in2(),
        new AbstractInHandler() {
          @Override
          public void onPush() {
            // 1. Runs before user sees request
            final HttpRequest request = grab(shape.in2());
            push(shape.out2(), request);
          }

          @Override
          public void onUpstreamFailure(Throwable ex) throws Exception {
            // 4. notice error
            super.onUpstreamFailure(ex);
          }
        });
    setHandler(
        shape.out2(),
        new AbstractOutHandler() {
          @Override
          public void onPull() {
            pull(shape.in2());
          }

          @Override
          public void onDownstreamFinish() throws Exception {
            // Invoked on errors. Don't complete this stage to allow error-capturing
          }
        });

    // Response side: in1 receives the user handler's response, out1 passes it on.
    setHandler(
        shape.in1(),
        new AbstractInHandler() {
          @Override
          public void onPush() {
            // 3. run after user processes response
            final HttpResponse response = grab(shape.in1());
            push(shape.out1(), response);
          }

          @Override
          public void onUpstreamFailure(Throwable ex) throws Exception {
            // 4. notice error
            super.onUpstreamFailure(ex);
          }
        });
    setHandler(
        shape.out1(),
        new AbstractOutHandler() {
          @Override
          public void onPull() {
            pull(shape.in1());
          }
        });
  }
}