The Connection.close() issue is a complicated one dataflow-vise. Normally, with 1 Connection -> 1 Statement -> 1 Result -> N Rows flow, when all rows have been consumed, you'd append the close call before completing the end-subscriber:
createConnection()
.flatMap(connection ->
connection.createStatement("SELECT 1 FROM DUAL")
.execute()
.map((row, rm) -> Record...)
.concatWith(connection.close().cast(Record.class))
)
.subscribe(record -> ...);
However, if we add take(1) before subscribe(record -> ...), that would cancel the chain and connection.close() would not execute. Since there is no confirmation for a Subscription.cancel call, the simplest you can do is fire-and-forget close() upon cancellation:
createConnection()
.flatMap(connection ->
connection.createStatement("SELECT 1 FROM DUAL")
.execute()
.map((row, rm) -> Record...)
.concatWith(connection.close().cast(Record.class))
.doOnCancel(() -> connection.close().subscribe()) // <-------------
)
.take(1)
.subscribe(record -> ...);
You could add some action queue indeed to serialize out retrieving rows and closing the connection so that in-flight rows do not get interrupted upon async cancellation. The trouble is, how would a timeout work then; i.e., waiting for a row that would not come in time but then can't cancel/close the connection until the row arrives?
A similar problem arises with C# IAsyncEnumerable where
DisposeAsnc() can't be called until
MoveNextAsync returned, hence they needed to inject a
CancellationToken to be able to interrupt the whole thing while
MoveNextAsync was running (which then throws an exception).