Is there any equivalent piece of code for RxJava2 Flowable example on vert.x Mutiny?
Flowable<Row> flowable = pool.rxGetConnection().flatMapPublisher(conn -> conn
.rxBegin()
.flatMapPublisher(tx ->
conn
.rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
.flatMapPublisher(preparedQuery -> {
// Fetch 50 rows at a time
RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
return stream.toFlowable();
})
.doAfterTerminate(tx::commit)));
// Then subscribe
flowable.subscribe(new Subscriber<Row>() {
private Subscription sub;
@Override
public void onSubscribe(Subscription subscription) {
sub = subscription;
subscription.request(1);
}
@Override
public void onNext(Row row) {
sub.request(1);
System.out.println("User: " + row.getString("last_name"));
}
@Override
public void onError(Throwable err) {
System.out.println("Error: " + err.getMessage());
}
@Override
public void onComplete() {
System.out.println("End of stream");
}
});