Wrapping a Flow

42 views
Skip to first unread message

andre...@datadoghq.com

unread,
Jun 4, 2018, 4:32:45 PM6/4/18
to Akka User List
Similar to this question: https://groups.google.com/forum/#!topic/akka-user/phtZM_kuy7o

I'm writing instrumentation for an akka-http for Datadog apm. I'm attempting to instrument the `bindAndHandle` api, which requires modifying the user-supplied akka-streams Flow to inject timing information.

There are four places required to run the instrumentation logic.

1. Before the user request handler runs
2. After the user request handler finishes
3. After the users response handler finishes
4. During any exceptions thrown out of the user handlers (for error capturing and cleanup)

I've been able to us the `join` api to set up custom graph to handle the instrumentation logic. I can run my instrumentation code for steps 1, 3, and 4, but I can't find any hooks for step 2 (after the user's request handler has run). Is there any akka-streams api I could leverage to accomplish that?

Here's what I have so far:
  // akka.stream.impl.fusing.GraphInterpreter
  // Instrumented bytecode produces this logic for bindAndHandle (implementation omitted for brevity)
  def bindAndHandle(handler:   Flow[HttpRequest, HttpResponse, Any], ...)(implicit fm: Materializer): Future[ServerBinding] = {
    handler = handler.join(new DatadogGraph())
    // akka http impl
  }

  public static class DatadogGraph
      extends GraphStage<BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest>> {
    private final Inlet<HttpResponse> in1 = Inlet.create("datadog.in1");
    private final Outlet<HttpResponse> out1 = Outlet.create("datadog.toWrapped");
    private final Inlet<HttpRequest> in2 = Inlet.create("datadog.fromWrapped");
    private final Outlet<HttpRequest> out2 = Outlet.create("datadog.out2");
    private final BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape =
        BidiShape.of(in1, out1, in2, out2);

    @Override
    public BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape() {
      return shape;
    }

    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) {
      return new DatadogLogic(shape());
    }
  }

  /** Stateful logic of the akka http pipeline */
  public static class DatadogLogic extends GraphStageLogic {
    public DatadogLogic(
        final BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape) {
      super(shape);

      setHandler(
          shape.in2(),
          new AbstractInHandler() {
            @Override
            public void onPush() {
              // 1. Runs before user sees request
              push(shape.out2(), request);
            }

            @Override
            public void onUpstreamFailure(Throwable ex) throws Exception {
              // 4. notice error
              super.onUpstreamFailure(ex);
            }
          });

      setHandler(
          shape.out2(),
          new AbstractOutHandler() {
            @Override
            public void onPull() {
              pull(shape.in2());
            }

            @Override
            public void onDownstreamFinish() throws Exception {
              // Invoked on errors. Don't complete this stage to allow error-capturing
            }
          });

      setHandler(
          shape.in1(),
          new AbstractInHandler() {
            @Override
            public void onPush() {
              // 3. run after user processes response
              push(shape.out1(), response);
            }

            @Override
            public void onUpstreamFailure(Throwable ex) throws Exception {
              // 4. notice error
              super.onUpstreamFailure(ex);
            }
          });

      setHandler(
          shape.out1(),
          new AbstractOutHandler() {
            @Override
            public void onPull() {
              pull(shape.in1());
            }
          });
    }
  }



Reply all
Reply to author
Forward
0 new messages