Thanks for the great idea, Julien!
I switched to using eventBus.request()/reply() along with a simple Long counter, and set up the SQL RowStream to pause and then fetch a fixed number of rows at a time. With your recommendation, I was able to remove the blocking semaphores and double my performance! Here is the updated method:
/**
 * Streams the {@code pk} column of {@code tableName} from the database and, for every row,
 * sends an event-bus request to the address {@code opendatapolicing-enUS-<tableName>} with
 * the header {@code action=patch<tableName>Future}.
 *
 * <p>The sync is skipped (and the returned future completed immediately) when the boolean
 * config entry {@code ConfigKeys.ENABLE_DB_SOLR_SYNC + tableName} is {@code false}; it
 * defaults to {@code true}.
 *
 * <p>Flow control is done without blocking: the row stream is paused and rows are requested
 * in batches of {@code API_COUNTER_FETCH}. {@code countNum} is incremented for every row
 * received and decremented for every successful reply. The stream is paused once
 * {@code countNum} reaches {@code API_COUNTER_PAUSE}, and another batch is fetched when a
 * reply brings {@code countNum} down to exactly {@code API_COUNTER_RESUME}.
 *
 * <p>NOTE(review): the three {@code API_COUNTER_*} config values are read without defaults;
 * a missing entry would surface as a NullPointerException — confirm they are always set.
 *
 * <p>NOTE(review): the returned future completes when the row stream ends; replies for the
 * last rows may still be outstanding at that moment — confirm this is acceptable to callers.
 *
 * @param tableName name of the table to read primary keys from; it is formatted directly
 *                  into the SQL text and the event-bus address, so it must come from a
 *                  trusted source, never from user input
 * @return a future that succeeds when the sync is skipped or the row stream has ended, and
 *         fails on the first error encountered
 */
private Future<Void> syncData(String tableName) {
    Promise<Void> promise = Promise.promise();
    try {
        if(config().getBoolean(String.format("%s%s", ConfigKeys.ENABLE_DB_SOLR_SYNC, tableName), true)) {
            LOG.info(String.format(syncDataStarted, tableName));
            Long apiCounterPause = config().getLong(ConfigKeys.API_COUNTER_PAUSE);
            Long apiCounterResume = config().getLong(ConfigKeys.API_COUNTER_RESUME);
            Integer apiCounterFetch = config().getInteger(ConfigKeys.API_COUNTER_FETCH);
            vertx.sharedData().getCounter("dbSolrSync").onSuccess(counter -> {
                counter.get().onSuccess(startCount -> {
                    // Reset the shared counter to zero; the sync only proceeds if the reset wins.
                    counter.compareAndSet(startCount, 0L).onSuccess(counterResetSuccess -> {
                        if(counterResetSuccess) {
                            pgPool.withTransaction(sqlConnection -> {
                                // Outcome of the streaming phase. Many requests are in flight at once,
                                // so several callbacks may try to settle it: always use
                                // tryFail/tryComplete so that only the first outcome wins instead of
                                // throwing IllegalStateException on the second attempt.
                                Promise<Void> promise1 = Promise.promise();
                                sqlConnection.prepare(String.format("SELECT pk FROM %s", tableName)).onSuccess(preparedStatement -> {
                                    setCountNum(0L);
                                    setTotalNum(0L);
                                    try {
                                        RowStream<Row> stream = preparedStatement.createStream(apiCounterFetch);
                                        // Switch to explicit-demand mode and ask for the first batch.
                                        stream.pause();
                                        stream.fetch(apiCounterFetch);
                                        stream.exceptionHandler(ex -> {
                                            LOG.error(String.format(syncDataFail, tableName), new RuntimeException(ex));
                                            promise1.tryFail(ex);
                                        });
                                        stream.endHandler(v -> {
                                            LOG.info(String.format(syncDataComplete, tableName));
                                            promise1.tryComplete();
                                        });
                                        stream.handler(row -> {
                                            incrementCountNum();
                                            // Too many requests awaiting a reply: stop pulling rows.
                                            if(countNum.compareTo(apiCounterPause) >= 0) {
                                                stream.pause();
                                            }
                                            try {
                                                Long pk = row.getLong(0);
                                                vertx.eventBus().request(
                                                        String.format("opendatapolicing-enUS-%s", tableName)
                                                        , new JsonObject().put(
                                                                "context"
                                                                , new JsonObject().put(
                                                                        "params"
                                                                        , new JsonObject()
                                                                                .put("body", new JsonObject().put("pk", pk.toString()))
                                                                                .put("path", new JsonObject())
                                                                                .put("cookie", new JsonObject())
                                                                                .put("query", new JsonObject().put("q", "*:*").put("fq", new JsonArray().add("pk:" + pk)).put("var", new JsonArray().add("refresh:false")))
                                                                )
                                                        )
                                                        , new DeliveryOptions().addHeader("action", String.format("patch%sFuture", tableName))).onSuccess(a -> {
                                                    incrementTotalNum();
                                                    decrementCountNum();
                                                    // Backlog drained to the resume threshold: pull the next batch.
                                                    if(countNum.compareTo(apiCounterResume) == 0) {
                                                        stream.fetch(apiCounterFetch);
                                                        LOG.info("FETCH {} {}", totalNum, Thread.currentThread().getName());
                                                    }
                                                }).onFailure(ex -> {
                                                    LOG.error(String.format(syncDataFail, tableName), ex);
                                                    promise1.tryFail(ex);
                                                });
                                            } catch (Exception ex) {
                                                LOG.error(String.format(syncDataFail, tableName), ex);
                                                promise1.tryFail(ex);
                                            }
                                        });
                                    } catch (Exception ex) {
                                        LOG.error(String.format(syncDataFail, tableName), ex);
                                        promise1.tryFail(ex);
                                    }
                                }).onFailure(ex -> {
                                    LOG.error(String.format(syncDataFail, tableName), ex);
                                    promise1.tryFail(ex);
                                });
                                return promise1.future();
                            }).onSuccess(a -> {
                                promise.complete();
                            }).onFailure(ex -> {
                                LOG.error(String.format(syncDataFail, tableName), ex);
                                promise.fail(ex);
                            });
                        } else {
                            Exception ex = new RuntimeException(String.format(syncDataCounterResetFail, tableName));
                            LOG.error(ex.getMessage(), ex);
                            promise.fail(ex);
                        }
                    }).onFailure(ex -> {
                        LOG.error(String.format(syncDataFail, tableName), ex);
                        promise.fail(ex);
                    });
                }).onFailure(ex -> {
                    LOG.error(String.format(syncDataFail, tableName), ex);
                    promise.fail(ex);
                });
            }).onFailure(ex -> {
                LOG.error(String.format(syncDataFail, tableName), ex);
                promise.fail(ex);
            });
        } else {
            LOG.info(String.format(syncDataSkip, tableName));
            promise.complete();
        }
    } catch (Exception ex) {
        LOG.error(String.format(syncDataFail, tableName), ex);
        promise.fail(ex);
    }
    return promise.future();
}