MicroProfile Reactive Streams Operators release

172 views
Skip to first unread message

James Roper

unread,
Jul 25, 2018, 12:18:56 AM7/25/18
to MicroProfile
Hi all,

Today I gave an update of MicroProfile Reactive in the MicroProfile community hangout, and something that came up is the question of whether a MicroProfile Reactive Streams Operators release would be useful to any other specs.

To give a bit of context, MicroProfile Reactive Streams Operators (it's a mouthful, so I'll shorten to RS Ops for the remainder of this email) is a spec [1] similar to the JDK8 java.util.stream API, but for Reactive Streams. It is functionally equivalent to Akka Streams, RxJava or Reactor, but unlike those technologies, it's a specification that can have independent implementations by multiple vendors. By itself it's not that relevant to MicroProfile, it just sits as a stand alone technology, and the intention is that after incubating in MicroProfile, it will be considered for inclusion in the JDK itself. However, it is necessary to have a library like this available if any specs wish to make use of Reactive Streams as a first class concept, since Reactive Streams itself is not intended as an end user API, but rather as an integration API that libraries should implement. For example, a database driver might provide Reactive Streams support (as the new ADBA asynchronous database spec by Oracle does), or an HTTP client might provide Reactive Streams support (as the new JDK9 HTTP client does), but if users want to plumb these together, and do some manipulation of the data (eg turning database rows from ADBA into bytes for JDK9 HTTP client), there's no simple way to do that without a library such as RS Ops. If we were to wait for the JDK to provide this itself, it would be years before MicroProfile (or Jakarta EE) could make any significant use of Reactive Streams.

RS Ops is being developed in tandem with MicroProfile Reactive Messaging [2], a spec for doing asynchronous messaging between microservices through a message broker such as Kafka. Reactive Messaging depends heavily on RS Ops, and the use cases the messaging spec has are being used as the primary decider for the scope of version 1 of RS Ops. The current intention is that RS Ops 1.0 will be released when Reactive Messaging 1.0 is released. However, RS Ops is almost in a state where it's ready to be released now, and certainly can be ready in time for MicroProfile 2.1. Reactive Messaging however may not be ready in time for MicroProfile 2.1 (though we will try). So the question is, would any other specs find it useful to have a release of RS Ops, for example to use for experimentation with future reactive features, as part of MicroProfile 2.1? Or even not as part of MicroProfile 2.1 - it may just be convenient to have a released version available to pull in as a third party library.

For further info about the status of RS Ops, the scope (for version 1.0) is pretty close to finalised and implemented, the specifications and javadocs could probably do with a little more massaging but are more or less complete, and there's possibly a small amount of additional testing needed in the TCK, but it's pretty comprehensive (currently runs almost 1000 individual tests, though this makes it sound like a lot more work has been done than actually has, a lot of these are brought in by running the Reactive Streams TCK [3] on each operator stage). There currently exist 3 implementations passing the TCK, one based on Akka Streams created by Lightbend [4], one standalone implementation (ie, has zero dependencies) created by Lightbend [5], and one based on RxJava and Vert.x created by RedHat for SmallRye [6].

Ken Finnigan

unread,
Jul 26, 2018, 2:16:48 PM7/26/18
to MicroProfile
James,

I think it's perfectly fine for RS Ops to be released and available within the next MP release, assuming general agreement from the community with that, whether or not Reactive Messaging is ready or not.

I would liken the situation to the initial MP 1.0 release that was only Java EE 7 specifications. RS Ops is a similar foundation piece to those Java EE 7 specifications, so it shouldn't be held back from being part of a release just because nothing *may* be ready to use it at the same time.

Ken

--
You received this message because you are subscribed to the Google Groups "Eclipse MicroProfile" group.
To unsubscribe from this group and stop receiving emails from it, send an email to microprofile...@googlegroups.com.
To post to this group, send email to microp...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/microprofile/CABY0rKOTSf8WpexiL9VPAi4zE%3DLG-Tod-R_J4yVdfnWEnh2vxg%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.

Emily Jiang

unread,
Jul 26, 2018, 7:16:53 PM7/26/18
to Eclipse MicroProfile
What is the point to release it if no one is going to use it. At the moment, only messaging is going to use it but it is not ready. If the only consumer is messaging, it makes more sense to wait as it will be tested better with consumers. Hence this thread discussion. We need to find out who is waiting for it.

