Moving to streams for crossing Async Boundaries


Darren Mason

Jan 11, 2017, 12:34:37 AM
to vert.x
Hi,

We have an implementation that is basically split into API realisation components, Services and Data Providers.
Generally APIs are realised through one or more service calls which in turn have data populated by messages from one or more data provider interfaces.

Currently I have interfaces that basically return Collections of Entities and I am exploring turning these collections into streams of some sort.
I really like the concepts in reactive streams, in that they can be driven by the consumer through back-pressure interfaces, and from the documentation I can find, reactive streams look like a great fit for what I want.

So my question is now focussed on the vert.x reactive streams implementation (or other suggestions for streaming implementations):
- How do I apply transformations and filtering using the vert.x ReadStream or ReactiveReadStream?
i.e. if I want to transform a stream with something like stream.map(x -> ...) or filter it like stream.filter(x -> ...)
These don't seem to be functions of the streams in vert.x. I am sure I am just missing something simple, but I have yet to work it out.

My current implementation (using collections) relies on chaining transformations together and filtering quite heavily and I was assuming that streams would support this just as well.

Any suggestions?

Regards
Darren 

Alexander Lehmann

Jan 11, 2017, 8:13:43 AM
to vert.x
There is a class in vert.x called Pump which takes care of transferring data from a ReadStream to a WriteStream with the necessary flow control. You could probably use that as a basis for a filter pump that applies a transformation.
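
As a rough sketch of what such a filter pump might look like: the Source and Sink interfaces below are hypothetical, simplified stand-ins for ReadStream and WriteStream (not the real vert.x types), and FilterPump, ListSource and BoundedSink are names made up for illustration. The pump writes each item that passes the filter, pauses the source when the sink reports a full write queue, and resumes it from the sink's drain handler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified stand-ins for ReadStream and WriteStream.
interface Source<T> {
    void handler(Consumer<T> handler);
    void pause();
    void resume();
}

interface Sink<T> {
    void write(T item);
    boolean writeQueueFull();
    void drainHandler(Runnable handler);
}

// Copies items from source to sink, filtering and transforming on the way.
final class FilterPump<A, B> {
    private final Source<A> source;
    private final Sink<B> sink;
    private final Predicate<A> keep;
    private final Function<A, B> mapper;

    FilterPump(Source<A> source, Sink<B> sink, Predicate<A> keep, Function<A, B> mapper) {
        this.source = source;
        this.sink = sink;
        this.keep = keep;
        this.mapper = mapper;
    }

    void start() {
        // When the sink has room again, let the source continue.
        sink.drainHandler(source::resume);
        source.handler(item -> {
            if (!keep.test(item)) {
                return; // filtered out, nothing is written
            }
            sink.write(mapper.apply(item));
            if (sink.writeQueueFull()) {
                source.pause(); // back pressure: stop reading until drained
            }
        });
    }
}

// In-memory source, only here to exercise the pump.
final class ListSource<T> implements Source<T> {
    private final Deque<T> items;
    private Consumer<T> handler;
    private boolean paused;

    ListSource(List<T> items) { this.items = new ArrayDeque<>(items); }

    public void handler(Consumer<T> handler) { this.handler = handler; emit(); }
    public void pause() { paused = true; }
    public void resume() { paused = false; emit(); }

    private void emit() {
        while (!paused && handler != null && !items.isEmpty()) {
            handler.accept(items.poll());
        }
    }
}

// In-memory sink with a bounded queue that is emptied by calling drain().
final class BoundedSink<T> implements Sink<T> {
    final List<T> pending = new ArrayList<>();
    final List<T> delivered = new ArrayList<>();
    private final int capacity;
    private Runnable drainHandler;

    BoundedSink(int capacity) { this.capacity = capacity; }

    public void write(T item) { pending.add(item); }
    public boolean writeQueueFull() { return pending.size() >= capacity; }
    public void drainHandler(Runnable handler) { this.drainHandler = handler; }

    void drain() {
        delivered.addAll(pending);
        pending.clear();
        if (drainHandler != null) drainHandler.run();
    }
}
```

With a sink capacity of 2, the source is paused as soon as two surviving items are queued, and each drain() lets the next batch through. The filter and the transformation do have to be known when the pump is created.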

Another possibility might be the RecordParser, though you may have to implement the flow control yourself in this case.

Darren Mason

Jan 12, 2017, 12:10:06 AM
to vert.x
Thanks for the response. I guess that reinforces that I have not just missed the functionality.

Creating an enriched pump is certainly an option, but it will mean that the transformations and filters need to be known at the point where the streams are linked, which might not necessarily be the case.
I think that enriching the stream with functions that themselves produce streams will allow the resulting streams to be passed around and further enriched until they reach the point where they need to be 'pumped'.
I feel like I am re-inventing the wheel a bit here though. Surely there must be implementations that have already done this, perhaps in other libraries?
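
To make the idea concrete, here is a minimal sketch under stated assumptions: PausableStream is a hypothetical interface that only borrows the handler/pause/resume shape of a ReadStream (it is not the vert.x interface), and ListStream is a made-up in-memory source. map and filter each return a new stream that forwards pause and resume to its upstream, so the result can be passed around and enriched further before anything flows.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical pausable stream; NOT the vert.x ReadStream interface.
interface PausableStream<T> {
    void handler(Consumer<T> handler);
    void pause();
    void resume();

    // Returns a new stream whose items are the mapped upstream items.
    default <R> PausableStream<R> map(Function<? super T, ? extends R> mapper) {
        PausableStream<T> upstream = this;
        return new PausableStream<R>() {
            public void handler(Consumer<R> handler) {
                upstream.handler(item -> handler.accept(mapper.apply(item)));
            }
            public void pause() { upstream.pause(); }
            public void resume() { upstream.resume(); }
        };
    }

    // Returns a new stream that only passes on items matching the predicate.
    default PausableStream<T> filter(Predicate<? super T> keep) {
        PausableStream<T> upstream = this;
        return new PausableStream<T>() {
            public void handler(Consumer<T> handler) {
                upstream.handler(item -> {
                    if (keep.test(item)) {
                        handler.accept(item);
                    }
                });
            }
            public void pause() { upstream.pause(); }
            public void resume() { upstream.resume(); }
        };
    }
}

// In-memory source that starts paused and emits while resumed.
final class ListStream<T> implements PausableStream<T> {
    private final Deque<T> items;
    private Consumer<T> handler;
    private boolean paused = true;

    ListStream(List<T> items) { this.items = new ArrayDeque<>(items); }

    public void handler(Consumer<T> handler) { this.handler = handler; }
    public void pause() { paused = true; }

    public void resume() {
        paused = false;
        while (!paused && handler != null && !items.isEmpty()) {
            handler.accept(items.poll());
        }
    }
}
```

A chain such as source.filter(x -> x % 2 == 0).map(x -> "n" + x) does nothing until a handler is set and resume() is called, and a pause() issued by the final consumer reaches the original source through every decorator.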

Regards
Darren

Alexander Lehmann

Jan 14, 2017, 8:45:22 AM
to vert.x
I think the name Stream was mostly used for IO streams in Java up to Java 7, and was then used in Java 8 for the java.util.stream API, which is something quite different.

The ReadStream/WriteStream classes are mostly the same as they were in vert.x 2.x I think, which was created with Java 7, so they didn't take the Stream API into consideration.

So chances are, there is no wheel to reinvent yet, at least I am not aware of anything.

Julien Viet

Jan 15, 2017, 4:06:59 AM
to ve...@googlegroups.com
Hi,

Vert.x streams don't provide transformation facilities today because that is not in the scope of Vert.x.

I think you should look at the RxJava side of Vert.x: the Vert.x RxJava project allows you to turn a ReadStream into an Observable, although at the loss of back pressure. Once you have an Observable you can easily write its items to a Vert.x WriteStream.

Besides, there is currently an RxJava2 branch for Vert.x, and the RxJava2 API is based on the Reactive Streams specification; that is, a Vert.x ReadStream<T> can be turned into a back-pressured Flowable<T>.
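
To illustrate the consumer-driven demand that the Reactive Streams specification defines, here is a small sketch. It does not use RxJava or Vert.x; as an assumption made to keep it dependency-free, it uses the JDK's java.util.concurrent.Flow interfaces (JDK 9 and later), which mirror the Reactive Streams ones. TransformingSubscriber is a hypothetical name: it requests one item at a time, filters and maps what it receives, and exposes the collected result as a future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical subscriber that pulls one item at a time from its publisher.
final class TransformingSubscriber<T, R> implements Flow.Subscriber<T> {
    private final Predicate<? super T> keep;
    private final Function<? super T, ? extends R> mapper;
    private final List<R> collected = new ArrayList<>();
    private final CompletableFuture<List<R>> result = new CompletableFuture<>();
    private Flow.Subscription subscription;

    TransformingSubscriber(Predicate<? super T> keep,
                           Function<? super T, ? extends R> mapper) {
        this.keep = keep;
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // initial demand: exactly one item
    }

    @Override
    public void onNext(T item) {
        if (keep.test(item)) {
            collected.add(mapper.apply(item));
        }
        subscription.request(1); // ask for the next item only when ready
    }

    @Override
    public void onError(Throwable error) {
        result.completeExceptionally(error);
    }

    @Override
    public void onComplete() {
        result.complete(collected);
    }

    // Completes with the filtered and mapped items once the stream ends.
    CompletableFuture<List<R>> result() {
        return result;
    }
}
```

The publisher never delivers more than the subscriber has requested, which is the back pressure that is lost when going through a plain Observable. A reusable operator would pass items on to a downstream subscriber instead of collecting them, which is what a library such as RxJava2 provides with map and filter on Flowable.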

In both cases we want to support these use cases, so I encourage you to try them, and if you find something lacking, that would be an opportunity to improve Vert.x :-)

Julien

PS: that being said, someone in the community could come up with Vert.x helpers that would do that.