RxJava2 with parallel

240 views
Skip to first unread message

Max Grigoriev

unread,
Aug 2, 2018, 2:16:06 PM8/2/18
to vert.x
Hi,

How will the behavior of Flowable#parallel in Vertx?
Will it run sequentially on the event loop thread one by one or it will run pseudo-parallel when "longRequest" is waiting for the answer vertx will start another "longRequest" in "parallel" section?

.toFlowable(BackpressureStrategy.BUFFER)
.parallel(5)
.runOn(RxHelper.scheduler(vertx))
.flatMap(url -> longWebRequestToWebServer(url, ...))
.sequential()


I understand how it works in old thread system. We will create 5 threads and run each request parallel in the own thread. 
But what will be in the case when we run Flowable#parallel inside the verticle.

Thanks

Blake

unread,
Aug 2, 2018, 4:02:45 PM8/2/18
to vert.x
If this is blocking you should use it on the RxHelper.blockingScheduler. Otherwise, it's going to run it on the verticles context and block the event loop. As far as it being parallel, I do not think that it will actually be parallel (in the sense of actually running at the same time). Since you're on a verticle and you're using RxHelper.scheduler(vertx) it will be run on the context that your verticle is using -- therefore it's just going to queue the 5 parallel ops on the same event loop thread to be executed one after another. You could use the RxHelper.blockingScheduler and set it as NOT ordered (ordered=false) and it will truly run in parallel on all of your blocking threads.

Max Grigoriev

unread,
Aug 2, 2018, 4:27:08 PM8/2/18
to vert.x
No, it's not blocking. it makes HTTP call via WebClient.create(Main.vertx) but it looks like the next request doesn't run until the first one finishes.
I thought Vertx should run next request when the first one is waiting for the answer from the server.

Blake

unread,
Aug 2, 2018, 5:12:33 PM8/2/18
to vert.x
I do not believe it should be blocking till the request returns in this case. If your longWebRequest blocks it will block. Not sure on your implementation, but this should shed some light:

Note that computation scheduler is RxHelper.blockingScheduler(vertx,false)
        RxJavaPlugins.setIoSchedulerHandler {RxHelper.blockingScheduler(vertx, false)}
        RxJavaPlugins.setComputationSchedulerHandler { RxHelper.blockingScheduler(vertx, false) }
        RxJavaPlugins.setSingleSchedulerHandler { RxHelper.scheduler(vertx) }
        Flowable.just(1, 2, 3, 4, 5)
            .parallel(5)
            .runOn(Schedulers.computation())
            .flatMap { it ->
                println("Sending for $it")
                Thread.sleep(500)
                Flowable.just(it)
            }.sequential()
            .subscribe({ resp ->
                           println("Got ${resp}")
                       },
                                 {err -> println(err)})

produces:
Sending for 1
Sending for 2
Sending for 3
Sending for 4
Sending for 5
Got 2
Got 1
Got 3
Got 4
Got 5

This, however (note using single scheduler here. i.e., RxHelper.scheduler(vertx)):

        Flowable.just(1, 2, 3, 4, 5)
.parallel(5)
.runOn(Schedulers.single())
.flatMap { it ->
println("Sending for $it")
Thread.sleep(500)
Flowable.just(it)
}.sequential()
.subscribe({ resp ->
println("Got ${resp}")
},
{err -> println(err)})

produces:
Sending for 1
Got 1
Sending for 2
Got 2
Sending for 3
Got 3
Sending for 4
Got 4
Sending for 5
Got 5

And, if you were making a non blocking request in there (note still using Scheduler.single here):
Flowable.just(1, 2, 3, 4, 5)
.parallel(5)
.runOn(Schedulers.single())
.flatMap { it ->
println("Sending for $it")
Flowable.create<Int>({subscriber ->
vertx.setTimer((Math.random() * it * 100).toLong(), { timerId ->
subscriber.onNext(it)
subscriber.onComplete()
})}, BackpressureStrategy.BUFFER)
}.sequential()
.subscribe({ resp ->
println("Got ${resp}")
},
{err -> println(err)})

produces:
Sending for 1
Sending for 2
Sending for 3
Sending for 4
Sending for 5
Got 1
Got 2
Got 3
Got 4
Got 5

Max Grigoriev

unread,
Aug 2, 2018, 8:36:28 PM8/2/18
to vert.x
You use blocking threads but I don't.
For example. I have a couple of Verticles and send two messages to start processing.
Each verticle should send 10 requests to slow server (5 sec latency). So I expect the total processing time will be 5 sec because each verticle runs on own event loop + 10 parallel flowable should run all requests at the same time.