By the way, I have not found any immediate dependency from the specs like Config, Fault Tolerance.

my 2 cents.
Emily

Ken Finnigan

unread,
Jul 26, 2018, 10:37:24 PM7/26/18
to MicroProfile
I see several benefits in not waiting for it to be part of an MP release if RS Ops is ready:
  • It enables MP specs to begin experimenting with it without needing to rely on SNAPSHOT builds
  • It gives certainty as to RS Ops being part of MP. If it's only included when Reactive Messaging is ready, there's no incentive for other specifications to investigate using it as there's always the chance Messaging gets pushed, for whatever reason.
  • There's no reason it can't begin to be used by developers wanting to replace their usage of RxJava with RS Ops within MP applications
  • Why not showcase the great work going into RS Ops when it's ready? It helps foster the innovation and drive of MP. If we hold it back for non technical reasons, there's the risk we disenfranchise parts of the community that want the fast evolution.
If you believe RS Ops to have not been tested sufficiently, can you provide some examples of additional TCK tests that could be added to help improve it?

Ken

James Roper

unread,
Jul 27, 2018, 1:29:11 AM7/27/18
to MicroProfile
I think the testing that Emily is referring to is not about compliance, but more, testing the usability and suitability of the API against real use cases - does it make sense, is it easy to use, are the names right and forward compatible with future features, does it provide all the features needed, etc. In particular, once we release 1.0, we can't (or should very strongly try and avoid) making any breaking changes. Now there's no avoiding dilemma, but waiting till we have a spec that uses it is released will give us more confidence in the API than releasing it without any defined use cases in MicroProfile. That said, I'm not sure that it will give us that much more confidence, for two reasons, by this stage in the messaging spec we already have a pretty good idea of what we need from RS Ops, secondly, neither RS Ops nor Reactive Messaging are completely new styles of API, we've started with existing third party APIs and extensive experience using them to come up with a new standard that takes the best of these APIs.

Personally I like the idea of releasing sooner, as the sooner we get it out there, the sooner we can get feedback and improve it. We're especially interested in getting feedback on how it should deal with CDI context. At this point, we have excluded any form of CDI context support from 1.0, this has been discussed heavily both within the Reactive group as well as with the CDI spec lead (Antoine), because there's a number of routes that we could take (eg, create a streams context that is designed to be propagated across asynchronous calls, or try and propagate existing scopes, etc), but it's really hard to know which route is best, so we'd like to get some user feedback first to see what users want to do with CDI contexts in streams.

