Combining SQL results from multiple database queries


Jody Schering

Mar 2, 2016, 6:42:59 PM
to vert.x
Hi everyone, 

I need to run the same query against multiple databases and combine the results. I've read up on using RxJava to merge multiple Observables into one. After some experimentation I got this working, as shown below. Unfortunately I need to close each connection so it's released back to the pool, but the connection is out of scope in the onComplete and onError callbacks of my subscriber, so I can't call close() on it there. Does anyone have suggestions on how to work around this, or how to restructure my code so I can close the connections once all the queries have completed?

private void executeObservableQuery(RoutingContext routingContext, String queryString, JsonArray params, String contentType, Set<JDBCClient> clients) {
    JsonArray rows = new JsonArray();
    ObservableFuture<ResultSet>[] observables = new ObservableFuture[clients.size()];
    int i = 0;
    for (JDBCClient client : clients) {
        ObservableFuture<ResultSet> resultSetObservable = RxHelper.observableFuture();
        observables[i] = resultSetObservable;
        i++;
        client.getConnection(conn -> {
            if (conn.succeeded()) {
                SQLConnection connection = conn.result();
                // This finallyDo is never called...
                resultSetObservable.finallyDo(() -> {
                    System.out.println("Closing the connection");
                    connection.close();
                });
                connection.queryWithParams(queryString, params, resultSetObservable.toHandler());
            } else {
                routingContext.response().setStatusCode(500).end(conn.cause().getMessage());
            }
        });
    }
    Observable<ResultSet> results = Observable.merge(observables);
    results.subscribe(
        resultSet -> { // onNext(ResultSet resultSet)
            // Collect the rows of this result set
            for (JsonObject row : resultSet.getRows()) {
                rows.add(row);
            }
        },
        exception -> { // onError(exception e)
            routingContext.response().setStatusCode(500).end(exception.getMessage());
            // connection.close(); // connection is out of scope.
        },
        () -> { // onComplete()
            routingContext.response().putHeader("content-type", contentType).setStatusCode(200).end(rows.encodePrettily());
            // connection.close(); // connection is out of scope.
        });
}

Thanks,
Jody

Julien Viet

Mar 3, 2016, 2:08:22 AM
to ve...@googlegroups.com
You could try using

resultSetObservable.doOnCompleted(conn::close);

but you need to do that inside the client.getConnection() lambda block.

Also, if you are using 3.2.1, I would suggest looking at io.vertx.core.CompositeFuture; it helps solve this in much the same manner.


Jody Schering

Mar 3, 2016, 12:29:21 PM
to vert.x
Hi Julien, thanks for the suggestions.

As far as I can tell, using resultSetObservable.doOnCompleted(conn::close); has some challenges. doOnCompleted returns a new observable, which is therefore not in the array I created earlier. I also can't call it until the connection exists, and that only happens inside the AsyncResult handler of client.getConnection(). By that point the code may already have merged the observables and called subscribe.

As for CompositeFuture and Futures, I don't understand how to use them together with connection.queryWithParams() to get the result I'm after. Could you restructure the method above using Futures, or provide a similar example of how I can achieve this?


Jody Schering

Mar 3, 2016, 1:53:24 PM
to vert.x
OK, I believe I have it solved. I can do it two ways: one using RxJava, the other using CompositeFuture. Both options are below in case anyone else has this issue, or in case anyone has a better solution.

Using RxJava
private void executeObservableQuery(RoutingContext routingContext, String queryString, JsonArray params, String contentType, Set<JDBCClient> clients) {
    JsonArray rows = new JsonArray();
    ObservableFuture<ResultSet>[] observables = new ObservableFuture[clients.size()];
    List<SQLConnection> connections = new ArrayList<SQLConnection>();
    int i = 0;
    for (JDBCClient client : clients) {
        ObservableFuture<ResultSet> resultSetObservable = RxHelper.observableFuture();
        observables[i] = resultSetObservable;
        i++;
        client.getConnection(conn -> {
            if (conn.succeeded()) {
                SQLConnection connection = conn.result();
                connections.add(connection); // save the connection to close later.
                connection.queryWithParams(queryString, params, resultSetObservable.toHandler());
            } else {
                routingContext.response().setStatusCode(500).end(conn.cause().getMessage());
            }
        });
    }
    Observable<ResultSet> results = Observable.merge(observables);
    results.subscribe(
        resultSet -> { // onNext(ResultSet resultSet)
            // Collect the rows of this result set
            for (JsonObject row : resultSet.getRows()) {
                rows.add(row);
            }
        },
        exception -> { // onError(exception e)
            routingContext.response().setStatusCode(500).end(exception.getMessage());
            // close the connections
            for (SQLConnection c : connections) {
                c.close();
            }
        },
        () -> { // onComplete()
            routingContext.response().putHeader("content-type", contentType).setStatusCode(200).end(rows.encodePrettily());
            // close the connections
            for (SQLConnection c : connections) {
                c.close();
            }
        });
}



Using CompositeFuture
private void executeFutureQuery(RoutingContext routingContext, String queryString, JsonArray params, String contentType, Set<JDBCClient> clients) {
    List<Future> futureResultSets = new ArrayList<Future>();
    List<SQLConnection> connections = new ArrayList<SQLConnection>();
    for (JDBCClient client : clients) {
        Future<ResultSet> futureResultSet = Future.future();
        futureResultSets.add(futureResultSet);
        client.getConnection(conn -> {
            if (conn.succeeded()) {
                SQLConnection connection = conn.result();
                connections.add(connection); // save the connection to close later.
                connection.queryWithParams(queryString, params, futureResultSet.completer());
            } else {
                routingContext.response().setStatusCode(500).end(conn.cause().getMessage());
            }
        });
    }
    CompositeFuture.all(futureResultSets).setHandler(ar -> {
        if (ar.succeeded()) {
            JsonArray rows = new JsonArray();
            for (int index = 0; index < ar.result().size(); index++) {
                ResultSet resultSet = (ResultSet) ar.result().result(index);
                for (JsonObject row : resultSet.getRows()) {
                    rows.add(row);
                }
            }
            routingContext.response().putHeader("content-type", contentType).setStatusCode(200).end(rows.encodePrettily());
        } else {
            routingContext.response().setStatusCode(500).end(ar.cause().getMessage());
        }
        // close the connections whether the combined result succeeded or failed
        for (SQLConnection c : connections) {
            c.close();
        }
    });
}
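
One gap that both versions seem to share: if getConnection fails for one client, that client's observable or future is never completed, so as far as I can tell the combined callback never fires and the connections that did open are never closed. Below is a minimal JDK-only sketch of the shape that avoids this, using CompletableFuture instead of the Vert.x types so it runs on its own. FakeConnection, queryOne and queryAll are hypothetical names made up for the illustration, not Vert.x API. Each connection is closed in the same block that acquired it, and a failed acquisition fails that client's future rather than leaving it pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// JDK-only sketch; FakeConnection, queryOne and queryAll are hypothetical
// names for illustration, not part of the Vert.x API.
class CombineAndClose {

    // Stand-in for SQLConnection: returns canned rows and counts close() calls.
    static final class FakeConnection {
        private final List<String> rows;
        private final AtomicInteger closeCounter;

        FakeConnection(List<String> rows, AtomicInteger closeCounter) {
            this.rows = rows;
            this.closeCounter = closeCounter;
        }

        CompletableFuture<List<String>> query() {
            return CompletableFuture.supplyAsync(() -> rows);
        }

        void close() {
            closeCounter.incrementAndGet();
        }
    }

    // Acquire a connection, run the query, and close the connection once this
    // one query finishes, whether it succeeded or failed. A failed acquisition
    // fails the returned future instead of leaving it pending forever.
    static CompletableFuture<List<String>> queryOne(
            Supplier<CompletableFuture<FakeConnection>> client) {
        return client.get().thenCompose(conn ->
                conn.query().whenComplete((rows, error) -> conn.close()));
    }

    // Run queryOne against every client and merge the rows in client order.
    static CompletableFuture<List<String>> queryAll(
            List<Supplier<CompletableFuture<FakeConnection>>> clients) {
        List<CompletableFuture<List<String>>> perClient = new ArrayList<>();
        for (Supplier<CompletableFuture<FakeConnection>> client : clients) {
            perClient.add(queryOne(client));
        }
        return CompletableFuture.allOf(perClient.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    List<String> merged = new ArrayList<>();
                    for (CompletableFuture<List<String>> f : perClient) {
                        merged.addAll(f.join()); // already complete, does not block
                    }
                    return merged;
                });
    }
}
```

In the Vert.x versions above, the equivalent change would presumably be to fail the per-client future in the else branch (for example with futureResultSet.fail(conn.cause())) instead of writing the 500 response there, and to send the error response once from the combined handler. I have not run that against Vert.x, so treat it as a suggestion.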

