Hi Johannes,
First, I would recommend upgrading to the latest Akka Streams release, 2.4.3, if possible. It contains many fixes and improvements over the version you seem to be using (OutputStreamSink is a private, internal API nowadays).
The corresponding method in the newer versions is StreamConverters.fromOutputStream(creator), and it materializes into a CompletionStage<IOResult> which completes when the OutputStream is closed. To get the materialized value out of the stream you would do source.toMat(sink, Keep.right()).run(materializer), as in the example further down.
Secondly, it is not safe to use PipedInputStream/PipedOutputStream here: each end requires that its usage is bound to one specific thread, which StreamConverters does not guarantee.
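To illustrate the thread binding with plain JDK classes (no Akka involved), here is a minimal sketch. The class and method names are made up for the example. It relies on the JDK's piped streams remembering which thread last wrote: a writer thread writes three bytes and exits without closing the pipe, and the reader can drain the buffered bytes but then gets an IOException instead of a clean end of stream.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipedThreadDemo {

    // Hypothetical helper: returns a short description of what the reader saw.
    static String readAfterWriterExit() throws Exception {
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream in = new PipedInputStream(out);

        // The writer thread writes three bytes and then terminates
        // WITHOUT closing the stream.
        Thread writer = new Thread(() -> {
            try {
                out.write(new byte[] {1, 2, 3});
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        writer.start();
        writer.join(); // the writing thread is dead from here on

        int read = 0;
        try {
            // The buffered bytes can still be read; once the buffer is empty
            // the pipe notices that the thread that wrote to it is gone.
            while (in.read() != -1) {
                read++;
            }
            return "read " + read + " bytes, then end of stream";
        } catch (IOException e) {
            return "read " + read + " bytes, then IOException";
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readAfterWriterExit());
    }
}
```

In a stream, the writes are not guaranteed to come from one long-lived thread, so this bookkeeping can misfire even though nothing is actually wrong with the data.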
This is how you would do something like what you are asking for with Akka 2.4.3:
import akka.actor.ActorSystem;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.stream.ActorMaterializer;
import akka.stream.IOResult;
import akka.stream.Materializer;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.StreamConverters;
import akka.util.ByteString;
import java.util.concurrent.CompletionStage;

ActorSystem system = ActorSystem.create();
Materializer mat = ActorMaterializer.create(system);

CompletionStage<HttpResponse> responseFuture =
    Http.get(system).singleRequest(HttpRequest.create("http://example.com"), mat);

CompletionStage<IOResult> done = responseFuture.thenCompose(response -> {
    Source<ByteString, Object> source = response.entity().getDataBytes();

    // note that it is not safe/correct to create the OutputStream outside of the
    // lambda/creator given to fromOutputStream
    Sink<ByteString, CompletionStage<IOResult>> sink =
        StreamConverters.fromOutputStream(HttpClientExample::someApiReturningANewOutputStream);

    // just to make the type clear, ofc you can just return it
    CompletionStage<IOResult> completionStage = source.toMat(sink, Keep.right()).run(mat);
    return completionStage;
});

done.thenAccept((result) -> {
    if (result.wasSuccessful())
        System.out.println("Done, wrote " + result.getCount() + " bytes");
    else
        System.out.println("Failed: " + result.getError().getMessage());
});
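In case the thenCompose step is unclear: the lambda itself returns a CompletionStage, and thenCompose flattens that into the result, where thenApply would give you a nested CompletionStage<CompletionStage<...>>. A minimal JDK-only sketch, where fetch and write are hypothetical stand-ins for the HTTP request and the stream run:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class ComposeSketch {

    // Hypothetical stand-in for the HTTP call: completes with a response body.
    static CompletionStage<String> fetch() {
        return CompletableFuture.supplyAsync(() -> "hello");
    }

    // Hypothetical stand-in for running the stream: completes with the
    // number of bytes "written".
    static CompletionStage<Long> write(String body) {
        return CompletableFuture.supplyAsync(() -> Long.valueOf(body.length()));
    }

    // thenCompose chains the second asynchronous step onto the first and
    // yields a flat CompletionStage<Long>.
    static CompletionStage<Long> fetchThenWrite() {
        return fetch().thenCompose(ComposeSketch::write);
    }

    public static void main(String[] args) {
        long count = fetchThenWrite().toCompletableFuture().join();
        System.out.println("Done, wrote " + count + " bytes");
    }
}
```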
I hope this helps.
--
Johan Andrén
Akka Team, Lightbend Inc.