As an aside (no need to read the rest of this email if you're not interested in the details of RS Ops), personally my biggest fear is that some of the names we've chosen for methods could make choosing names for future new features difficult, but I'm not sure that waiting until Reactive Messaging is released will have any impact on that. As a concrete example of this fear, we have a method called flatMap, its signature looks like this:

class PublisherBuilder<T> {
  <R> PublishBuilder<R> flatMap(Function<T, PublisherBuilder<R>> f);
}

Now, at face value, this seems like a pretty straight forward method to implement, but there are actually two main different possibilities for how it might behave:

* The effect can be to operate sequentially on incoming T's, creating one PublisherBuilder at a time, which is emitted till its exhausted, at which point we process the next T - the effect is that the publishers are concatenated.
* The effect can be to operate concurrently on incoming T's, invoking the f function as the Ts arrive, and merge all the PublisherBuilders that are created together.

One of our goals is to keep the API simple (and therefore constrained), since there already exist plenty of other APIs out there that offer a kitchen sink of functionality that a user can use (and they integrate easily into this by virtue of using Reactive Streams). So we've decided that the method should just deal with one T at a time, having the effect of all the PublisherBuilders being concatenated. But what if we find in future that working with multiple at a time is a necessary feature that we can't do without? Should that method also be called flatMap? Maybe we should rename this one to flatMapConcat or concatMap, and the other one to flatMapMerge or mergeMap, to make it clear which does what? And of course, the APIs that we're basing our decisions off (RxJava, Akka Streams, Reactor), have all manner of different approaches to the naming here, so we can't just defer to what they do because they disagree. And once 1.0 is released, we can't easily change the name of flatMap. So anyway, it's those kinds of things that I'm not sure about - the reality is though that there is no right answer to these questions and no matter how hard we try we will always encounter scenarios in future where it'll turn out the decision we made wasn't the best one.


For more options, visit https://groups.google.com/d/optout.


--

Clement Escoffier

unread,
Jul 27, 2018, 2:39:57 AM7/27/18
to 'Heiko Rupp' via Eclipse MicroProfile
Hello,

I’m in favor of releasing the MicroProfile Reactive Streams Operators early to gather feedback, especially on missing operators. Having this spec released lets other specification uses it, and there are specifications where it makes sense: REST client, LRA (it’s a bit more convoluted)... There is also an ongoing discussion about JAX-RS “next” where reactive has been mentioned. 

Now, of course, naming is hard, and we had to pick names, and an associated semantic which can be a bit confusing. James mentioned the flatMap example that behaves as an RX concatMap (or an RX flatMap with a maxConcurrency set to 1). I don’t have a strong opinion because I believe we can adjust the “semantic” using parameters. If the Javadoc mentions the behavior, and make it clear, the user would understand the semantic.  (Open question: should we use  marble diagrams in the Javadoc).

James also mentioned it, the spec and Javadoc could be improved a bit. No significant refactoring, maybe more examples, and adding the description of the supported stages.  

Clement

Ladislav Thon

unread,
Jul 27, 2018, 3:42:22 AM7/27/18
to MicroProfile
pá 27. 7. 2018 v 8:39 odesílatel Clement Escoffier <clement....@gmail.com> napsal:
Open question: should we use  marble diagrams in the Javadoc


YES PLEASE!

LT 

Ondro Mihályi

unread,
Jul 29, 2018, 3:19:24 AM7/29/18
to Eclipse MicroProfile
I see there are significant pros and cons to releasing early. I suggest releasing a milestone first, so that the API can be evaluated ASAP but leaving space for breaking changes. And leave at least a month (better 2) between a milestone release and an RC release.

According to the discussion I had with James Roper at EclipseCon, there's a potential that MP Reactive Ops will become a standard beyond MP, like Reactive Streams. And it would be beneficial to get feedback from a wider community of Java reactive programmers (users of RxJava, Spring Reactor, etc).

If we release 1.0 too soon, before getting reasonable feedback from a wider community, there's very little chance that it will be adopted by users outside MP. And also very hard to improve it within MP without breaking changes.

Therefore a milestone release can help to gather this feedback.

--Ondro

Emily Jiang

unread,
Aug 12, 2018, 10:30:08 AM8/12/18
to Eclipse MicroProfile
Slowly catch up with the notes from my 2-week holiday...

James got me exactly right. I have the concerns as per James's Note "In particular, once we release 1.0, we can't (or should very strongly try and avoid) making any breaking changes."

Clement, when you say  MP Rest client needs it, can you provide a use case? I am less concerned about LRA as it has more work to be done.

For me, Operator cannot be easily used if no operands are there. Messaging spec is to provide operands.

Therefore, I agree with Ondro. Just release some snapshot and push them to maven central just in case someone wants to experiment it.

Thanks
Emily

James Roper

unread,
Aug 14, 2018, 3:09:09 AM8/14/18
to MicroProfile

--
You received this message because you are subscribed to the Google Groups "Eclipse MicroProfile" group.
To unsubscribe from this group and stop receiving emails from it, send an email to microprofile...@googlegroups.com.
To post to this group, send email to microp...@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Clement Escoffier

unread,
Aug 14, 2018, 3:29:51 AM8/14/18
to Eclipse MicroProfile
Hello,



On 12 Aug 2018, at 16:30, 'Emily Jiang' via Eclipse MicroProfile <microp...@googlegroups.com> wrote:

Slowly catch up with the notes from my 2-week holiday…


James got me exactly right. I have the concerns as per James's Note "In particular, once we release 1.0, we can't (or should very strongly try and avoid) making any breaking changes."

Clement, when you say  MP Rest client needs it, can you provide a use case? I am less concerned about LRA as it has more work to be done.

About the REST client, let’s take one use case the get method:

    @GET
    @Path("/one")
    CompletionStage<Response> get();

I have copied the async version from the spec, but it’s not required. The Response is more interested in this use case, especially how the body from the response is read. I understand that the Response class comes from Jax RS - but let’s ignore this for now as we can imagine providing a MicroProfile specific Response. The issue is that when you read the body, it’s generally done in one go. If the body is big (or run under high concurrency), you are likely running OOM. In this case, reading the body as a stream would be beneficial. Something like:

response.asPublisherBuilder().subscribe(…);

Another use case, if when the HTTP response is a chunked response, so sent chunk by chunk. We could read it chunk by chunk and process the chunk one by one instead of accumulating the response in memory:

response.asPublisherBuilder().map(chunk -> ….).subscribe(content -> send the chunk somewhere else)

Same applied for chunked requests, where the client does not send the body in one big blob but chunk by chunk. This aspect is also valid when uploading data coming from a stream (file system, another response or the next to be MP Reactive Messaging).

I don’t have an exact idea of what could be the API, but using MP Reactive Streams Operators in the client can open the road to new use cases. 



For me, Operator cannot be easily used if no operands are there. Messaging spec is to provide operands.

I have started developing a few prototypes using it directly (more to check the missing operators), and it can be used without messaging. It’s quite nice. I will come with a few (polished) examples soon. Right now, they run in Vert.x which is not necessarily the primary target. 


Therefore, I agree with Ondro. Just release some snapshot and push them to maven central just in case someone wants to experiment it.

Snapshots or milestones? 
Snapshots are tricky - they require snapshot repository on the consumer side, and can break at any time. 
Milestones are “early release - so the consumer can use them reliably as they are stored in Maven Central, and update to the next milestone if needed.

I believe that milestones are better to collect feedback.

Clement


Thanks
Emily


On Sunday, July 29, 2018 at 9:19:24 AM UTC+2, Ondro Mihályi wrote:
I see there are significant pros and cons to releasing early. I suggest releasing a milestone first, so that the API can be evaluated ASAP but leaving space for breaking changes. And leave at least a month (better 2) between a milestone release and an RC release.

According to the discussion I had with James Roper at EclipseCon, there's a potential that MP Reactive Ops will become a standard beyond MP, like Reactive Streams. And it would be beneficial to get feedback from a wider community of Java reactive programmers (users of RxJava, Spring Reactor, etc).

If we release 1.0 too soon, before getting reasonable feedback from a wider community, there's very little chance that it will be adopted by users outside MP. And also very hard to improve it within MP without breaking changes.

Therefore a milestone release can help to gather this feedback.

--Ondro





-- 
You received this message because you are subscribed to the Google Groups "Eclipse MicroProfile" group.
To unsubscribe from this group and stop receiving emails from it, send an email to microprofile...@googlegroups.com.
To post to this group, send email to microp...@googlegroups.com.

Clement Escoffier

unread,
Aug 14, 2018, 3:30:08 AM8/14/18
to Eclipse MicroProfile

Stephane Epardaud

unread,
Aug 14, 2018, 4:55:48 AM8/14/18
to microp...@googlegroups.com
FTR RESTEasy already supports sending responses in chunks if they are of the type `Flowable<String>` (or Flowable<ByteBuffer>, Flowable<byte[]>, Flowable<char[]>), but that pretty much excludes custom status codes/headers that would be set in a Response.

Though in order to keep a parallel between the types X that could be returned by a resource method (such as that Flowable<String>) I would rather Response.getEntity() supported the exact same types X. That is, it should be exactly the same to return a Flowable<String> from the resource method as it would be when returning a Response whose entity is a Flowable<String>. I don't believe this to be the case, but it would be symmetrical and easy to discover/intuitive and would support your use-case.


For more options, visit https://groups.google.com/d/optout.


--
Stéphane Épardaud

Ondro Mihályi

unread,
Aug 14, 2018, 8:10:58 PM8/14/18
to Eclipse MicroProfile
Hi,

I think that it would even be useful to allow registering asynchronous message body writers. JAX-RS now supports only synchronous writers which are expected to produce response body before they return from the writer method. It's not possible to convert an asynchronous return type like Flowable into a response with a message body writer because it's not guaranteed that the implementation waits until the response is written in a different thread asynchronously.

If we add support for asynchronous message body readers/writers in JAX-RS (and MP REST client), users could use any asynchronous structure like Flowable, RxJava Single or similar. CompletionStage is currently the only supported asynchronous structure but it's a special case and its message body writer can't be overriden by a custom writer.

An example of how that could work:

@Provider
public class CustomFlowableMessageBodyWriter implements AsyncMessageBodyWriter<Flowable<String>> {
}

    @GET
    @Path("/one")
    Flowable<String> get();

It would then be easy to transparently provide support for a new MicroProfile reactive Response or for PublisherBuilder from the stream operators. As a side effect, it would allow pluggable support for returning any reactive structure without converting it manually.

--Ondro

James Roper

unread,
Aug 14, 2018, 8:33:54 PM8/14/18
to MicroProfile
Just to point something out with respect to Flowable, MicroProfile Reactive Streams Ops is able to do something that Flowable can't - that is, be a dependency of MicroProfile and Jakarta EE specs. Flowable is a proprietary API, maintained by RxJava, it's not a standard and doesn't allow multiple implementations. Hence, JAXRS can't depend on it and can't offer standard support for Flowable (it's up to implementations to offer this as an extension), but if MicroProfile Reactive Streams Ops is adopted as a Jakarta EE spec, then JAXRS will be able to offer (and require) support for it, not only that, but it will also be able to offer much more fluent support, being able to support it in return types and parameters of APIs, rather than only being able to offer it through parameterized types.


