Hi,
I have a case where I want to read from one database and write to another.
Currently I'm doing this in the following way:
Broadcaster<Object> sink = Streams.broadcast(Environment.get());
sink.dispatchOn(Environment.getCachedDispatcher()).buffer(1024).consume(list -> doBatchedWrite(list));
while (another item available) sink.doNext(readItem());
In my profiler's thread view this looks something like the following (where R is a blocking read with a specified fetch size and W is a blocking batch write):
---|RRR|---|RRR|---
-------|WWW|---|WWW
I presume this is because the buffer (capacity) is telling the publisher not to read any more until the write has finished and the consumer has capacity again?
I suppose I'm using buffer as a convenience to batch up a list of items ready for the writer. However, what I really want is for reading to continue whilst writing is happening, but in some bounded/controllable way, so that I'd end up with something more like:
---|RRR|-|RRR|-|RRR|-|RRR|-
-------|WWW|-|WWW|-|WWW|-|W
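To make the target concrete: in plain java.util.concurrent terms (not Reactor), I think what I'm after is roughly a bounded queue of batches between the reader and the writer. Below is a rough sketch of that idea only. The integer items, BATCH_SIZE, MAX_PENDING and the POISON marker are all made up for illustration, and the list add stands in for doBatchedWrite.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedPipeline {
    static final int BATCH_SIZE = 3;   // hypothetical batch size
    static final int MAX_PENDING = 2;  // hypothetical bound on batches waiting for the writer
    // End-of-stream marker, compared by identity (never by equals).
    static final List<Integer> POISON = new ArrayList<>();

    // Reads `total` items on the calling thread and "writes" batches on a second thread.
    static List<List<Integer>> run(int total) {
        BlockingQueue<List<Integer>> queue = new ArrayBlockingQueue<>(MAX_PENDING);
        List<List<Integer>> written = new ArrayList<>();

        Thread writer = new Thread(() -> {
            try {
                for (List<Integer> next = queue.take(); next != POISON; next = queue.take()) {
                    written.add(next); // stand-in for doBatchedWrite(next)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();

        try {
            List<Integer> batch = new ArrayList<>(BATCH_SIZE);
            for (int i = 0; i < total; i++) {
                batch.add(i); // stand-in for readItem()
                if (batch.size() == BATCH_SIZE) {
                    // Blocks the reader only when MAX_PENDING batches are already queued.
                    queue.put(batch);
                    batch = new ArrayList<>(BATCH_SIZE);
                }
            }
            if (!batch.isEmpty()) {
                queue.put(batch); // flush the final partial batch
            }
            queue.put(POISON);
            writer.join(); // also makes `written` safely visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run(8)); // prints [[0, 1, 2], [3, 4, 5], [6, 7]]
    }
}
```

Here the reader keeps going while the writer is busy, and only stalls once MAX_PENDING batches are waiting, which is the bounded overlap I'd like to get from the stream.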
Should I be using something like onOverflowBuffer() or window() to allow reading to continue on the read stream/thread? Or am I completely misusing Reactor, and what I should actually be doing is unbounded streaming in Reactor with the batching/collecting done inside the consumer?
Any advice gratefully received.
Jon