Streaming and vertx-sql-client


jim-g...@spudsoft.co.uk

Jan 1, 2022, 4:39:10 AM
to vert.x
Hi,

I want to use vertx-sql-client to stream large queries from MS SQL Server, MySQL and PostgreSQL.
As things stand, I think there are two ways to do this: RowStream or Cursors. I believe RowStream is implemented in terms of Cursors, and Cursors aren't implemented for MS SQL yet (my first test using RowStream throws an Unsupported exception from TdsMessageEncoder when it gets a CloseCursorCommand).

What I don't really get though is why streaming isn't the native way that database queries are handled in vertx-sql-client (or even in JDBC to be honest).
The data comes in as a stream of rows, so why can't they just be fed to the consumer as they come in (with minimal buffering)?

Is there some way I can do this, or is there a fundamental reason why it can't be done?

Thanks.

Jim

Thomas SEGISMONT

Jan 3, 2022, 3:47:32 AM
to ve...@googlegroups.com
Hi,

Indeed, the Reactive MS SQL client does not support streaming yet but it is planned for Vert.x 4.3.

Regarding your general question about streaming, you could look at everything going over the network as a stream.
In practice though, it is more efficient for the general use case (retrieving a relatively low number of rows) to buffer results.

Regards,
Thomas


jim-g...@spudsoft.co.uk

Jan 3, 2022, 2:37:26 PM
to vert.x
Hi Thomas,

Whilst you are definitely right about the usual use cases, I am frustrated that JDBC makes streaming so unreliable, and I'm hoping that the Vert.x client can do something better for my specific situation.

In the current API I think using a Collector would get me access to the data as it is processed, something like:
    Collector<Row, ?, Integer> collector = Collectors.summingInt(row -> {
      // process the row here
      // actually convert to JSON, add additional data from other queries to it and then feed out to another stream
      return 1;
    });
    return conn.preparedQuery(sql)
            .collecting(collector)
            .execute()
            .onSuccess(result -> {
              logger.debug("Rows: {}", result.value());
            });

Can you see any fundamental problem with doing that?

Thanks.

Jim

Christopher Tate

Jan 3, 2022, 4:36:47 PM
to vert.x
You can stream data from MSSQL, MySQL and PostgreSQL using the vertx-jdbc-client. I have used it to stream quickly and successfully from all of these databases on various projects. You'll want to include the SQL Server JDBC driver in your pom.xml file:

                <dependency>
                        <groupId>com.microsoft.sqlserver</groupId>
                        <artifactId>mssql-jdbc</artifactId>
                        <version>${mssql-jdbc.version}</version>
                </dependency>

Then configure your JDBC connection to the database like this (the example uses a MySQL URL; your JDBC URL will be different for MSSQL):

        private Future<Void> configureMoonshotsData() {
                Promise<Void> promise = Promise.promise();

                try {
                        JsonObject jdbcOptions = new JsonObject();
                        jdbcOptions.put("driver_class", config().getString(ConfigKeys.MOONSHOTS_DRIVER_CLASS));
                        jdbcOptions.put("url", String.format("jdbc:mysql://%s:%s@%s:%s/%s?useSSL=false&zeroDateTimeBehavior=convertToNull",
                                        config().getString(ConfigKeys.MOONSHOTS_USERNAME)
                                        , config().getString(ConfigKeys.MOONSHOTS_PASSWORD)
                                        , config().getString(ConfigKeys.MOONSHOTS_HOST)
                                        , config().getString(ConfigKeys.MOONSHOTS_PORT)
                                        , config().getString(ConfigKeys.MOONSHOTS_DATABASE)
                                        ));
                        jdbcOptions.put("max_pool_size", config().getInteger(ConfigKeys.MOONSHOTS_MAX_POOL_SIZE));
                        jdbcOptions.put("min_pool_size", config().getInteger(ConfigKeys.MOONSHOTS_MIN_POOL_SIZE));
                        jdbcOptions.put("max_idle_time", config().getInteger(ConfigKeys.MOONSHOTS_MAX_IDLE_TIME));
                        jdbcOptions.put("max_statements", config().getInteger(ConfigKeys.MOONSHOTS_MAX_STATEMENTS));
                        jdbcOptions.put("max_statements_per_connection", config().getInteger(ConfigKeys.MOONSHOTS_MAX_STATEMENTS_PER_CONNECTION));

                        jdbcClient = JDBCClient.createShared(vertx, jdbcOptions);
                        LOG.info(configureMoonshotsDataComplete);
                        promise.complete();
                } catch(Exception ex) {
                        LOG.error(configureMoonshotsDataFail, ex);
                        promise.fail(ex);
                }
                return promise.future();
        }

Then you can open a connection to your database, and set the FetchSize in the SQLConnection options:

        private void importDataClass(String classSimpleName, ZonedDateTime startDateTime) {
                        try {
                                jdbcClient.getConnection(a -> {
                                        if(a.succeeded()) {
                                                SQLConnection sqlConnection = a.result();
                                                sqlConnection.setOptions(new SQLOptions().setFetchSize(config().getInteger(ConfigKeys.MOONSHOTS_FETCH_SIZE)));
                                                importDataCurrikiResource(sqlConnection).onComplete(b -> {

Then you can query all rows from a large table with `SELECT * from` and set up a SQLRowStream object. I recommend calling .pause() before fetching any data; otherwise the stream starts delivering rows immediately and can exhaust memory, CPU or database connections before your processing catches up. Then you can fetch a fixed number of rows and handle each one. I set up my own counter object to track the number of rows fetched and the number of rows processed, and fetch more when the number of rows processed has nearly caught up with the number of rows fetched. I create a separate method like processRowCurrikiResource to process an individual row.

        private Future<Void> importDataCurrikiResource(SQLConnection sqlConnection) {
                Promise<Void> promise = Promise.promise();

                try {
                        sqlConnection.queryStreamWithParams(
                                        "SELECT * from currikidb.resources"
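                                        , new JsonArray(), a -> {
                                // Bail out early if the query itself failed; a.result() would be null.
                                if(a.failed()) {
                                        LOG.error(importDataCurrikiResourceFail, a.cause());
                                        promise.fail(a.cause());
                                        return;
                                }
                                SQLRowStream sqlRowStream = a.result();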
                                Integer fetchSize = config().getInteger(ConfigKeys.MOONSHOTS_FETCH_SIZE);
                                ApiCounter counter = new ApiCounter();
                                sqlRowStream.pause();
                                sqlRowStream.fetch(fetchSize);
                                sqlRowStream.resultSetClosedHandler(b -> {
                                        sqlRowStream.moreResults();
                                }).handler(row -> {
                                        counter.incrementQueueNum();
                                        processRowCurrikiResource(row).onSuccess(b -> {
                                                counter.decrementQueueNum();
                                                counter.incrementTotalNum();
                                                if(counter.getQueueNum().compareTo(0L) == 0) {
                                                        sqlRowStream.fetch(fetchSize);
                                                }
                                        }).onFailure(ex -> {
                                                LOG.error(importDataCurrikiResourceFail, ex);
                                                promise.fail(ex);
                                        });
                                }).exceptionHandler(ex -> {
                                        // Log with the failure message, not the completion message.
                                        LOG.error(importDataCurrikiResourceFail, ex);
                                        promise.fail(ex);
                                }).endHandler(b -> {
                                        LOG.info(importDataCurrikiResourceComplete);
                                        promise.complete();
                                });
                        });
                } catch(Exception ex) {
                        LOG.error(importDataCurrikiResourceFail, ex);
                        promise.fail(ex);
                }

                return promise.future();
        }
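
The ApiCounter class used above is not shown in this post. As a rough sketch only, a hypothetical stand-in (here called FetchGate, a name invented for illustration) could track rows in flight and signal when the current batch has drained. It assumes every call happens on the same thread, such as a single Vert.x event loop, so it uses no synchronization:

```java
public class FetchGateSketch {

    /**
     * Hypothetical stand-in for the ApiCounter used above (its source is not shown).
     * Assumes all calls happen on one thread, e.g. a single Vert.x event loop.
     */
    static final class FetchGate {
        private long queueNum; // rows handed to the handler but not yet processed
        private long totalNum; // rows fully processed so far

        void rowReceived() {
            queueNum++;
        }

        /** Marks one row as done; returns true when no rows are left in flight. */
        boolean rowProcessed() {
            queueNum--;
            totalNum++;
            return queueNum == 0;
        }

        long total() {
            return totalNum;
        }
    }

    public static void main(String[] args) {
        FetchGate gate = new FetchGate();
        int fetchSize = 3;

        // Simulate one batch arriving from the paused stream after fetch(fetchSize).
        for (int i = 0; i < fetchSize; i++) {
            gate.rowReceived();
        }

        // Rows finish processing one by one; only the last one asks for a new fetch.
        for (int i = 0; i < fetchSize; i++) {
            boolean fetchMore = gate.rowProcessed();
            System.out.println("processed=" + gate.total() + " fetchMore=" + fetchMore);
        }
    }
}
```

In the stream handler, rowReceived() would correspond to incrementQueueNum() and a true result from rowProcessed() would be the point to call fetch(fetchSize) again. One thing to watch for with this scheme: if a row finishes processing before the next row of the batch is delivered, the in-flight count reaches zero early and a new fetch is requested sooner than intended.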

You can see more details here where we are working on streaming a brand new MSSQL data source with many rows:

Hope that helps.
Happy New Year!

Thomas SEGISMONT

Jan 7, 2022, 4:26:06 AM
to ve...@googlegroups.com
You can use a collector to process rows as they are received, no problem with that.
In the end you will get an object which is the output of the collector.
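
To illustrate the point with plain java.util.stream types only (no Vert.x classes), here is a minimal sketch of a collector that forwards each element to a consumer as it is accumulated and returns just a count at the end. The names CollectorSketch and countingForwarder are made up for this example, and a Stream of strings stands in for the rows; the assumption is that a collector of the same shape, typed on Row, could be passed to .collecting(...) as in the snippet quoted earlier in the thread:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class CollectorSketch {

    /**
     * Builds a collector that hands every element to `sink` as it is accumulated
     * and whose final result is only the number of elements seen.
     */
    static <T> Collector<T, int[], Integer> countingForwarder(Consumer<T> sink) {
        return Collector.of(
            () -> new int[1],                                   // mutable counter
            (acc, item) -> { sink.accept(item); acc[0]++; },    // process, then count
            (a, b) -> { a[0] += b[0]; return a; },              // merge partial counters
            acc -> acc[0]);                                     // final result: the count
    }

    public static void main(String[] args) {
        List<String> forwarded = new ArrayList<>();

        // A Stream of strings stands in for rows arriving from the database.
        Integer count = Stream.of("row-1", "row-2", "row-3")
            .collect(countingForwarder(forwarded::add));

        System.out.println("Rows: " + count + ", forwarded: " + forwarded);
    }
}
```

The per-element work happens inside the accumulator, so nothing needs to be retained beyond the counter; the only value available once the query completes is whatever the collector's finisher returns.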
 