For more options, visit https://groups.google.com/d/optout.

Ladislav Thon

unread,
Aug 17, 2018, 9:45:50 AM8/17/18
to MicroProfile

út 14. 8. 2018 v 9:09 odesílatel James Roper <ja...@lightbend.com> napsal:
On Fri, 27 Jul 2018 at 17:42, Ladislav Thon <lad...@gmail.com> wrote:
pá 27. 7. 2018 v 8:39 odesílatel Clement Escoffier <clement....@gmail.com> napsal:
Open question: should we use  marble diagrams in the Javadoc

YES PLEASE!

Well, with that sort of response, I guess I had to do it.


See here for a demo:



Such marbles! Very thanks! Wow!

LT
 


LT 

--
You received this message because you are subscribed to the Google Groups "Eclipse MicroProfile" group.
To unsubscribe from this group and stop receiving emails from it, send an email to microprofile...@googlegroups.com.
To post to this group, send email to microp...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/microprofile/CALbocOmAJf2NteHOxJbMfm%3D%2BPsAV%3DDd_s8WTup1Ye0ziNsg%3Dcg%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.


--
James Roper
Senior Developer, Office of the CTO

Lightbend – Build reactive apps!
Twitter: @jroper

--
You received this message because you are subscribed to the Google Groups "Eclipse MicroProfile" group.
To unsubscribe from this group and stop receiving emails from it, send an email to microprofile...@googlegroups.com.
To post to this group, send email to microp...@googlegroups.com.

