Wondering if you've looked at RxJava at all? It offers similar power to Promises (with a lot more operations, which you may or may not consider a good thing).
The equivalent implementation for the problem you mention would be something like:
// Request parser (you might parse to a strongly typed request object or just Json)
Func1<HttpServerRequest,JsonObject> requestParser=new Func1() ...
// Re-usable auth handler (can be async so can contact a remote service / DB) that validates any business request object (Http or EventBus)
Func1<JsonObject,Observable<JsonObject>> messageAuth=new Func1() ...
// HTTP Handler
return Observable
    .just(httpRequest)
    // Parse to JSON
    .map(requestParser)
    // Validate auth
    .flatMap(messageAuth)
    // Process the request
    .flatMap(new Func1<JsonObject, Observable<JsonObject>>() {
        public Observable<JsonObject> call(JsonObject req) {
            // Use a DB wrapper to decode errors / help generate requests etc
            return db
                .find(...DB operation based on req...)
                // Convert the successful response to appropriate JSON
                .map(decodeResponse);
        }
    });
The output of every service is an Observable<JsonObject>, which I encode into an HttpServerResponse for HTTP clients, a Message.reply() for EventBus clients, and so on.
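For anyone who wants to try the shape of this chain without RxJava on the classpath, here is a rough JDK-only sketch. It is not the RxJava code above: CompletableFuture stands in for a single-valued Observable (thenApply plays the role of map, thenCompose the role of flatMap), a Map stands in for JsonObject, and the `user=alice` request format, the auth rule and the `process` step are all made up for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins only: Map ~ JsonObject, CompletableFuture ~ Observable.
class RequestPipeline {

    // Request parser: turns a raw "key=value" body into a map (invented format).
    static Map<String, String> parseRequest(String raw) {
        String[] kv = raw.split("=", 2);
        if (kv.length != 2) {
            throw new IllegalArgumentException("malformed request: " + raw);
        }
        return Map.of(kv[0], kv[1]);
    }

    // Re-usable auth step. It returns a future, so it could call a remote service.
    static CompletableFuture<Map<String, String>> messageAuth(Map<String, String> req) {
        if ("alice".equals(req.get("user"))) {
            return CompletableFuture.completedFuture(req);
        }
        return CompletableFuture.failedFuture(new SecurityException("not authorised"));
    }

    // Placeholder for the DB call plus response decoding.
    static CompletableFuture<String> process(Map<String, String> req) {
        return CompletableFuture.supplyAsync(() -> "hello " + req.get("user"));
    }

    // The handler: parse, then authorise, then process.
    static CompletableFuture<String> handle(String httpRequest) {
        return CompletableFuture.completedFuture(httpRequest)
            .thenApply(RequestPipeline::parseRequest)   // like map
            .thenCompose(RequestPipeline::messageAuth)  // like flatMap
            .thenCompose(RequestPipeline::process);     // like flatMap
    }

    public static void main(String[] args) {
        System.out.println(handle("user=alice").join());
        System.out.println(handle("user=bob").isCompletedExceptionally());
    }
}
```

A failure in any step skips the later steps and surfaces at the end of the chain, which is the property that makes one shared auth function practical.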
val stream = Integer.stream
stream.listen
    .take(2)
    .map [ 'got number ' + it ]
    .subscribe [ println('a: ' + it) ]
stream => [
    apply(2.some)
    apply(5.some)
    apply(3.some)
    apply(none)
]

def static <T> Observable<Message<T>> register(EventBus bus, String address, Class<T> msgType) {
    val stream = new Stream<Message<T>>
    bus.registerHandler(address) [ Message<T> it | stream.apply(it.option) ]
    stream.listen
}

def static <T> Observable<Message<T>> register(EventBus bus, String address, Class<T> msgType, Procedure1<AsyncResult<Void>> onComplete) {
    val stream = new Stream<Message<T>>
    bus.registerHandler(address, [ Message<T> it | stream.apply(it.option) ], onComplete)
    stream.listen
}

vertx.eventBus
    .register('some/address', String)
    .subscribe [
        info('''got message «it.body»!''')
        reply('thank you, got it')
    ]
Hi Tom,
Doesn't RxJava already support creating an observable from a Promise? I think it behaves just as you describe.
The reason I created a stream is that it is pretty cumbersome (IMO) to create a new observable from a handler, so I built an intermediary concept that allows this.
However you could use the same approach for a Future/Promise, and actually that is also how Reactor solves it:
https://github.com/reactor/reactor/wiki/Promises
In my case I did it like this; it works like the stream:
// create the future:
val promise = Integer.promise
// put something into it directly if you like
// promise.apply(3)
// observe the future by calling listen on it:
promise.listen
    .map [ …] // do some mapping, take, etc
    .subscribe [ println('found value ' + it) ]
// put something into it
promise.apply(12)
promise.apply(2) // throws an error, can only apply once
The term promise is used to avoid confusion with the Java Future class, which has a somewhat different interface. However, a Promise can be cast into a Future.
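For Java readers, the apply-once behaviour can be sketched roughly as below. This is not my Xtend implementation: `OncePromise` and its methods are hypothetical names, and the choice of IllegalStateException for the second apply is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical single-assignment promise; not the API of any real library.
class OncePromise<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();
    private T value;
    private boolean resolved;

    // Register a listener. It fires immediately if a value was already applied.
    synchronized void listen(Consumer<T> listener) {
        if (resolved) {
            listener.accept(value);
        } else {
            listeners.add(listener);
        }
    }

    // Put a value into the promise. A second call throws.
    synchronized void apply(T v) {
        if (resolved) {
            throw new IllegalStateException("promise already resolved");
        }
        resolved = true;
        value = v;
        for (Consumer<T> listener : listeners) {
            listener.accept(v);
        }
        listeners.clear();
    }
}

class OncePromiseDemo {
    public static void main(String[] args) {
        OncePromise<Integer> promise = new OncePromise<>();
        promise.listen(v -> System.out.println("found value " + v));
        promise.apply(12);
        try {
            promise.apply(2);
        } catch (IllegalStateException e) {
            System.out.println("second apply rejected");
        }
    }
}
```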
Christian
Note: I made Promise<T> and Stream<T> both implement Procedure1<T>, which lets you pass them around like any other function.
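In plain Java the same trick might look something like this, with java.util.function.Consumer playing the role of Procedure1. Everything here is a hypothetical stand-in: `HandlerStream` is not my Stream class, and `FakeBus` only imitates a callback-style registration API so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical push stream that is itself a handler (a Consumer<T>).
class HandlerStream<T> implements Consumer<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // Called by whoever the stream was handed to; fans out to subscribers.
    @Override
    public void accept(T item) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }
}

// Hypothetical callback-style bus, standing in for a handler-based API.
class FakeBus {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    void registerHandler(String address, Consumer<String> handler) {
        handlers.put(address, handler);
    }

    void send(String address, String body) {
        Consumer<String> handler = handlers.get(address);
        if (handler != null) {
            handler.accept(body);
        }
    }
}

class HandlerStreamDemo {
    static List<String> run() {
        FakeBus bus = new FakeBus();
        HandlerStream<String> stream = new HandlerStream<>();
        // No adapter lambda needed: the stream is passed as the handler itself.
        bus.registerHandler("some/address", stream);
        List<String> seen = new ArrayList<>();
        stream.subscribe(body -> seen.add("got message " + body));
        bus.send("some/address", "hello");
        bus.send("other/address", "ignored");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```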
I've changed the thread name..
On Wed, Oct 2, 2013 at 1:57 AM, petermd <pet...@gmail.com> wrote:
> Wondering if you've looked at RxJava at all? It offers similar power to Promises (with a lot more operations, which you may or may not consider a good thing).
I did read a bit about RxJava but have not tried it. I would say more is not better - but perhaps I haven't needed (or re-implemented) those ones yet! :)
I've seen Daryl's implementation and module, and also these chaps: https://github.com/englishtown/when.java who also did some work with when.js and vertx (there are fixes for vertx in the when.js project). It was mentioned on this list but did not get much response - so I hope they were not discouraged by that.
And there is my own version. The tricky part of going from JavaScript to Java was finding the right level of generics to use. My first implementation had a ton of them, but I backed down and now have only a single generic parameter on a promise (the resolved value), and the rejected value is a Throwable: https://gist.github.com/carchrae/6794509. Yes, I'm still not sure Throwable is the best default value for reject, but I kind of liked it as a convention (e.g. it can catch/handle all sorts of runtime errors). Otherwise the reject value could be a second type parameter. I also realize this is not polyglot friendly.
I'm still giving some serious thought to how much generics help with promises. One of the nice things about the JavaScript Promises/A+ API is that a value or a promise can often be substituted for one another. That pretty much flies in the face of generics, although the strictly typed hero would say: quick, create a class/interface called PromiseOrValue.
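To make the value-or-promise tension concrete, here is a small sketch using the JDK's CompletableFuture rather than my own promise class. With static types the two cases need two differently named methods (thenApply for a plain value, thenCompose for another future). The `lift` helper is hypothetical: it shows what accepting "either" costs, namely an Object parameter and an unchecked cast.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class ValueOrPromise {

    // Callback returns a plain value: thenApply.
    static CompletableFuture<Integer> withValue(CompletableFuture<Integer> start) {
        return start.thenApply(n -> n + 1);
    }

    // Callback returns another future: thenCompose, so the result is flattened.
    static CompletableFuture<Integer> withPromise(CompletableFuture<Integer> start) {
        return start.thenCompose(n -> CompletableFuture.supplyAsync(() -> n + 1));
    }

    // Hypothetical helper in the Promises/A+ spirit: take a value or a stage.
    // The compiler can no longer check that the argument matches T.
    @SuppressWarnings("unchecked")
    static <T> CompletionStage<T> lift(Object valueOrStage) {
        if (valueOrStage instanceof CompletionStage) {
            return (CompletionStage<T>) valueOrStage;
        }
        return CompletableFuture.completedFuture((T) valueOrStage);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> start = CompletableFuture.completedFuture(20);
        System.out.println(withValue(start).join());
        System.out.println(withPromise(start).join());

        CompletionStage<Integer> fromValue = lift(21);
        CompletionStage<Integer> fromStage = lift(CompletableFuture.completedFuture(21));
        System.out.println(fromValue.toCompletableFuture().join());
        System.out.println(fromStage.toCompletableFuture().join());
    }
}
```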
I was a Play 1.x user (and still am; my current app uses both Vert.x and Play 1.x, with a pretty simple bridge I built between them). In Play 1.x they had continuations, and they were really cool, but they relied on some awesome but dreadful bytecode magic to implement.
I looked in detail at implementing continuations in Vert.x some time back and rejected the idea. Continuations are very hard to implement effectively on the JVM.