rxified executeBlocking with a custom WorkerExecutor


Asher Tarnopolski

Jun 27, 2017, 4:46:28 AM
to vert.x
good day y'all,

core vertx provides a way to execute blocking code using a custom WorkerExecutor, something like this:

WorkerExecutor we = vertx.createSharedWorkerExecutor("worker1");
we.executeBlocking(blockingCodeHandler, ordered, resultHandler);

i need your help with ideas on how the same thing can be done in an rxified way, considering i have the following method:

public <T> Single<T> executeBlocking(Observable<T> blockingObservable, boolean ordered) {
    return currentContext().rxExecuteBlocking(f -> {
        blockingObservable.subscribeOn(RxHelper.blockingScheduler(vertx)).subscribe(f::complete, f::fail);
    }, ordered);
}

thanks!
  

Asher Tarnopolski

Jun 27, 2017, 5:25:21 AM
to vert.x
ok, a moment after i posted i found a way to pass a custom WorkerExecutor to a context scheduler. does the implementation below make sense?

protected <T> Single<T> executeBlocking(Observable<T> blockingObservable, boolean ordered) {
    return currentContext().rxExecuteBlocking(f -> {
        WorkerExecutor we = vertx.createSharedWorkerExecutor("worker1");
        ContextScheduler scheduler = new ContextScheduler(we, ordered);
        blockingObservable.subscribeOn(scheduler).subscribe(
            next -> {
                we.close();
                f.complete(next);
            }, f::fail);
    }, ordered);
}


Thomas SEGISMONT

Jun 27, 2017, 9:39:22 AM
to ve...@googlegroups.com
Hi,

I'm not sure I understand the details of the second snippet, but the rxified equivalent to the first one is:

WorkerExecutor we = vertx.createSharedWorkerExecutor("worker1");
Single<X> single = we.rxExecuteBlocking(blockingCodeHandler, ordered);

Then of course you must subscribe to the single.

Regards

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+unsubscribe@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/eb27ca81-2584-41b8-b5fa-bcdc4b97b6cb%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Asher Tarnopolski

Jun 27, 2017, 10:01:03 AM
to vert.x
hi thomas, and thanks for the feedback.
in my case i need a solution that receives an observable which should be processed on a blocking scheduler.
will it be enough to subscribe to it inside blockingCodeHandler to ensure it is executed that way?
  

Thomas SEGISMONT

Jun 27, 2017, 11:51:55 AM
to ve...@googlegroups.com
Can you explain "processed on blocking scheduler"?

Some Observables need to be subscribed on a blocking scheduler. Some need to be observed on a blocking scheduler.
This depends on how the Observable was created and the chain of operations.
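To make the subscribed-on versus observed-on distinction concrete, here is a minimal sketch that uses only JDK classes as an analogy. It is not the Vert.x or RxJava API: `CompletableFuture.supplyAsync` stands in for producing the source on a given scheduler, and `thenApplyAsync` stands in for moving only the downstream step. The class name, method names, and the "blocking-worker" thread name are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogy, not Vert.x/RxJava code. All names here are hypothetical.
class SchedulerPlacementSketch {

  // Stand-in for a blocking scheduler: a single named worker thread.
  static ExecutorService newBlockingPool() {
    return Executors.newSingleThreadExecutor(r -> new Thread(r, "blocking-worker"));
  }

  // "Subscribed on" analogue: the source value itself is produced on the worker.
  static String sourceOnWorker(ExecutorService pool) {
    return CompletableFuture
        .supplyAsync(() -> Thread.currentThread().getName(), pool)
        .join();
  }

  // "Observed on" analogue: the source is produced on the calling thread and
  // only the downstream step is moved to the worker.
  static String[] downstreamOnWorker(ExecutorService pool) {
    String sourceThread = Thread.currentThread().getName();
    String downstreamThread = CompletableFuture
        .completedFuture(sourceThread)
        .thenApplyAsync(ignored -> Thread.currentThread().getName(), pool)
        .join();
    return new String[] {sourceThread, downstreamThread};
  }

  public static void main(String[] args) {
    ExecutorService pool = newBlockingPool();
    try {
      System.out.println("source produced on: " + sourceOnWorker(pool));
      String[] threads = downstreamOnWorker(pool);
      System.out.println("source produced on: " + threads[0]
          + ", downstream ran on: " + threads[1]);
    } finally {
      pool.shutdown();
    }
  }
}
```

In the first case the potentially blocking production happens off the caller's thread; in the second it does not, and only the handling of the result is moved. Which one an existing Observable needs depends on where its blocking work actually happens.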


Asher Tarnopolski

Jun 27, 2017, 11:58:29 AM
to vert.x
hi again,

the point is that our app is not vertx-rxified, but it makes heavy use of standard rx.
what i needed is to make sure that, whenever we need to, we can execute an observable or single on a worker thread.
another requirement was to provide separate worker pools in some cases. that's all.

i followed your solution and ended up with this:

protected <T> Single<T> executeBlocking(Single<T> blocking, boolean ordered) {
    io.vertx.rxjava.core.WorkerExecutor we = new io.vertx.rxjava.core.Vertx(vertx).createSharedWorkerExecutor("worker1");
    return we.rxExecuteBlocking(f -> blocking.subscribe(
        next -> {
            we.close();
            f.complete(next);
        },
        f::fail
    ), ordered);
}

btw, should i call we.close() in this case?



Thomas SEGISMONT

Jun 27, 2017, 12:21:09 PM
to ve...@googlegroups.com
Keep a single worker executor instance in the verticle. It should be closed automatically when the verticle is stopped.
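The create-once, reuse, release-once shape can be sketched with plain JDK classes. This is an analogy under assumptions, not Vert.x code: `WorkerLifecycleSketch` is a hypothetical stand-in for a verticle, a fixed thread pool stands in for the worker executor, and `stop()` is called explicitly here, whereas Vert.x is expected to close the executor for you on undeploy.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a verticle; JDK-only, not a Vert.x class.
class WorkerLifecycleSketch {

  private ExecutorService workers;

  // Create the pool once, when the component starts.
  void start() {
    workers = Executors.newFixedThreadPool(2);
  }

  // Every call reuses the same pool; nothing is created or closed per call.
  <T> CompletableFuture<T> executeBlocking(Callable<T> blocking) {
    CompletableFuture<T> result = new CompletableFuture<>();
    workers.execute(() -> {
      try {
        result.complete(blocking.call());
      } catch (Exception e) {
        result.completeExceptionally(e);
      }
    });
    return result;
  }

  // Release the pool once, when the component stops.
  void stop() {
    workers.shutdown();
  }

  boolean isStopped() {
    return workers.isShutdown();
  }

  public static void main(String[] args) {
    WorkerLifecycleSketch component = new WorkerLifecycleSketch();
    component.start();
    try {
      System.out.println(component.executeBlocking(() -> 6 * 7).join());
      System.out.println(component.executeBlocking(() -> "second call, same pool").join());
    } finally {
      component.stop();
    }
  }
}
```

Compared with creating and closing an executor inside each call, this keeps the success path and the failure path symmetric: neither one is responsible for cleanup, so a failing call cannot leave an executor unclosed.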

AFAIC, if I had to work with existing observables, I'd simply subscribe on a blocking scheduler:

WorkerExecutor we = vertx.createSharedWorkerExecutor("worker1"); // when verticle is started

Observable<X> myObservable = ...; // later
myObservable.subscribeOn(io.vertx.rxjava.core.RxHelper.scheduler(we))

You still have to know on which scheduler your observable operates (it may not be the subscription scheduler).

In most cases you want to get the results on the verticle context so:

Observable<X> myObservable = ...; // later
myObservable.subscribeOn(io.vertx.rxjava.core.RxHelper.scheduler(we))
  .observeOn(io.vertx.rxjava.core.RxHelper.scheduler(vertx))
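The round trip above (blocking work on the worker pool, result handled back on the verticle context) can be illustrated with a JDK-only sketch. This is an analogy, not the RxHelper API: two single-threaded executors named "worker1" and "context" are hypothetical stand-ins for the worker executor and the verticle context.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogy of "subscribeOn worker, observeOn context". Names are hypothetical.
class WorkerToContextSketch {

  // Single-threaded executor whose thread carries the given name.
  static ExecutorService named(String name) {
    return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
  }

  // Runs the pretend blocking step on `workers`, then handles its result on
  // `context`. Returns the two thread names joined by " -> ".
  static String roundTrip(ExecutorService workers, ExecutorService context) {
    return CompletableFuture
        .supplyAsync(() -> Thread.currentThread().getName(), workers)
        .thenApplyAsync(worker -> worker + " -> " + Thread.currentThread().getName(), context)
        .join();
  }

  public static void main(String[] args) {
    ExecutorService workers = named("worker1");
    ExecutorService context = named("context");
    try {
      System.out.println(roundTrip(workers, context)); // prints "worker1 -> context"
    } finally {
      workers.shutdown();
      context.shutdown();
    }
  }
}
```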




Asher Tarnopolski

Jun 27, 2017, 1:29:23 PM
to vert.x
got it, thanks.