Vertx RxJava3 Cassandra stream

27 views
Skip to first unread message

JNA LART

unread,
May 22, 2024, 3:28:45 AMMay 22
to vert.x
Hi,

I've found code examples on how to get a rx stream based on a CassandraRowStream.
It's OK and it works fine!
Using RxJava is a cool solution since i have some asynchronous processing, but i want to process the data ordering. I use concatMap, transforming my Future in Flowable.

Like this:
rxClient.rxQueryStream(
                                SimpleStatement.newInstance("SELECT * FROM test_table1 where tenantid='tenant1'").setPageSize(30)
                        ).flatMapPublisher(io.vertx.rxjava3.cassandra.CassandraRowStream::toFlowable).concatMap(row -> {
                            return AsyncResultSingle.<Map<String,Object>>toSingle ( handler -> {
                                enrichRow(client, statement2, row).onComplete(v -> handleEnrich(handler,v));
                            }).toFlowable();
                        }).subscribe(row -> {
                            System.out.println(row);
                        }, Throwable::printStackTrace, () -> {
                            System.out.println("Stream completed");
                        });

But i really need to get the paging state of the CassandraRowStream. Paging state is an important thing when you want to process huge stream of data.
With paging state, you can restart a processing on the last page you was before a failure for example.

Do you know how i can access this information in a rx Cassandra stream?

Thx.

JN
Reply all
Reply to author
Forward
0 new messages