You can stream data from MSSQL, MySQL and PostgreSQL using the vertx-jdbc-client, I have used the vertx-jdbc-client to stream quickly and successfully from all of these databases on various projects. You'll want to include the sqlserver JDBC driver in your pom.xml file:
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>${mssql-jdbc.version}</version>
</dependency>
And then configure your JDBC connection to the database like this, your JDBC URL will be different for MSSQL:
private Future<Void> configureMoonshotsData() {
Promise<Void> promise = Promise.promise();
try {
JsonObject jdbcOptions = new JsonObject();
jdbcOptions.put("driver_class", config().getString(ConfigKeys.MOONSHOTS_DRIVER_CLASS));
jdbcOptions.put("url", String.format("jdbc:mysql://%s:%s@%s:%s/%s?useSSL=false&zeroDateTimeBehavior=convertToNull",
config().getString(ConfigKeys.MOONSHOTS_USERNAME)
, config().getString(ConfigKeys.MOONSHOTS_PASSWORD)
, config().getString(ConfigKeys.MOONSHOTS_HOST)
, config().getString(ConfigKeys.MOONSHOTS_PORT)
, config().getString(ConfigKeys.MOONSHOTS_DATABASE)
));
jdbcOptions.put("max_pool_size", config().getInteger(ConfigKeys.MOONSHOTS_MAX_POOL_SIZE));
jdbcOptions.put("min_pool_size", config().getInteger(ConfigKeys.MOONSHOTS_MIN_POOL_SIZE));
jdbcOptions.put("max_idle_time", config().getInteger(ConfigKeys.MOONSHOTS_MAX_IDLE_TIME));
jdbcOptions.put("max_statements", config().getInteger(ConfigKeys.MOONSHOTS_MAX_STATEMENTS));
jdbcOptions.put("max_statements_per_connection", config().getInteger(ConfigKeys.MOONSHOTS_MAX_STATEMENTS_PER_CONNECTION));
jdbcClient = JDBCClient.createShared(vertx, jdbcOptions);
LOG.info(configureMoonshotsDataComplete);
promise.complete();
} catch(Exception ex) {
LOG.error(configureMoonshotsDataFail, ex);
promise.fail(ex);
}
return promise.future();
}
Then you can open a connection to your database, and set the FetchSize in the SQLConnection options:
private void importDataClass(String classSimpleName, ZonedDateTime startDateTime) {
try {
jdbcClient.getConnection(a -> {
if(a.succeeded()) {
SQLConnection sqlConnection = a.result();
sqlConnection.setOptions(new SQLOptions().setFetchSize(config().getInteger(ConfigKeys.MOONSHOTS_FETCH_SIZE)));
importDataCurrikiResource(sqlConnection).onComplete(b -> {
Then you can query all rows from a large table with `SELECT * from` and setup a SQLRowStream object. I recommend calling .pause() before starting to fetch any data, or the fetch will rapidly start streaming and running you out of memory, or CPU or database connections before it can catch up. Then you can fetch a certain amount, and handle each row. I setup my own counter object to count the number of rows fetched, and the number of rows processed and fetch more when the number of rows processed has nearly caught up to the number of rows fetched. I create a separate method like processRowCurrikiResource to process an individual row.
private Future<Void> importDataCurrikiResource(SQLConnection sqlConnection) {
Promise<Void> promise = Promise.promise();
try {
sqlConnection.queryStreamWithParams(
"SELECT * from currikidb.resources"
, new JsonArray(), a -> {
SQLRowStream sqlRowStream = a.result();
Integer fetchSize = config().getInteger(ConfigKeys.MOONSHOTS_FETCH_SIZE);
ApiCounter counter = new ApiCounter();
sqlRowStream.pause();
sqlRowStream.fetch(fetchSize);
sqlRowStream.resultSetClosedHandler(b -> {
sqlRowStream.moreResults();
}).handler(row -> {
counter.incrementQueueNum();
processRowCurrikiResource(row).onSuccess(b -> {
counter.decrementQueueNum();
counter.incrementTotalNum();
if(counter.getQueueNum().compareTo(0L) == 0) {
sqlRowStream.fetch(fetchSize);
}
}).onFailure(ex -> {
LOG.error(importDataCurrikiResourceFail, ex);
promise.fail(ex);
});
}).exceptionHandler(ex -> {
LOG.error(importDataCurrikiResourceComplete, ex);
promise.fail(ex);
}).endHandler(b -> {
LOG.info(importDataCurrikiResourceComplete);
promise.complete();
});
});
} catch(Exception ex) {
LOG.error(importDataCurrikiResourceFail, ex);
promise.fail(ex);
}
return promise.future();
}
You can see more details here where we are working on streaming a brand new MSSQL data source with many rows:
Hope that helps.
Happy New Year!