What is the best way to implement map/reduce with Vert.x RowStreams?

E. Ulrich Kriegel

Aug 2, 2018, 7:47:37 AM
to vert.x
I have a verticle that tries to use the map/reduce pattern:
Map: Retrieve rows from a SQL database using SQLRowStreams, process the rows in different threads, and write the resulting records to a Redis database.
Reduce: Once all information is stored in Redis, search the Redis database.

I tried to put the reduce part into the endHandler, but sometimes the writes to Redis have not yet completed when it runs.
Using a timer helps, but that is not very elegant and can cause errors if the hardware changes.

What is the correct way to implement map/reduce with Vert.x?
Here is the code:
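getSQLStreamSingleton(...)
    .map(stream -> stream
        // request the next result set, if any, whenever the current one is closed
        .resultSetClosedHandler(s -> stream.moreResults())
        // map: process each row on a computation thread, then write the result to Redis
        .handler(row -> Observable.just(row)
            .subscribeOn(Schedulers.computation())
            .map(_row -> doFancyThings(_row))
            .subscribe(res -> writeToRedis(res)))
        // reduce: delayed by a timer because the Redis writes may still be pending
        .endHandler(v -> vertx.setTimer(50, timer -> {
            // search redis
        })))
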

....
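
The timing problem described above (the endHandler firing while Redis writes are still pending) can be avoided by tracking every write as a future and starting the reduce step only when all of them have completed. The following is a minimal sketch of that idea using only the JDK's CompletableFuture, not Vert.x or a Redis client: the in-memory map stands in for Redis, and `writeToStore`, `doFancyThings` and `mapThenReduce` are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class MapThenReduce {

    // Hypothetical stand-in for doFancyThings(row).
    static int doFancyThings(int row) {
        return row * row;
    }

    // Hypothetical stand-in for an asynchronous writeToRedis(res):
    // the returned future completes once the write has been applied.
    static CompletableFuture<Void> writeToStore(Map<Integer, Integer> store,
                                                int key, int value,
                                                ExecutorService pool) {
        return CompletableFuture.runAsync(() -> store.put(key, value), pool);
    }

    // Map every row on the pool, then reduce only after all writes are done.
    static int mapThenReduce(List<Integer> rows) {
        Map<Integer, Integer> store = new ConcurrentHashMap<>(); // assumed Redis substitute
        ExecutorService pool = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        try {
            // One future per row: compute first, then write the result.
            List<CompletableFuture<Void>> writes = rows.stream()
                    .map(row -> CompletableFuture
                            .supplyAsync(() -> doFancyThings(row), pool)
                            .thenCompose(res -> writeToStore(store, row, res, pool)))
                    .collect(Collectors.toList());

            // Completes when every write has completed; no timer involved.
            CompletableFuture<Void> allWritten =
                    CompletableFuture.allOf(writes.toArray(new CompletableFuture[0]));

            // "Reduce": runs strictly after the last write has finished.
            return allWritten
                    .thenApply(v -> store.values().stream()
                            .mapToInt(Integer::intValue).sum())
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(mapThenReduce(List.of(1, 2, 3, 4))); // prints 30
    }
}
```

The `join()` call blocks and is used here only to keep the sketch short. Inside a verticle one would attach a callback instead of blocking. Assuming the Redis write exposes a Vert.x Future or an RxJava Completable, the same "wait for all" step could presumably be expressed with `CompositeFuture.all` or `flatMapCompletable`.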



Julien Viet

Aug 2, 2018, 6:20:36 PM
to ve...@googlegroups.com
Hi,

Why do you need to involve different threads in this problem?

In your code, each row is processed as an Observable of a single element.

Have you tried the Rxified version of the JDBC client?

I would suggest doing that, as it gives you an Observable from the stream directly.

Julien

E. Ulrich Kriegel

Aug 3, 2018, 6:37:29 AM
to vert.x
Hi,


On Friday, August 3, 2018 at 00:20:36 UTC+2, Julien Viet wrote:
> Hi,
>
> Why do you need to involve different threads in this problem?

In order to use multiple processors, because there is a vast amount of data to process.
In the map phase, a database query first collects candidates, which are then compared in detail, and the results of the comparison are stored in Redis.
For the last 3 years my customers agreed on how the candidates are to be collected from the database: the result set was on the order of 10 to 20 entries and everything was processed in less than 100 ms. Now they have changed their minds, and the result sets range from a few hundred to 10,000 rows.
Therefore, parallelisation is the only way to cope with that amount of data.
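
For result sets of this size, one option is to hand the rows to a fixed-size worker pool in chunks rather than scheduling every row separately. The sketch below illustrates that with plain JDK executors. It is an assumption-laden illustration, not the actual service code: `compare` is a hypothetical placeholder for the detailed candidate comparison, and `partition`, `compareAll` and the chunk size are likewise made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ChunkedCompare {

    // Hypothetical placeholder for the detailed comparison of one candidate.
    static int compare(int candidate) {
        return candidate * 2;
    }

    // Split the rows into consecutive chunks of at most chunkSize elements.
    static List<List<Integer>> partition(List<Integer> rows, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<Integer>> chunks = new ArrayList<>();
        for (int i = 0; i < rows.size(); i += chunkSize) {
            chunks.add(rows.subList(i, Math.min(i + chunkSize, rows.size())));
        }
        return chunks;
    }

    // Process each chunk as one task on a pool sized to the available cores.
    // Results are returned in the original row order.
    static List<Integer> compareAll(List<Integer> rows, int chunkSize) {
        ExecutorService pool = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        try {
            List<Callable<List<Integer>>> tasks = new ArrayList<>();
            for (List<Integer> chunk : partition(rows, chunkSize)) {
                tasks.add(() -> chunk.stream()
                        .map(ChunkedCompare::compare)
                        .collect(Collectors.toList()));
            }
            List<Integer> results = new ArrayList<>();
            // invokeAll blocks until every task has finished.
            for (Future<List<Integer>> f : pool.invokeAll(tasks)) {
                results.addAll(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while comparing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("comparison failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(compareAll(List.of(1, 2, 3, 4, 5), 2));
    }
}
```

Because `invokeAll` blocks, a call like this would have to run off the event loop in a verticle, for example inside `vertx.executeBlocking`. Whether chunking actually pays off depends on how expensive a single comparison is, which the thread does not state.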
 

> In your code, each row is processed as an Observable of a single element.
>
> Have you tried the Rxified version of the JDBC client?

I use the default JdbcClient because the services must be able to query Oracle as well as Postgres databases.
Here is how the database is accessed:

Single.using(DisposableVertxSQLConnection::new,
    f -> f.create(queryObserverProvider.getJdbcClient(), logger)
        // yields an SQLRowStream
        .flatMap(conn -> conn.rxQueryStreamWithParams(queryParams.getPrepStatement(), queryParams.getParams()))
        .map(stream -> stream
            // request the next result set, if any, whenever the current one is closed
            .resultSetClosedHandler(s -> stream.moreResults())
            .handler(row -> {...})
            .exceptionHandler(t -> handler.handle(Future.failedFuture(t)))
            .endHandler(v -> handler.handle(Future.succeededFuture(entryList)))),
    f -> f.dispose())
  .subscribe(res -> {});


 