But in reality, I receive 20 sec total processing time. And it doesn't depend on if I make "parallel" or use "observeOn" or make nothing just regular flatMap.
And if I send 5 messages to Verticle then total processing time becomes 45 sec :(

Log output

2 messages:

2018-08-03 03:34:07 [vert.x-eventloop-thread-5] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 2
2018-08-03 03:34:07 [vert.x-eventloop-thread-2] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 1
2018-08-03 03:34:28 [vert.x-eventloop-thread-4] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 2, time to process: 21094
2018-08-03 03:34:28 [vert.x-eventloop-thread-5] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 1, time to process: 21100



5 messages:

2018-08-03 03:30:51 [vert.x-eventloop-thread-8] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 3
2018-08-03 03:30:51 [vert.x-eventloop-thread-0] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 4
2018-08-03 03:30:51 [vert.x-eventloop-thread-5] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 1
2018-08-03 03:30:51 [vert.x-eventloop-thread-1] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 2
2018-08-03 03:30:51 [vert.x-eventloop-thread-4] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 5
2018-08-03 03:31:27 [vert.x-eventloop-thread-3] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 2, time to process: 36485
2018-08-03 03:31:32 [vert.x-eventloop-thread-4] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 3, time to process: 41554
2018-08-03 03:31:32 [vert.x-eventloop-thread-2] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 4, time to process: 41554
2018-08-03 03:31:37 [vert.x-eventloop-thread-4] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 5, time to process: 46624
2018-08-03 03:31:37 [vert.x-eventloop-thread-6] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 1, time to process: 46639



Here's my code example:

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.RxHelper;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.eventbus.Message;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import io.vertx.reactivex.ext.web.client.WebClient;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;

@Slf4j
public class MainT {
   
public final static Vertx vertx = Vertx.vertx();
    public final static WebClient webClient = WebClient.create(Main.vertx);

    public static void main(String[] args) {
       
TimeZone.setDefault(TimeZone.getTimeZone("GMT"));

        RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
        RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
        RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));

        vertx.deployVerticle(UrlVerticle.class.getName(), new DeploymentOptions().setInstances(10));

        vertx.setTimer(5000, id -> {
           
vertx.eventBus().send("url-test", new JsonObject().put("prefix", 1));
            vertx.eventBus().send("url-test", new JsonObject().put("prefix", 2));
        });
    }

   
public final static class UrlVerticle extends AbstractVerticle {
       
@Override
        public void start() throws Exception {
           
super.start();
            vertx.eventBus().<JsonObject>consumer("url-test")
                   
.toFlowable()
                   
.flatMap(this::listen)
                   
.subscribe(
                           
res -> log.debug("Finish: {}", res),
                            err -> log.error("Can not make import", err),
                            () -> log.info("Complete !!!")
                   
);
        }

       
private Flowable<String> listen(Message<JsonObject> msg) {
           
final Integer prefix = msg.body().getInteger("prefix", 0);
            String slowUrl = "/delay/5000/url/http://www.google.com/search?q=" + prefix;
            List<String> urls = new ArrayList<>();
            for (int i = 1; i < 10; i++) {
               
urls.add(slowUrl + i);
            }
           
log.info("Received prefix {}", prefix);
            long start = System.currentTimeMillis();
            return Observable.fromIterable(urls)
                   
.toFlowable(BackpressureStrategy.BUFFER)
                   
.parallel(10)
                   
.runOn(Schedulers.computation())
                   
.flatMap(uri -> webClient.get(80, "slowwly.robertomurray.co.uk", uri)
                           
.rxSend()
                           
.toFlowable()
                           
.map(HttpResponse::body)
                           
.map(Buffer::toString)
                   
)
                   
.sequential()
                   
.reduce(0, (count, body) -> {
                        return count + 1;
                    })
                   
.map(String::valueOf)
                   
.toFlowable()
                   
.doOnNext(res -> log.info("End prefix {}, time to process: {}", prefix, (System.currentTimeMillis() - start)));

        }
   
}
}

Max Grigoriev

unread,
Aug 2, 2018, 8:39:34 PM8/2/18
to vert.x
The code looks broken in the message:(
        public void start() throws Exception {<span style="color: #000;" class="s

Blake

unread,
Aug 2, 2018, 9:27:41 PM8/2/18
to vert.x
I was just giving you an example of what to expect when using different methods. One of the examples was on the event loop (NON blocking).

I imagine this might have something to do with your endpoint? Could be throttling the connections. As far as I can tell it should execute all of that immediately and then just wait for the resposnes. Try to create a local http server (using vertx and a vertx.setTimer on the response) and see if it's still the case.

Max Grigoriev

unread,
Aug 3, 2018, 5:38:30 AM8/3/18
to vert.x
I added web server on the example and also create the old plain thread application:


And it takes 20 sec to complete 2 message by 10 requests:

2018-08-03 12:32:15 [vert.x-eventloop-thread-1] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Service is started
2018-08-03 12:32:20 [vert.x-eventloop-thread-7] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 2
2018-08-03 12:32:20 [vert.x-eventloop-thread-4] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 1
2018-08-03 12:32:35 [vert.x-eventloop-thread-3] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 2, time to process: 15310
2018-08-03 12:32:40 [vert.x-eventloop-thread-4] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 1, time to process: 20317




And thread example: https://pastebin.com/33FGs6VV

And it runs 5 sec as expected:

2018-08-03 12:36:54 [main] [com.upwork.workplace.loganalyzer.MainS]-[INFO] - Main start
2018-08-03 12:36:54 [Thread-4] [com.upwork.workplace.loganalyzer.MainS]-[INFO] - Start 0
2018-08-03 12:36:54 [Thread-5] [com.upwork.workplace.loganalyzer.MainS]-[INFO] - Start 1
2018-08-03 12:36:59 [RxCachedThreadScheduler-3] [com.upwork.workplace.loganalyzer.MainS]-[INFO] - End 1
2018-08-03 12:36:59 [RxCachedThreadScheduler-4] [com.upwork.workplace.loganalyzer.MainS]-[INFO] - End 0
2018-08-03 12:36:59 [main] [com.upwork.workplace.loganalyzer.MainS]-[INFO] - Main end, 5277



Why so huge difference 20 sec vs 5 sec?

Max Grigoriev

unread,
Aug 3, 2018, 9:43:29 AM8/3/18
to vert.x
The problem was in Vertx WebClient:

 * The default maximum number of HTTP/1 connections a client will pool = 5
 */
public static final int DEFAULT_MAX_POOL_SIZE = 5
;

That's why I've got such bad performance :(

Blake

unread,
Aug 3, 2018, 9:46:22 AM8/3/18
to vert.x
Ah wow. Was just getting back to this. Glad you figured it out. It's always really annoying when something small like that impedes your work..!
Reply all
Reply to author
Forward
0 new messages