Gordon Hutchison

unread,
Aug 20, 2018, 3:01:29 AM8/20/18
to Eclipse MicroProfile

Hi Ondro and others,

Would it be possible to take advantage of section 5.7.1 of JAX-RS 2.1 spec to register a Reactive API Extension
that makes use of a MicroProfile Reactive Streams Implementation in the server?
(Andy McCright alerted me to that bit of the spec.)

Ondro, were you thinking of something analogous to:


With MicroProfile value add?

Gordon.

Gordon Hutchison

unread,
Aug 20, 2018, 3:05:33 AM8/20/18
to Eclipse MicroProfile

Ondro Mihályi

unread,
Aug 20, 2018, 7:47:02 PM8/20/18
to MicroProfile
Hi Gordon,

There are 2 things related to JAX-RS and reactive support
 - in the client
 - in the server

On the client, JAX-RS allows registering a custom invoker to return any reactive interface, such as RxJava Flowable or JDK 9 Flowable. The first link you posted is an example of using Jersey RxFlowableInvoker that returns RxJava Flowable. This is a general invoker that works in any case, but it will generate onNext event only after the whole response is mapped to a Java object. To improve efficiency, it's possible to write a custom invoker that provides chunks of a response is as the response is read. Similar to SAX parser that generates SAX events as an XML is parsed. In summary, JAX-RS provides a hook good enough to processes a response reactively, nothing we need to improve in JAX-RS. MicroProfile could define one or more standard invokers to turn a JAX-RS client call to a reactive stream. A code example of what's already possible with JAX-RS:

