There are a couple different ways to accomplish what you’re wanting to do. If you’re wanting to create threads dedicated to this process, you can use the new RingBufferWorkProcessor in Reactor 2.0. [1] To use it like a worker pool, you’d do something like this:
// Set the queue size to 256 items max
RingBufferWorkProcessor<UpdateRequest> processor = RingBufferWorkProcessor.share("mysql-updater", 256);
// Create a Stream and add consumers equal to # of CPU cores
Stream<UpdateRequest> s = Streams.wrap(processor);
for(int i=0; i<Environment.PROCESSORS; i++){
s.consume(req -> performUpdateInRingBufferThread(req));
}
// Query file, create UpdateRequest, place on RingBuffer (queue)
Streams.from(readRequests(file))
.subscribe(processor);
This has the advantage of being very explicit in code and provides a dedicated thread pool underneath to handle the update requests to mySQL.
There are also other ways to go about doing this that don’t require creating special threads. You could use a plain Stream as well:
Streams.from(readRequests(file))
.capacity(256) // Set the queue size to 256 items max
.consume(req -> {
Environment.cachedDispatcher()
.execute(() -> performUpdateInRingBufferThread(req));
});
This has the advantage that you don’t have to create dedicated threads that are responsible for communicating to mySQL and is more succinct.