getSQLStreamSingleton(...)
    .map(stream -> stream.resultSetClosedHandler(s -> stream.moreResults())
        .handler(row -> {
            // each row gets its own single-element Observable on the computation scheduler
            Observable.just(row).subscribeOn(Schedulers.computation())
                .map(_row -> doFancyThings(_row))
                .subscribe(res -> { writeToRedis(res); });
        })
        .endHandler(v -> {
            vertx.setTimer(50, timer -> {
                // search redis
            });
            ....
Hi, why do you need to involve different threads in this problem?
In your code, each row is processed as an Observable of a single element. Have you tried the Rxified version of the JDBC client?
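To make the question concrete, here is a minimal sketch in plain Java (no Vert.x or RxJava dependency) of handling each row inline in the row handler, so that every result has been written by the time the end handler runs. Everything in it is an assumption for illustration: FakeRowStream is a hypothetical stand-in for SQLRowStream, a list stands in for Redis, and doFancyThings is only a placeholder for the real per-row work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class InlineRowProcessing {

    // Hypothetical stand-in for SQLRowStream: pushes each row to the
    // row handler, then calls the end handler once, on the same thread.
    static final class FakeRowStream {
        private final List<String> rows;
        private Consumer<String> rowHandler = r -> {};
        private Runnable endHandler = () -> {};

        FakeRowStream(List<String> rows) { this.rows = rows; }

        FakeRowStream handler(Consumer<String> h) { this.rowHandler = h; return this; }
        FakeRowStream endHandler(Runnable h) { this.endHandler = h; return this; }

        void start() {
            rows.forEach(rowHandler);
            endHandler.run();
        }
    }

    // Placeholder for the per-row work called doFancyThings in the thread.
    static String doFancyThings(String row) {
        return row.toUpperCase();
    }

    // Processes rows inline and returns what the end handler could see.
    static List<String> process(List<String> rows) {
        List<String> written = new ArrayList<>();   // stands in for writeToRedis
        List<String> seenAtEnd = new ArrayList<>(); // snapshot taken in the end handler
        new FakeRowStream(rows)
            .handler(row -> written.add(doFancyThings(row)))
            .endHandler(() -> seenAtEnd.addAll(written))
            .start();
        return seenAtEnd;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("a", "b", "c"))); // prints [A, B, C]
    }
}
```

Because nothing is handed off to another scheduler, the end handler needs no timer to wait for stragglers. This only makes sense if the per-row work is cheap enough to run on the thread that delivers the rows.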
Here is how the database is accessed:
Single.using(
    DisposableVertxSQLConnection::new,
    f -> f.create(queryObserverProvider.getJdbcClient(), logger)
        .flatMap(conn -> conn.rxQueryStreamWithParams(queryParams.getPrepStatement(), queryParams.getParams())) // SQLRowStream
        .map(stream -> stream.resultSetClosedHandler(s -> stream.moreResults())
            .handler(row -> {...})
            .exceptionHandler(t -> { handler.handle(Future.failedFuture(t)); })
            .endHandler(v -> { handler.handle(Future.succeededFuture(entryList)); })),
    f -> f.dispose())
    .subscribe(res -> {});