.toFlowable(BackpressureStrategy.BUFFER)
.parallel(5)
.runOn(RxHelper.scheduler(vertx))
.flatMap(url -> longWebRequestToWebServer(url, ...))
.sequential()
I understand how it works in old thread system. We will create 5 threads and run each request parallel in the own thread.
But what will be in the case when we run Flowable#parallel inside the verticle.
Thanks
RxJavaPlugins.setIoSchedulerHandler {RxHelper.blockingScheduler(vertx, false)}
RxJavaPlugins.setComputationSchedulerHandler { RxHelper.blockingScheduler(vertx, false) }
RxJavaPlugins.setSingleSchedulerHandler { RxHelper.scheduler(vertx) }
Flowable.just(1, 2, 3, 4, 5)
.parallel(5)
.runOn(Schedulers.computation())
.flatMap { it ->
println("Sending for $it")
Thread.sleep(500)
Flowable.just(it)
}.sequential()
.subscribe({ resp ->
println("Got ${resp}")
},
{err -> println(err)})
Flowable.just(1, 2, 3, 4, 5)
.parallel(5)
.runOn(Schedulers.single())
.flatMap { it ->
println("Sending for $it")
Thread.sleep(500)
Flowable.just(it)
}.sequential()
.subscribe({ resp ->
println("Got ${resp}")
},
{err -> println(err)})
Flowable.just(1, 2, 3, 4, 5)
.parallel(5)
.runOn(Schedulers.single())
.flatMap { it ->
println("Sending for $it")
Flowable.create<Int>({subscriber ->
vertx.setTimer((Math.random() * it * 100).toLong(), { timerId ->
subscriber.onNext(it)
subscriber.onComplete()
})}, BackpressureStrategy.BUFFER)
}.sequential()
.subscribe({ resp ->
println("Got ${resp}")
},
{err -> println(err)})
2018-08-03 03:34:07 [vert.x-eventloop-thread-5] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 22018-08-03 03:34:07 [vert.x-eventloop-thread-2] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 12018-08-03 03:34:28 [vert.x-eventloop-thread-4] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 2, time to process: 210942018-08-03 03:34:28 [vert.x-eventloop-thread-5] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 1, time to process: 211002018-08-03 03:30:51 [vert.x-eventloop-thread-8] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 32018-08-03 03:30:51 [vert.x-eventloop-thread-0] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 42018-08-03 03:30:51 [vert.x-eventloop-thread-5] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 12018-08-03 03:30:51 [vert.x-eventloop-thread-1] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 22018-08-03 03:30:51 [vert.x-eventloop-thread-4] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 52018-08-03 03:31:27 [vert.x-eventloop-thread-3] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 2, time to process: 364852018-08-03 03:31:32 [vert.x-eventloop-thread-4] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 3, time to process: 415542018-08-03 03:31:32 [vert.x-eventloop-thread-2] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 4, time to process: 415542018-08-03 03:31:37 [vert.x-eventloop-thread-4] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 5, time to process: 466242018-08-03 03:31:37 [vert.x-eventloop-thread-6] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 1, time to process: 46639import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.RxHelper;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.eventbus.Message;
import io.vertx.reactivex.ext.web.client.HttpResponse;
import io.vertx.reactivex.ext.web.client.WebClient;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
@Slf4j
public class MainT {
public final static Vertx vertx = Vertx.vertx();
public final static WebClient webClient = WebClient.create(Main.vertx);
public static void main(String[] args) {
TimeZone.setDefault(TimeZone.getTimeZone("GMT"));
RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));
vertx.deployVerticle(UrlVerticle.class.getName(), new DeploymentOptions().setInstances(10));
vertx.setTimer(5000, id -> {
vertx.eventBus().send("url-test", new JsonObject().put("prefix", 1));
vertx.eventBus().send("url-test", new JsonObject().put("prefix", 2));
});
}
public final static class UrlVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
super.start();
vertx.eventBus().<JsonObject>consumer("url-test")
.toFlowable()
.flatMap(this::listen)
.subscribe(
res -> log.debug("Finish: {}", res),
err -> log.error("Can not make import", err),
() -> log.info("Complete !!!")
);
}
private Flowable<String> listen(Message<JsonObject> msg) {
final Integer prefix = msg.body().getInteger("prefix", 0);
String slowUrl = "/delay/5000/url/http://www.google.com/search?q=" + prefix;
List<String> urls = new ArrayList<>();
for (int i = 1; i < 10; i++) {
urls.add(slowUrl + i);
}
log.info("Received prefix {}", prefix);
long start = System.currentTimeMillis();
return Observable.fromIterable(urls)
.toFlowable(BackpressureStrategy.BUFFER)
.parallel(10)
.runOn(Schedulers.computation())
.flatMap(uri -> webClient.get(80, "slowwly.robertomurray.co.uk", uri)
.rxSend()
.toFlowable()
.map(HttpResponse::body)
.map(Buffer::toString)
)
.sequential()
.reduce(0, (count, body) -> {
return count + 1;
})
.map(String::valueOf)
.toFlowable()
.doOnNext(res -> log.info("End prefix {}, time to process: {}", prefix, (System.currentTimeMillis() - start)));
}
}
}
public void start() throws Exception {<span style="color: #000;" class="s
2018-08-03 12:32:15 [vert.x-eventloop-thread-1] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Service is started2018-08-03 12:32:20 [vert.x-eventloop-thread-7] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 22018-08-03 12:32:20 [vert.x-eventloop-thread-4] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - Received prefix 12018-08-03 12:32:35 [vert.x-eventloop-thread-3] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 2, time to process: 153102018-08-03 12:32:40 [vert.x-eventloop-thread-4] [com.upwork.workplace.loganalyzer.MainT]-[INFO] - End prefix 1, time to process: 20317
2018-08-03 12:36:54 [main] [com.upwork.workplace.loganalyzer.MainS]-[INFO] - Main start2018-08-03 12:36:54 [Thread-4] [com.upwork.workplace.loganalyzer.MainS]-[INFO] - Start 02018-08-03 12:36:54 [Thread-5] [com.upwork.workplace.loganalyzer.MainS]-[INFO] - Start 12018-08-03 12:36:59 [RxCachedThreadScheduler-3] [com.upwork.workplace.loganalyzer.MainS]-[INFO] - End 1
2018-08-03 12:36:59 [RxCachedThreadScheduler-4] [com.upwork.workplace.loganalyzer.MainS]-[INFO] - End 02018-08-03 12:36:59 [main] [com.upwork.workplace.loganalyzer.MainS]-[INFO] - Main end, 5277
* The default maximum number of HTTP/1 connections a client will pool = 5
*/
public static final int DEFAULT_MAX_POOL_SIZE = 5;