Client client = ClientBuilder.newClient();
        client.register(MicroProfileProvider.class);
        WebTarget target = client.target("http://localhost:8080/jaxrs-async/rest/ejb");
        target.request()
        .rx(MicroProfileJsonInvoker.class)
        .get(JsonDataEvent.class)   // will return a Flowable that emits JSON data as they are detected during parsing of a JSON response)
        .buildRs()
        .subscribe()

The situation is different on the server. It's possible to return a CompletionStage from a method, but it has to be completed with a whole response entity at once. The same applies to using AsyncReponse - it's also resumed with a whole response entity. The example from CXF only helps turning AsyncReponse to a subscriber, but it awaits a complete response to resume the AsyncReponse. It doesn't send the response in chunks as they are available but waits for all data to be available. JAX-RS doesn't provide any other options and it would need to be extended to support more granular solution. The solution that I suggested is to support building a response asynchronously besides synchronous MessageBodyWriter interface. An asynchronous MessageBodyWriter would just need a method to finalize the response after the response is written into the output stream. Then it would be possible to do the following (it's not possible with the current JAX-RS):
@GET
public Publisher<String> getPrimeNumbers(int limit) {
    return ReactiveStreams.generate(primeNumberGenerator::next)
             .limit(limit)
             .buildRs();   // flowbale which generates a sequence of prime numbers
}

And a MessageBodyWriter could look like:
public class FlowableStringWriter implements AsyncMessageBodyWriter<Publisher<String>> {
    writeTo(Publisher<String> publisher, Class<?> type, Type genericType, Annotation[] annotations, MediaType mediaType, MultivaluedMap<String,Object> httpHeaders, OutputStream entityStream, Runnable completionCallback) {

    publisher.subscribe(
.subscribe(new Subscriber<String>() {
                    @Override
                    public void onNext(String text) {
                        // add text to entityStream
                    }

                    @Override
                    public void onComplete() {
                        completionCallback.run();
                    }
       })
}

Containers would have to wait until the completionCallback is called before finalizing the REST response. It should work transparently even if a method returns Response<Publisher<String>> - the Response wrapper would be used to set response metadata and the body would be written by the async writer.

--Ondro



po 20. 8. 2018 o 9:05 Gordon Hutchison <gordon.h...@gmail.com> napísal(a):
You received this message because you are subscribed to a topic in the Google Groups "Eclipse MicroProfile" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/microprofile/oH6sYZPPMAg/unsubscribe.
To unsubscribe from this group and all its topics, send an email to microprofile...@googlegroups.com.

To post to this group, send email to microp...@googlegroups.com.

Stephane Epardaud

unread,
Aug 21, 2018, 4:27:31 AM8/21/18
to microp...@googlegroups.com
I'm not sure this is the best place to discuss the future of JAX-RS, but what we did for RESTEasy (and what we will likely suggest for the next version of JAX-RS because it works well) is to define two new producers: 

- AsyncResponseProvider (to convert a single async type such as Single to CompletionStage) 
- AsyncStreamProvider (to convert a flowable type such as Flowable to an RS Publisher)


Once those converters are registered (and we do provide modules by default for RxJava), then they become as supported as CompletionStage is mandated in the spec. We used RS Publisher as the conversion target, because it's a spec, and small, and most reactive stream implementations provide converters to CompletionStage and Publisher.

This has the advantage of not having to rewrite all the writers, and not having to register any special invokers.


For more options, visit https://groups.google.com/d/optout.


--
Stéphane Épardaud

Gordon Hutchison

unread,
Aug 24, 2018, 5:04:37 PM8/24/18
to Eclipse MicroProfile

Thanks for your great reply Ondro (and apologies for my lack of one due to vacation),
There is quite a bit of vision for me (and others) to digest in what you describe - thanks again!
Gordon.
Reply all
Reply to author
Forward
0 new messages