private void executeObservableQuery(RoutingContext routingContext, String queryString, JsonArray params,String contentType, Set<JDBCClient> clients) { JsonArray rows = new JsonArray(); ObservableFuture<ResultSet>[] observables = new ObservableFuture[clients.size()]; int i=0; for (JDBCClient client : clients) { ObservableFuture<ResultSet> resultSetObservable = RxHelper.observableFuture(); observables[i] = resultSetObservable; i++; client.getConnection(conn -> { if (conn.succeeded()) { SQLConnection connection = conn.result(); //This finallyDo is never called... resultSetObservable.finallyDo(()->{ System.out.println("Closing the connection"); connection.close(); }); connection.queryWithParams(siteQuery, params, resultSetObservable.toHandler()); } else { routingContext.response().setStatusCode(500).end(conn.cause().getMessage()); } }); } Observable<ResultSet> results = Observable.merge(observables); results.subscribe(resultSet ->{//onNext(ResultSet resultSet) // Get the result set for (JsonObject row : resultSet.getRows()) { rows.add(row); } }, exception ->{ //onError(exception e) routingContext.response().setStatusCode(500).end(exception.getMessage());// connection.close(); //connection is out of scope. }, () ->{//onComplete() routingContext.response().putHeader("content-type", contentType).setStatusCode(200).end(rows.encodePrettily());// connection.close();//connection is out of scope. }); }--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/41b0ad7a-6f83-4203-85d1-415415301e41%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
/**
 * Runs the same parameterised query against every client using merged RxJava observables and
 * writes the combined rows to the HTTP response as pretty-printed JSON.
 *
 * <p>The response is written exactly once: status 200 with the collected rows once every
 * observable has completed, or status 500 with the message of the first failure (a failed
 * connection request or a failed query).
 *
 * <p>Each connection is closed in its own query callback. Closing from the subscriber's
 * terminal callbacks is not sufficient: after the first error the merged stream is finished,
 * so connections obtained later would never be closed, and connections whose queries are
 * still running would be closed underneath them.
 *
 * @param routingContext context whose response receives the rows or the error message
 * @param queryString SQL text handed to {@code queryWithParams}
 * @param params values handed to {@code queryWithParams} together with {@code queryString}
 * @param contentType value of the {@code content-type} header on a successful response
 * @param clients clients to query; one connection is requested from each
 */
private void executeObservableQuery(RoutingContext routingContext, String queryString, JsonArray params,
        String contentType, Set<JDBCClient> clients) {
    JsonArray rows = new JsonArray();
    @SuppressWarnings("unchecked") // generic array creation; only ObservableFuture<ResultSet> is stored
    ObservableFuture<ResultSet>[] observables = new ObservableFuture[clients.size()];
    int i = 0;
    for (JDBCClient client : clients) {
        ObservableFuture<ResultSet> resultSetObservable = RxHelper.observableFuture();
        observables[i++] = resultSetObservable;
        client.getConnection(conn -> {
            if (conn.failed()) {
                // Terminate this client's observable with the error so the merged stream
                // reaches onError once instead of never terminating.
                resultSetObservable.toHandler().handle(Future.<ResultSet>failedFuture(conn.cause()));
                return;
            }
            SQLConnection connection = conn.result();
            connection.queryWithParams(queryString, params, queryResult -> {
                // Release the connection as soon as its own query is done, then forward.
                connection.close();
                resultSetObservable.toHandler().handle(queryResult);
            });
        });
    }

    Observable<ResultSet> results = Observable.merge(observables);
    results.subscribe(
        resultSet -> {
            // onNext: accumulate the rows of one client's result set
            for (JsonObject row : resultSet.getRows()) {
                rows.add(row);
            }
        },
        exception ->
            // onError: first failure from any client
            routingContext.response().setStatusCode(500).end(exception.getMessage()),
        () ->
            // onCompleted: every client has delivered its result set
            routingContext.response()
                    .putHeader("content-type", contentType)
                    .setStatusCode(200)
                    .end(rows.encodePrettily()));
}

/**
 * Runs the same parameterised query against every client using Vert.x futures and writes the
 * combined rows to the HTTP response as pretty-printed JSON.
 *
 * <p>One future is created per client and combined with {@code CompositeFuture.all}. The
 * response is written once in the composite handler: status 200 with the rows of all result
 * sets in client iteration order, or status 500 with the failure message.
 *
 * <p>A failed connection request fails that client's future, so the composite handler always
 * fires. Each connection is closed in its own query callback, so none is left open when the
 * composite fails before every connection has been obtained.
 *
 * @param routingContext context whose response receives the rows or the error message
 * @param queryString SQL text handed to {@code queryWithParams}
 * @param params values handed to {@code queryWithParams} together with {@code queryString}
 * @param contentType value of the {@code content-type} header on a successful response
 * @param clients clients to query; one connection is requested from each
 */
private void executeFutureQuery(RoutingContext routingContext, String queryString, JsonArray params,
        String contentType, Set<JDBCClient> clients) {
    // Raw element type kept on purpose: it is what CompositeFuture.all is called with here.
    List<Future> futureResultSets = new ArrayList<Future>();
    for (JDBCClient client : clients) {
        Future<ResultSet> futureResultSet = Future.future();
        futureResultSets.add(futureResultSet);
        client.getConnection(conn -> {
            if (conn.failed()) {
                // Fail the future rather than ending the response directly; otherwise the
                // composite below would stay pending forever.
                futureResultSet.fail(conn.cause());
                return;
            }
            SQLConnection connection = conn.result();
            connection.queryWithParams(queryString, params, queryResult -> {
                // Release the connection as soon as its own query is done, then forward.
                connection.close();
                futureResultSet.completer().handle(queryResult);
            });
        });
    }

    CompositeFuture.all(futureResultSets).setHandler(ar -> {
        if (ar.failed()) {
            routingContext.response().setStatusCode(500).end(ar.cause().getMessage());
            return;
        }
        JsonArray rows = new JsonArray();
        for (int index = 0; index < ar.result().size(); index++) {
            ResultSet resultSet = (ResultSet) ar.result().result(index);
            for (JsonObject row : resultSet.getRows()) {
                rows.add(row);
            }
        }
        routingContext.response()
                .putHeader("content-type", contentType)
                .setStatusCode(200)
                .end(rows.encodePrettily());
    });
}