Using Project Reactor with Vertx

Manikanta G

May 28, 2018, 2:31:18 PM
to vert.x
Hi,

From all the reading I've done on RxJava 2 & Project Reactor (https://projectreactor.io), they are trying to solve the same async problem.
But what I don't understand is how the Vert.x modules for these libraries are implemented. The RxJava 2 implementation (https://vertx.io/docs/vertx-rx/java2/) has Flowable & Single producers and it has rxXXX() methods. But the Reactive Streams implementation (https://vertx.io/docs/vertx-reactive-streams/java/) doesn't have any classes similar to the RxJava 2 ones (e.g. io.vertx.reactivex.core.Vertx or io.vertx.reactivex.ext.jdbc.JDBCClient). Does this mean I need to use Project Reactor along with RxJava 2?

I've been totally confused about this for a long time. Or maybe I got the RxJava 2 vs Project Reactor distinction all wrong!

Can someone clarify how to use Project Reactor with Vert.x (e.g. DB operations, HTTP client operations, ...)?

Thanks,
Manikanta G

Julien Viet

May 28, 2018, 2:54:03 PM
to ve...@googlegroups.com
Hi,

Actually, RxJava2 and Reactor are very similar and share the same initial code base (https://github.com/reactor/reactive-streams-commons); David Karnok co-developed reactive-streams-commons with Stephane Maldini.

However, the initial intent of Reactive Streams was to provide an "integration" spec between asynchronous libraries, e.g. to use the Vert.x HTTP server with Akka, or whatever implements Reactive Streams. That's why we do have the vertx-reactive-streams project: it provides adapters for using Vert.x streams with Reactive Streams.

Reactive-streams-commons was designed as a Reactive Programming library based on Reactive Streams, which is a kind of "abuse" of the initial intent of the Reactive Streams specification.

Vert.x initially integrated with RxJava because part of our community chose to use it instead of the Vert.x callback continuation model, since it can provide better composition of asynchronous constructs. So naturally, when RxJava2 was released, we integrated Vert.x with RxJava2.

This integration with RxJava2 is a bit redundant with vertx-reactive-streams.

Since project Reactor is based on Reactive Streams you can use Vert.x RxJava2 with Reactor as well, at least for the Flowable part.

To confuse you even more and explain how this is totally f....e up (apologies, but those are my sincere thoughts), the same Reactive Streams API has been ported to Java 9 as the infamous "Flow" API, which has exactly the same interfaces but in a different namespace. Of course the Flow API will be consumed by Java specifications such as "sql2", you-name-it.
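For readers who have not seen the Java 9 types, here is a minimal sketch of what those interfaces look like in use. It relies only on the JDK (`java.util.concurrent.Flow` and `SubmissionPublisher`); the names `FlowSketch` and `CollectingSubscriber` are made up for illustration and are not part of Vert.x, RxJava or Reactor. The subscriber requests one element at a time, which is the same back-pressure contract as in `org.reactivestreams`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Illustrative only: class and method names are hypothetical.
class FlowSketch {

    // Collects every item it receives, requesting one element at a time.
    static final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
        private final List<T> items = new ArrayList<>();
        private final CompletableFuture<List<T>> done = new CompletableFuture<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            this.subscription = s;
            s.request(1); // explicit demand: ask for the first element
        }

        @Override
        public void onNext(T item) {
            items.add(item);
            subscription.request(1); // ask for the next element only when ready
        }

        @Override
        public void onError(Throwable t) {
            done.completeExceptionally(t);
        }

        @Override
        public void onComplete() {
            done.complete(items);
        }

        CompletableFuture<List<T>> result() {
            return done;
        }
    }

    // Publishes the given values through a JDK SubmissionPublisher and
    // waits until the subscriber has seen onComplete.
    static List<Integer> collect(List<Integer> source) throws Exception {
        CollectingSubscriber<Integer> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            source.forEach(publisher::submit);
        } // close() signals onComplete once the submitted items are delivered
        return subscriber.result().get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(collect(List.of(1, 2, 3)));
    }
}
```

The method names (`onSubscribe`, `onNext`, `onError`, `onComplete`, `request`) match the Reactive Streams interfaces one to one; only the package differs, which is why adapters between the two are needed.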

Now there is an intent to develop a new library that is Reactor/RxJava2-like but in the MicroProfile API (https://github.com/eclipse/microprofile-reactive/tree/master/streams/api/src/main/java/org/eclipse/microprofile/reactive/streams), which they would hypothetically like to fit into the JDK at some point.

HTH 

if you have more questions I'll do my best to shed light on the state of the art of reactive-streams in 2018

Julien

Julien Viet

May 29, 2018, 7:27:00 AM
to ve...@googlegroups.com


On 28 May 2018, at 21:14, Manikanta G <go4...@gmail.com> wrote:

Oh, my. Thanks Julien for detailed write up. 

So, can I assume that I need to use RxJava 2 if I want to use Flux & Mono from Project Reactor? If yes, how do I get a Flux/Mono? Is it possible to get them from a Flowable/Single?

I think there must be a library for that?


My requirement: basically I need back pressure and batching of records from a database. I'm getting records from a table, say emp, using SQLClient.queryStreamWithParams(). Once I have the emp records, I need to load their addresses, say from an address table. To do this, instead of querying each employee's address individually, I want to query the addresses of several employees in a batch. Batching at the query level (e.g. using MySQL LIMIT x) is not possible, as it requires storing where the previous batch ended, and I don't want that. The Reactor API seems to offer a solution (e.g. take(200)). Instead of writing a custom solution, I want to use an existing one to batch the stream of DB records.

RxJava2 also provides the take(N) operator, so why would you need Reactor?


Please suggest if this can be achieved using any other alternate possible solution.

Thanks,
Manikanta G

Julien Viet

Jun 1, 2018, 1:56:06 PM
to ve...@googlegroups.com
hi,

it is perfectly valid to use RxJava2 with Java 8+, many people do that.

When you use RxJava2 or Reactor, you don't really care whether it uses java.util.function.Consumer or the RxJava 2 Consumer, because you are creating them using lambdas or method references.

In fact Reactor and RxJava2 are mostly the same code base with mostly the same performance.

So saying that Reactor is preferred because of Java 8 sounds more like a marketing argument to promote one project over another.

At the end of the day both Reactor and RxJava2 are actually deprecated because Java 9 provides the Flow API :-)

HTH

Julien

On 29 May 2018, at 14:17, Manikanta G <go4...@gmail.com> wrote:

Yes, I could use RxJava2's take(). But from what I read in the mentioned links, it seems Reactor is recommended unless Java 6 support is required, which is not my case. So I thought, why not use Reactor?

I'll skip that idea for now. I'll go with RxJava2.

And for now, I've implemented micro-batching myself using queryStream and a List, with just a few lines of code, and it is working fine.
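The actual code was not posted, so the following is only a guess at what such a micro-batcher could look like in plain Java. `MicroBatcher` and its methods are hypothetical names, not a Vert.x API. The assumption is that `add` would be called from the row handler of the stream returned by queryStream and `flush` from its end handler, with the stream paused while a batch is being processed. Note that `take(n)` in both RxJava2 and Reactor only keeps the first n items and then completes; the operator that groups items into lists is `buffer(n)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper (not part of Vert.x): groups streamed rows
// into fixed-size batches and hands each batch to a callback.
class MicroBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> onBatch;
    private List<T> buffer = new ArrayList<>();

    MicroBatcher(int batchSize, Consumer<List<T>> onBatch) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.onBatch = onBatch;
    }

    // Intended to be called once per row, e.g. from a row handler.
    void add(T row) {
        buffer.add(row);
        if (buffer.size() == batchSize) {
            flush();
        }
    }

    // Intended to be called at end of stream to emit the last partial batch.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>(); // start a fresh buffer before the callback runs
        onBatch.accept(batch);
    }

    // Small demonstration: five ids in batches of two.
    static List<List<Integer>> demo() {
        List<List<Integer>> batches = new ArrayList<>();
        MicroBatcher<Integer> batcher = new MicroBatcher<>(2, batches::add);
        for (int id = 1; id <= 5; id++) {
            batcher.add(id);
        }
        batcher.flush();
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the emp/address scenario, the batch callback would presumably build one address query with an IN clause over the employee ids in the batch.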

Thanks,
Manikanta G

Clement Escoffier

Jun 5, 2018, 2:03:22 AM
to ve...@googlegroups.com
Hi,

First, Flow does not deprecate RX or Reactor. Flow is just a “copy” of the Reactive Streams interfaces. Without a library built on top of it, it’s far from being usable (without shooting yourself in the foot). Until we have a better way to implement complex async coordination, RX and Reactor are still very valuable in the Java world.

Both RX and Reactor work with Java 8+. The APIs differ a little bit because, as you mentioned, Reactor uses Java 8 as its baseline while RX is still compatible with Java 6 (because of Android). However, the APIs are fairly similar and, as Julien said, you won’t even notice the differences because you use lambdas and method references. There are conceptual differences between Reactor and RX, but not really on this aspect.

About your `take` use case. `take` is identical in RX and Reactor. The operator is defined here: http://reactivex.io/documentation/operators/take.html. So you can use RX without any issue.

If you still want to use Reactor, you can bridge Reactor and RX:

* from RX to Reactor:

Flowable<X> flowable = …
Single<X> single = …

Flux<X> flux = Flux.from(flowable);
Mono<X> mono = Mono.from(single.toFlowable());

* from Reactor to RX:

Flux<X> flux = …
Mono<X> mono = …

Flowable<X> flowable = Flowable.fromPublisher(flux);
// More tricky as the type depends on the number of elements - so be careful
Single<X> single = Single.fromPublisher(mono); // fails if the Mono completes empty
Completable completable = Completable.fromPublisher(mono); // Completable takes no type parameter
Maybe<X> maybe = Maybe.fromSingle(Single.fromPublisher(mono)); // Yes… that one is tricky; it also fails on an empty Mono

Clement




Manikanta G

Jun 5, 2018, 3:07:48 AM
to vert.x
Excellent. This is all I needed.

Thank you very much.

Thanks,
Manikanta G

Julien Viet

Jun 5, 2018, 3:12:18 AM
to ve...@googlegroups.com
Yes, you are right, they are not deprecated, since these are the same interfaces.

I meant that they will need a new version or implementation that is usable OOTB with the Java 9 Flow API, which creates a backward incompatibility with the previous versions.

Clement Escoffier

Jun 5, 2018, 3:37:34 AM
to ve...@googlegroups.com

On 5 Jun 2018, at 09:12, Julien Viet <jul...@julienviet.com> wrote:

Yes, you are right, they are not deprecated, since these are the same interfaces.

I meant that they will need a new version or implementation that is usable OOTB with the Java 9 Flow API, which creates a backward incompatibility with the previous versions.


Yes, and that’s a big deal…. Adapters, like the ones provided today, have failed to convince me so far…

Clement
