Vert.x Sequential Function Composition Never Completes

plom...@datawire.io

Oct 6, 2016, 9:58:31 AM
to vert.x
Hi... I have some code that uses the function composition mechanism in Vert.x futures but it never seems to complete and just leaves my incoming request hanging indefinitely. The handler is implemented as such:

package io.datawire.mcp.api;

import io.vertx.core.Future;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.core.shareddata.Counter;
import io.vertx.core.shareddata.SharedData;
import io.vertx.ext.web.RoutingContext;
import org.jetbrains.annotations.NotNull;

public class GetStatisticsJava extends APIHandler {

  @Override
  protected void handle(@NotNull RoutingContext ctx, @NotNull HttpServerRequest request, @NotNull HttpServerResponse response) {
    SharedData sharedData = ctx.vertx().sharedData();

    Future<JsonObject> queries = Future.future();
    queries.setHandler( res -> {
      if (res.succeeded()) {
        getLogger().trace("SUCCEEDED");
        response.setStatusCode(200)
            .setStatusMessage("OK")
            .putHeader(HttpHeaders.CONTENT_TYPE, "application/json; charset=utf-8")
            .end(res.result().encodePrettily());
      } else {
        getLogger().trace("FAILED");
        ctx.fail(res.cause());
      }
    });

    Future<Counter> loadCounter = Future.future();
    sharedData.getCounter("mcp.rtp.connections", loadCounter.completer());

    loadCounter.compose( counter -> {
      getLogger().trace("Retrieved RTP counter (name: mcp.rtp.connections)");
      Future<Long> count = Future.future();
      counter.get(count.completer());
      return count;
    }).compose( count -> {
      getLogger().trace("Retrieved RTP connection count (value: {})", count);
      Future<JsonObject> res = Future.future();
      res.complete(new JsonObject().put("mcp.rtp.connections", count));
    }, queries);
  }
}

My logs indicate that the functions get sequentially executed but the code in the initial queries Future handler is never invoked:

2016-10-06 13:57:11 TRACE [vert.x-eventloop-thread-2] i.d.m.a.GetStatisticsJava - Retrieved RTP counter (name: mcp.rtp.connections)
2016-10-06 13:57:11 TRACE [vert.x-eventloop-thread-2] i.d.m.a.GetStatisticsJava - Retrieved RTP connection count (value: 0)

Anyone have any ideas what's going on here? 

jklingsporn

Oct 6, 2016, 10:20:21 AM
to vert.x
According to the Future.compose Javadoc, your count -> {} handler should complete queries instead of res:

    }).compose( count -> {
      getLogger().trace("Retrieved RTP connection count (value: {})", count);
      // Complete the future passed as the second argument, not a new local one.
      queries.complete(new JsonObject().put("mcp.rtp.connections", count));
    }, queries);
  }
}
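
To see why the request hangs, here is a minimal sketch that does not depend on Vert.x. MiniFuture is a hypothetical stand-in, modeled loosely on the two-argument compose(Handler, Future) of Vert.x 3.x; it is not the real io.vertx.core.Future, and the names HandlerComposeDemo and run are invented for illustration. It assumes the handler form only ever fails the next future and leaves completing it to the handler.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for a Vert.x 3.x-style Future; NOT io.vertx.core.Future.
final class MiniFuture<T> {
  private T result;
  private Throwable cause;
  private boolean done;
  private Consumer<MiniFuture<T>> handler;

  boolean isComplete() { return done; }
  boolean succeeded() { return done && cause == null; }
  T result() { return result; }
  Throwable cause() { return cause; }
  void complete(T value) { finish(value, null); }
  void fail(Throwable t) { finish(null, t); }

  private void finish(T value, Throwable t) {
    if (done) throw new IllegalStateException("already completed");
    result = value;
    cause = t;
    done = true;
    if (handler != null) handler.accept(this);
  }

  void setHandler(Consumer<MiniFuture<T>> h) {
    handler = h;
    if (done) h.accept(this);
  }

  // Handler form of compose (simplified): it can only fail `next`;
  // completing `next` on success is the handler's job.
  <U> MiniFuture<U> compose(Consumer<T> h, MiniFuture<U> next) {
    setHandler(ar -> {
      if (ar.succeeded()) {
        try {
          h.accept(ar.result());
        } catch (RuntimeException err) {
          if (next.isComplete()) throw err;
          next.fail(err);
        }
      } else {
        next.fail(ar.cause());
      }
    });
    return next;
  }
}

final class HandlerComposeDemo {
  // Returns what the handler registered on `queries` observed, or "pending" if it never ran.
  static String run(boolean completeNext) {
    MiniFuture<Integer> count = new MiniFuture<>();
    MiniFuture<String> queries = new MiniFuture<>();
    StringBuilder seen = new StringBuilder("pending");
    queries.setHandler(ar -> {
      seen.setLength(0);
      seen.append(ar.succeeded() ? ar.result() : "failed");
    });

    count.compose(c -> {
      if (completeNext) {
        queries.complete("count=" + c);                  // the fix: complete `next` itself
      } else {
        new MiniFuture<String>().complete("count=" + c); // the bug: a local future nobody observes
      }
    }, queries);

    count.complete(0); // simulate the asynchronous counter read finishing
    return seen.toString();
  }

  public static void main(String[] args) {
    System.out.println(run(false)); // prints "pending"
    System.out.println(run(true));  // prints "count=0"
  }
}
```

With completeNext set to false the handler on queries never fires, which corresponds to the hanging request described in the original post.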

plom...@datawire.io

Oct 6, 2016, 11:01:11 AM
to vert.x
Hmm.... that's interesting. The Java manual doesn't show that being the case at all...

FileSystem fs = vertx.fileSystem();
Future<Void> startFuture = Future.future();

Future<Void> fut1 = Future.future();
fs.createFile("/foo", fut1.completer());

fut1.compose(v -> {
  // When the file is created (fut1), execute this:
  Future<Void> fut2 = Future.future();
  fs.writeFile("/foo", Buffer.buffer(), fut2.completer());
  return fut2;
}).compose(v -> {
  // When the file is written (fut2), execute this:
  Future<Void> fut3 = Future.future();
  fs.move("/foo", "/bar", fut3.completer());
},
// mark the start future as completed when all the chain has been completed,
// or mark it as failed if any step fails.
startFuture);

jklingsporn

Oct 7, 2016, 5:46:45 AM
to vert.x
That is strange indeed. Here is a copy from the sources:

default <U> Future<U> compose(Handler<T> handler, Future<U> next) {
  setHandler(ar -> {
    if (ar.succeeded()) {
      try {
        handler.handle(ar.result());
      } catch (Throwable err) {
        if (next.isComplete()) {
          throw err;
        }
        next.fail(err);
      }
    } else {
      next.fail(ar.cause());
    }
  });
  return next;
}

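When the previous future fails or the handler throws an error, next is failed. On success, though, this implementation never completes next itself; only the handler can do that. Not sure what's wrong though, the implementation or the documentation :)
Also note that there are two different compose methods: compose(Handler<T>, Future<U>), shown above, and compose(Function<T, Future<U>>).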
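
The two failure paths in that source can be exercised with a small sketch that does not depend on Vert.x. MiniFuture below is a hypothetical stand-in that copies the shape of the quoted method (it catches RuntimeException rather than Throwable to keep it short); FailurePathsDemo and its method names are invented for illustration.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for a Vert.x 3.x-style Future; NOT io.vertx.core.Future.
final class MiniFuture<T> {
  private T result;
  private Throwable cause;
  private boolean done;
  private Consumer<MiniFuture<T>> handler;

  boolean isComplete() { return done; }
  boolean succeeded() { return done && cause == null; }
  T result() { return result; }
  Throwable cause() { return cause; }
  void complete(T value) { finish(value, null); }
  void fail(Throwable t) { finish(null, t); }

  private void finish(T value, Throwable t) {
    if (done) throw new IllegalStateException("already completed");
    result = value;
    cause = t;
    done = true;
    if (handler != null) handler.accept(this);
  }

  void setHandler(Consumer<MiniFuture<T>> h) {
    handler = h;
    if (done) h.accept(this);
  }

  // Same shape as the quoted compose(Handler<T>, Future<U>), simplified.
  <U> MiniFuture<U> compose(Consumer<T> h, MiniFuture<U> next) {
    setHandler(ar -> {
      if (ar.succeeded()) {
        try {
          h.accept(ar.result());
        } catch (RuntimeException err) {
          if (next.isComplete()) throw err;
          next.fail(err);
        }
      } else {
        next.fail(ar.cause());
      }
    });
    return next;
  }
}

final class FailurePathsDemo {
  // Summarizes a future as "pending", "ok:<value>" or "failed:<message>".
  static String describe(MiniFuture<String> f) {
    if (!f.isComplete()) return "pending";
    return f.succeeded() ? "ok:" + f.result() : "failed:" + f.cause().getMessage();
  }

  // Path 1: the previous future fails, so the handler is skipped and `next` is failed.
  static String previousFails() {
    MiniFuture<Integer> first = new MiniFuture<>();
    MiniFuture<String> next = new MiniFuture<>();
    first.compose(v -> next.complete("unreachable"), next);
    first.fail(new RuntimeException("boom"));
    return describe(next);
  }

  // Path 2: the handler throws, and the exception is routed into `next`.
  static String handlerThrows() {
    MiniFuture<Integer> first = new MiniFuture<>();
    MiniFuture<String> next = new MiniFuture<>();
    first.compose(v -> { throw new IllegalStateException("bad handler"); }, next);
    first.complete(1);
    return describe(next);
  }

  public static void main(String[] args) {
    System.out.println(previousFails()); // prints "failed:boom"
    System.out.println(handlerThrows()); // prints "failed:bad handler"
  }
}
```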

Julien Viet

Oct 7, 2016, 9:04:29 AM
to ve...@googlegroups.com
Hi,

The handler should complete the next Future itself, because compose has no way to obtain a valid U with which to succeed next.

This is written in the Javadoc.

Julien

-- 
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/57112d4c-093f-4dfa-a335-58703bac3f58%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Deven Phillips

Dec 20, 2016, 1:47:24 PM
to vert.x
Glad I found this! This is the exact problem I was having... This is incorrectly documented in the Vert.x manual...

The example:

import io.vertx.groovy.core.Future
import io.vertx.groovy.core.buffer.Buffer

def fs = vertx.fileSystem()
def startFuture = Future.future()

def fut1 = Future.future()
fs.createFile("/foo", fut1.completer())

fut1.compose({ v ->
  // When the file is created (fut1), execute this:
  def fut2 = Future.future()
  fs.writeFile("/foo", Buffer.buffer(), fut2.completer())
  return fut2
}).compose({ v ->
  // When the file is written (fut2), execute this:
  def fut3 = Future.future()
  fs.move("/foo", "/bar", fut3.completer())
}, startFuture)

Should be:

import io.vertx.groovy.core.Future
import io.vertx.groovy.core.buffer.Buffer

def fs = vertx.fileSystem()
def startFuture = Future.future()

def fut1 = Future.future()
fs.createFile("/foo", fut1.completer())

fut1.compose({ v ->
  // When the file is created (fut1), execute this:
  def fut2 = Future.future()
  fs.writeFile("/foo", Buffer.buffer(), fut2.completer())
  return fut2
}).compose({ v ->
  // When the file is written (fut2), execute this:
  fs.move("/foo", "/bar", startFuture.completer())
}, startFuture)

Deven

Thomas SEGISMONT

Dec 21, 2016, 12:14:05 PM
to ve...@googlegroups.com
Would you mind sending a pull request for this? The source is here:

But make sure your code is right first. I believe the problem in the first version is that it is missing "return fut3".
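
One way to read the "return" suggestion is the single-argument compose overload, whose function returns the next future so that the chain's result follows it automatically. The sketch below illustrates that idea without Vert.x. MiniFuture is a hypothetical stand-in, not the real io.vertx.core.Future, and FunctionComposeDemo is an invented name. It assumes the function form creates its own result future and mirrors the outcome of whatever the function returns.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for a Vert.x 3.x-style Future; NOT io.vertx.core.Future.
final class MiniFuture<T> {
  private T result;
  private Throwable cause;
  private boolean done;
  private Consumer<MiniFuture<T>> handler;

  boolean succeeded() { return done && cause == null; }
  T result() { return result; }
  Throwable cause() { return cause; }
  void complete(T value) { finish(value, null); }
  void fail(Throwable t) { finish(null, t); }

  private void finish(T value, Throwable t) {
    if (done) throw new IllegalStateException("already completed");
    result = value;
    cause = t;
    done = true;
    if (handler != null) handler.accept(this);
  }

  void setHandler(Consumer<MiniFuture<T>> h) {
    handler = h;
    if (done) h.accept(this);
  }

  // Function form of compose: the mapper returns the next future, and the
  // future returned here completes or fails together with it.
  <U> MiniFuture<U> compose(Function<T, MiniFuture<U>> mapper) {
    MiniFuture<U> ret = new MiniFuture<>();
    setHandler(ar -> {
      if (!ar.succeeded()) {
        ret.fail(ar.cause());
        return;
      }
      MiniFuture<U> inner;
      try {
        inner = mapper.apply(ar.result());
      } catch (RuntimeException e) {
        ret.fail(e);
        return;
      }
      inner.setHandler(r -> {
        if (r.succeeded()) ret.complete(r.result());
        else ret.fail(r.cause());
      });
    });
    return ret;
  }
}

final class FunctionComposeDemo {
  static String run() {
    // Stand-ins for the create, write and move steps of the manual's example.
    MiniFuture<Void> fut1 = new MiniFuture<>();
    MiniFuture<Void> fut2 = new MiniFuture<>();
    MiniFuture<String> fut3 = new MiniFuture<>();
    StringBuilder log = new StringBuilder();

    MiniFuture<String> start = fut1
        .compose(v -> { log.append("created,"); return fut2; })
        .compose(v -> { log.append("written,"); return fut3; }); // the last step is returned too

    start.setHandler(ar -> log.append(ar.succeeded() ? ar.result() : "failed"));

    // Simulate the three asynchronous operations finishing in order.
    fut1.complete(null);
    fut2.complete(null);
    fut3.complete("moved");
    return log.toString();
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints "created,written,moved"
  }
}
```

In this form no step has to complete a pre-declared future by hand. With the real API, the result of such a chain could presumably be bridged to an existing startFuture with setHandler(startFuture.completer()), but that should be checked against the Javadoc for the Vert.x version in use.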


Deven Phillips

Dec 22, 2016, 1:11:14 PM
to ve...@googlegroups.com
Thomas,

    Kinda swamped with everything else and the holidays. I may be able to get to a PR sometime in the middle of January. I can confirm that the last item in the chain needs to resolve via the initial future defined before the chain starts. I tried it using the syntax you suggested and the problem persisted.

Deven
