A simplified example of what this might look like:
public class SomeController extends Controller {
public CompletionStage<Result> someAction() {
return someService.doHairyStuff()
.thenApplyAsync(dataStream -> ok().chunked(dataStream), httpExecutionContext);
}
}
public class SomeService {
public SomeReactiveType<SomeByteBuff> doHairyStuff() {
return someRepository.fetchDataFromDb()
.thenCompose(dataStream -> makeHttpCallToExternalService(dataStream), someExecutionContext)
.thenComponse(dataStream -> fetchSomethingFromKafka(dataStream), anotherExecutionContext)
.thenCompose(dataStream -> composeAndTransformData(dataStream), oneMoreExecutionContext);
}
}
public class SomeRepository() {
public class Cursor<SomeEntity, ?> fetchDataFromDb() {
return ComplatableFuture.supplyAsync(() -> mybatisMapper::selectBillionsOfRows, databaseExecutionContext);
}
}
Chains of CompletionStages may be long and in every called method may do more and more calls that add data to dataStream or transform it somehow. CompletionStage that returns from someAction is form HTTP response by web framework and the first part of this response consumed by client long before cursor returned last entity. And all the time session must stay opened and the cursor must produce data, row by row, as the client reads. The idea is to form a very large HTTP response without wasting more memory than on one entity. Of course, the data stream may be closed at any point of execution, including in the framework code. Accordingly, the session should be closed at the same moment.
суббота, 3 июля 2021 г. в 12:54:11 UTC+3, Guy Rouillier: