Future's Listeners Execution Order Bug / Racing

Goran Macukat

Apr 28, 2022, 2:53:51 AM
to vert.x
Hi All,

We encountered a rare problem that happens once every few months in a Vert.x application.
After a long investigation and various attempts to reproduce it, we found that it is caused by an unexpected execution order of the handlers passed to multiple onSuccess(handler) calls on the same Future object.

Problem Description

Given this simple code:

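import io.vertx.core.Future;
import io.vertx.core.Promise;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ProblemReproducer {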

    private int value;

    public Future<Void> run(Supplier<Future<Object>> futureSupplier, AtomicInteger success, AtomicInteger problem) {

        Promise<Void> completionPromise = Promise.promise();

        futureSupplier.get()
                .onSuccess(x -> {
                    value = 5555;
                })
                .onSuccess(x -> {
                    if (value == 5555) {
                        success.getAndIncrement(); // This should always happen.
                    } else {
                        problem.getAndIncrement(); // This should never happen.
                    }
                })
                .onComplete(x -> {
                    completionPromise.complete();
                });

        return completionPromise.future();
    }
}


One would say there is no way that "problem.getAndIncrement()" code would ever be executed
because, surely, the "value = 5555" inside the first onSuccess handler lambda
must execute before "if (value == 5555)" inside the second onSuccess handler lambda.

But there are some rare circumstances where "if (value == 5555)" is executed first,
i.e. value is still zero at "if (value == 5555)" and "problem.getAndIncrement()" is executed.

In particular, the following explains these circumstances:

The "problem.getAndIncrement()" line executes occasionally,
around once in ten thousand "new ProblemReproducer().run(...)" executions,
if the supplied Future<Object> is the return value of one of the following two methods of the Vertx interface:
default <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler)
default <T> Future<@Nullable T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered)


This looks like a bug.
The order of execution of those two onSuccess handler lambdas should not depend on the source of the Future<Object>.
But looking at the Vert.x core code, we don't see how this could possibly happen.
We see that at some point, execution reaches this place in the FutureImpl class,
where all the handlers (wrapped into listeners) are executed one after another in a loop:
  ...
  private static class ListenerArray<T> extends ArrayList<Listener<T>> implements Listener<T> {
    @Override
    public void onSuccess(T value) {
      for (Listener<T> handler : this) {
        handler.onSuccess(value);
      }
    }
  ...
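To make the expectation above concrete, here is a hypothetical, single-threaded model of a future that keeps its listeners in an ArrayList, in the spirit of the ListenerArray loop quoted above. OrderedListenerModel and MiniFuture are illustrative names, not Vert.x classes. As long as every registration and the completion happen on the same thread, handlers run in registration order, and a handler registered after completion simply runs immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded model of a future's listener list (not Vert.x code).
final class OrderedListenerModel {

    static final class MiniFuture<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();
        private boolean completed;
        private T value;

        // Before completion: append to the list. After completion: run right away.
        void onSuccess(Consumer<T> handler) {
            if (completed) {
                handler.accept(value);
            } else {
                listeners.add(handler);
            }
        }

        // Completion walks the list in insertion order, like the ListenerArray loop.
        void complete(T result) {
            completed = true;
            value = result;
            for (Consumer<T> listener : listeners) {
                listener.accept(result);
            }
            listeners.clear();
        }
    }

    static List<String> run() {
        List<String> order = new ArrayList<>();
        MiniFuture<String> future = new MiniFuture<>();
        future.onSuccess(v -> order.add("first"));
        future.onSuccess(v -> order.add("second"));
        future.complete("result");
        future.onSuccess(v -> order.add("third")); // registered after completion
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [first, second, third]
    }
}
```

The model deliberately has no threads: it shows why the loop alone preserves order, which is exactly the assumption the reproducer below puts to the test.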



Problem Reproducer

To demonstrate that "problem.getAndIncrement()" sometimes executes, we first need some code
that executes "new ProblemReproducer().run(...)" many times,
asynchronously awaiting each execution to complete, and reports the total number of problems encountered.

Something like this:

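import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ProblemReproducerLoop {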

    private String tag;
    private int numRuns;
    private Supplier<Future<Object>> futureSupplier;

    private Vertx vertx;
    private AtomicInteger success = new AtomicInteger();
    private AtomicInteger problem = new AtomicInteger();

    public ProblemReproducerLoop(String tag, int numRuns, Supplier<Future<Object>> futureSupplier) {
        this.tag = tag; this.numRuns = numRuns; this.futureSupplier = futureSupplier;
        vertx = Vertx.currentContext().owner();
    }

    public Future<Void> run() {
        Promise<Void> completionPromise = Promise.promise();
        int runIndex = 1;
        run(runIndex, completionPromise);
        return completionPromise.future();
    }

    private void run(int runIndex, Promise<Void> completionPromise) {
        ProblemReproducer problemReproducer = new ProblemReproducer();
        problemReproducer.run(futureSupplier, success, problem)
                .onComplete(x -> {
                    if (runIndex < numRuns) {
                        vertx.runOnContext(v -> {
                            int nextRunIndex = runIndex + 1;
                            run(nextRunIndex, completionPromise);
                        });
                    } else {
                        System.out.println("Future supplier: \"" + tag + "\":" +
                                "  num runs: " + numRuns +
                                "  success: " + success.get() +
                                "  problems: " + problem.get());
                        completionPromise.complete();
                    }
                });
    }
}


Then we need code that executes "new ProblemReproducerLoop(...).run()",
providing various ways of producing the Future<Object> through the futureSupplier parameter,
to see which ones cause the problem and which ones do not.

Something like this:

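import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;

import java.util.function.Supplier;

public class ProblemReproducerRunner extends AbstractVerticle {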

    @Override
    public void start(Promise<Void> startPromise) throws Exception {
        vertx.runOnContext(v -> runAllCases());
        startPromise.complete();
    }

    private void runAllCases() {
        Future.<Void>succeededFuture()
                .compose(this::run_SucceededFuture_Case)
                .compose(this::run_ExecuteBlocking_Case)
                .compose(this::run_Mapped_ExecuteBlocking_Case)
                .compose(this::run_OrderedFalse_ExecuteBlocking_Case)
                .compose(this::run_Mapped_OrderedFalse_ExecuteBlocking_Case)
                .compose(this::run_After_RunOnContext_Case)
                .compose(this::run_After_SetTimer_Case)
                .onComplete(x -> exit());
    }

    private Future<Void> run_SucceededFuture_Case(Void v) {
        Supplier<Future<Object>> futureSupplier = () -> {
            return Future.succeededFuture("result");
        };
        return new ProblemReproducerLoop("succeededFuture()", 100_000, futureSupplier).run();
    }

    private Future<Void> run_ExecuteBlocking_Case(Void v) {
        Supplier<Future<Object>> futureSupplier = () -> {
            return vertx
                    .executeBlocking(resultPromise -> {
                        resultPromise.complete("result");
                    });
        };
        return new ProblemReproducerLoop("executeBlocking()", 100_000, futureSupplier).run();
    }

    private Future<Void> run_Mapped_ExecuteBlocking_Case(Void v) {
        Supplier<Future<Object>> futureSupplier = () -> {
            return vertx
                    .executeBlocking(resultPromise -> {
                        resultPromise.complete("result");
                    })
                    .map(result -> result);
        };
        return new ProblemReproducerLoop("Mapped executeBlocking()", 100_000, futureSupplier).run();
    }

    private Future<Void> run_OrderedFalse_ExecuteBlocking_Case(Void v) {
        Supplier<Future<Object>> futureSupplier = () -> {
            return vertx
                    .executeBlocking(resultPromise -> {
                        resultPromise.complete("result");
                    }, false);
        };
        return new ProblemReproducerLoop("ordered=false executeBlocking()", 100_000, futureSupplier).run();
    }

    private Future<Void> run_Mapped_OrderedFalse_ExecuteBlocking_Case(Void v) {
        Supplier<Future<Object>> futureSupplier = () -> {
            return vertx
                    .executeBlocking(resultPromise -> {
                        resultPromise.complete("result");
                    }, false)
                    .map(result -> result);
        };
        return new ProblemReproducerLoop("Mapped ordered=false executeBlocking()", 100_000, futureSupplier).run();
    }

    private Future<Void> run_After_RunOnContext_Case(Void v) {
        Supplier<Future<Object>> futureSupplier = () -> {
            Promise<Object> resultPromise = Promise.promise();
            vertx.runOnContext(x -> resultPromise.complete("result"));
            return resultPromise.future();
        };
        return new ProblemReproducerLoop("After runOnContext()", 100_000, futureSupplier).run();
    }

    private Future<Void> run_After_SetTimer_Case(Void v) {
        Supplier<Future<Object>> futureSupplier = () -> {
            Promise<Object> resultPromise = Promise.promise();
            vertx.setTimer(1L, t -> resultPromise.complete("result"));
            return resultPromise.future();
        };
        String tag = "After setTimer(1L)";
        System.out.println("Running for future supplier: \"" + tag + "\" ... (done in 15-20 seconds) ...");
        return new ProblemReproducerLoop(tag, 1000, futureSupplier).run();
    }

    private void exit() {
        vertx.close()
                .onFailure(x -> System.exit(1))
                .onSuccess(x -> System.exit(0));
    }
}


When the above ProblemReproducerRunner verticle is deployed, it prints around 10-20 problems per hundred thousand executions
for its run_ExecuteBlocking_Case() and run_OrderedFalse_ExecuteBlocking_Case() methods,
and zero problems for all the other case methods.

Note that the run_Mapped_ExecuteBlocking_Case() and run_Mapped_OrderedFalse_ExecuteBlocking_Case() methods
demonstrate a simple workaround: just applying .map(result -> result) to the return value of executeBlocking().

All the code is available as a Gradle project on GitHub:
https://github.com/macgoran/vertx-racing-reproducer
Clone it and run this in the project root: ./gradlew reproducer

The output would be something like:

$ ./gradlew reproducer

> Task :reproducer
Future supplier: "succeededFuture()":  num runs: 100000  success: 100000  problems: 0
Future supplier: "executeBlocking()":  num runs: 100000  success: 99978  problems: 22
Future supplier: "Mapped executeBlocking()":  num runs: 100000  success: 100000  problems: 0
Future supplier: "ordered=false executeBlocking()":  num runs: 100000  success: 99995  problems: 5
Future supplier: "Mapped ordered=false executeBlocking()":  num runs: 100000  success: 100000  problems: 0
Future supplier: "After runOnContext()":  num runs: 100000  success: 100000  problems: 0
Running for future supplier: "After setTimer(1L)" ... (done in 15-20 seconds) ...
Future supplier: "After setTimer(1L)":  num runs: 1000  success: 1000  problems: 0

BUILD SUCCESSFUL in 21s
2 actionable tasks: 1 executed, 1 up-to-date

Thomas SEGISMONT

Aug 3, 2022, 9:48:00 AM
to ve...@googlegroups.com
Hi,

Sorry for the late reply. I filed this issue: https://github.com/eclipse-vertx/vert.x/issues/4446

The problem is that onSuccess, onFailure and onComplete are terminal operations, and we cannot guarantee that callbacks are executed in order of registration.

There is a PR for Vert.x 4.3.3 that documents this behavior and introduces a new method, andThen.
This method provides the ordering guarantee at the expense of creating more future objects.
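A minimal sketch of why chaining restores the order, assuming a success-only model (AndThenModel and ChainFuture are hypothetical names, and the real andThen also covers failures): each andThen call registers one listener on its source and returns a new future that is completed only after the handler has run. A handler chained on that new future therefore cannot run first, whether the source completes before or after the chain is built.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, success-only model of andThen-style chaining (not the Vert.x implementation).
final class AndThenModel {

    static final class ChainFuture<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();
        private boolean completed;
        private T value;

        void onSuccess(Consumer<T> handler) {
            if (completed) {
                handler.accept(value);
            } else {
                listeners.add(handler);
            }
        }

        void complete(T result) {
            completed = true;
            value = result;
            List<Consumer<T>> toNotify = new ArrayList<>(listeners);
            listeners.clear();
            toNotify.forEach(h -> h.accept(result));
        }

        // Runs the handler, then completes a NEW future with the same value.
        // Anything chained on the returned future can only run after the handler.
        ChainFuture<T> andThen(Consumer<T> handler) {
            ChainFuture<T> next = new ChainFuture<>();
            onSuccess(v -> {
                handler.accept(v);
                next.complete(v);
            });
            return next;
        }
    }

    static List<String> run(boolean completeBeforeChaining) {
        List<String> order = new ArrayList<>();
        ChainFuture<String> source = new ChainFuture<>();
        if (completeBeforeChaining) {
            source.complete("result");
        }
        source.andThen(v -> order.add("first"))
              .andThen(v -> order.add("second"));
        if (!completeBeforeChaining) {
            source.complete("result");
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [first, second]
        System.out.println(run(true));  // [first, second]
    }
}
```

Each andThen call allocates one extra future (next), which is the cost mentioned above.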

Thanks for putting together this reproducer.

Regards,
Thomas


--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/13950e07-efdc-4139-abb8-fb1f5185a23fn%40googlegroups.com.

Goran Macukat

Aug 16, 2022, 7:26:00 AM
to vert.x
Hi Thomas,

I appreciate the effort to fix this.

But where in the Vert.x core code does this "lack of execution order guarantee" originate?

While I wait for your answer, I can explain where the opposite, the "execution order guarantee", originates:

All that onSuccess, onFailure and onComplete do is add their lambda parameter to the ListenerArray/ArrayList field in FutureImpl,
so that later, when the associated promise completes, ListenerArray just loops through this listener list and executes those lambdas one after another.
I assume I don't need to explain what kind of data structures List<T> and ArrayList<T> are and what they guarantee about the order of their elements when added and when walked through later.

Regards,
Goran

Thomas SEGISMONT

Aug 17, 2022, 3:59:57 AM
to ve...@googlegroups.com
Let's say you have a future that is not completed yet.
You invoke onSuccess which adds a listener to it.
Then you invoke onSuccess again, but in the meantime the future is completed on some other thread.

In this case, the first listener is invoked and the listener reference in the future is cleared.

And while the second listener is being "added", the future is seen as completed, so that listener is invoked immediately.

It is not impossible that the thread which invokes the first listener is descheduled while the other thread continues processing.
This is why the handler added second can be invoked before the handler added first.

On Tue, Aug 16, 2022 at 13:26, Goran Macukat <macg...@gmail.com> wrote:
> I assume I don't need to explain what kind of data structures List<T> and ArrayList<T> are and what they guarantee about the order of their elements when added and when walked through later.

This last statement wasn't necessary.

Julien Viet

Aug 18, 2022, 2:53:22 AM
to vert.x
Hi,

I think the onXXX methods should mention that order is not guaranteed
in their JavaDoc.

Julien

Thomas SEGISMONT

Aug 18, 2022, 4:03:09 AM
to ve...@googlegroups.com

Julien Viet

Aug 18, 2022, 5:35:25 AM
to vert.x

Carsten Lohmann

Aug 20, 2022, 4:30:06 AM
to vert.x
Hi,

we have quite a lot of code that uses onSuccess/onFailure and onComplete one after the other (with the order being relevant) on the same Future. 
So, quite a lot of changes would be needed here on our side to adapt all these places.

> Let's say you have a future that is not completed yet.
> You invoke onSuccess which adds a listener to it.
> Then you invoke onSuccess again, but in the meantime the future is completed on some other thread.

Isn't this scenario, of the future being completed on some other thread, already kind of special?

I mean, for code that is supposed to be run on the event loop thread, I would usually want to make sure that the Futures being composed in it are also completed on the same event loop thread.
That means that when using "executeBlocking", I wouldn't directly work with Futures completed in the blockingCodeHandler but instead would work with the Future result of "vertx.executeBlocking", which is completed on the original event loop thread.

Wouldn't it be safe to expect onSuccess/onFailure/onComplete to be executed in the order of registration if the invocation of these methods happens on the same event loop thread on which the corresponding Future is completed?

Regards,
Carsten

Goran Macukat

Aug 20, 2022, 8:44:47 AM
to vert.x
Hi Thomas and Julien,

When I encountered the problem, and after I figured out it originates from "... Future<T> executeBlocking( ...",
and knowing that executeBlocking uses the worker thread pool to execute the submitted lambda,
I knew immediately that multi-threaded concurrency was involved somewhere in causing the problem.
But I couldn't find anything wrong in the executeBlocking code.
So I filed the bug report and created the reproducer, because I regard this as a bug.

Now Thomas has correctly pointed me to the code in FutureImpl that causes the problem WHEN "the future is completed on some other thread".

But concerns about something executing from another thread belong to the domain of multi-threaded concurrency.
And Vert.x provides concurrency with an event loop and a non-blocking asynchronous API, not by executing code in multiple threads.
Sharing and DIRECTLY using objects between threads is a major "NO! NO!" for any event-loop-based framework.
They all advise doing it INDIRECTLY, using some equivalent of a "please run this on the event loop when you can" method, such as runOnContext().

I'm a bit surprised the Vert.x core team didn't see this problem as a bug but went in the "the order is not guaranteed" direction.

So now that Thomas has explained exactly where the problem is, I figured out exactly what is wrong with executeBlocking.
It is in its line: return fut;
It should have been: return fut.map(result -> result);
Why?
Because fut's promise is the object that is passed into blockingCodeHandler,
and blockingCodeHandler uses this object to either complete() or fail() the promise,
but this completion happens on a separate worker pool thread, away from the event loop thread.
By returning this object to the caller via "return fut;",
we leak an object that is shared between threads into the caller's code,
causing effects that code written for "event loop concurrency" does not expect.
The simple return fut.map(result -> result);
solves the problem because map() creates a new promise and returns its future to the caller.

I'd like you not to go in the direction of further confusing users with "the order is not guaranteed" because "some other thread might be involved".
Currently, the JavaDoc has been amended to say "the order is not guaranteed", but without an explanation.
If you were to add an explanation, and that explanation were "because the future could be completed on some other thread", it would be even more confusing.
And all this looks like stepping outside of the Vert.x domain to me.
You even added a new Future method, andThen(), to provide the order guarantee in the same commit where the JavaDoc was changed to say "the order is not guaranteed".
But andThen() looks just like a fancy map().
And that's why .map(result -> result) also works.

I'm not happy with where Vert.x is going regarding this, but you are the project leads and ultimately it is up to you.

I will continue to "assume" the order of execution follows the order of registration.
I have already applied the fix with map(): I created a separate executeBlocking() utility method that applies map(result -> result),
and I use this utility method everywhere I need executeBlocking().

    public static <Result> Future<Result> executeBlocking(Vertx vertx, Handler<Promise<Result>> blockingCodeHandler) {
        // map() returns a new future, completed by a listener of the worker-completed one
        return vertx.<Result>executeBlocking(blockingCodeHandler).map(result -> result);
    }

Julien Viet

Aug 21, 2022, 5:02:26 PM
to vert.x
Hi,

I think we have a race in the scenario you mention, when a future is
completed outside of the event loop (like a promise completed in
executeBlocking), because the implementation does:

1/ change the state of the future
2/ run a task on the event loop to notify the listeners

This can race against an event-loop thread that adds a listener:

1/ the event loop calls executeBlocking and adds several listeners
2/ the executeBlocking thread runs the task and completes the future, which
updates the state and schedules an event-loop task
3/ the event loop continues to add listeners, but sees the completed
state of the future and immediately calls the listeners
4/ the notify task is executed and runs the initial listeners

We would need to modify the implementation of the current future to
record the fact that a notify task is in progress and that listeners
should keep being added to the listener list for this to work properly.
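The four steps above can be replayed deterministically with a hypothetical model (RacyFutureModel, RacyFuture and the onEventLoop flag are illustrative, not Vert.x code; no real threads are used, the "worker thread" is simulated by flipping the flag). Completion detaches the listener list immediately, and when it happens off the event loop, notification is queued as an event-loop task. The second scenario applies the map(result -> result) workaround discussed in the thread: the listeners sit on a derived future that is only completed when the notify task runs on the event loop, so they keep their registration order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, deterministic model of the race (not the real FutureImpl code).
final class RacyFutureModel {

    // Tasks scheduled for the simulated event loop.
    static final Deque<Runnable> eventLoopTasks = new ArrayDeque<>();
    // Which simulated thread is currently executing.
    static boolean onEventLoop = true;

    static final class RacyFuture<T> {
        private List<Consumer<T>> listeners = new ArrayList<>();
        private boolean completed;
        private T value;

        void onSuccess(Consumer<T> handler) {
            if (completed) {
                emit(List.of(handler), value); // already completed: notify right away
            } else {
                listeners.add(handler);
            }
        }

        void complete(T result) {
            completed = true;              // state changes immediately...
            value = result;
            List<Consumer<T>> toNotify = listeners;
            listeners = new ArrayList<>(); // ...and the listener list is detached immediately
            emit(toNotify, result);
        }

        // The derived future is completed by a listener of this one.
        <U> RacyFuture<U> map(Function<T, U> fn) {
            RacyFuture<U> mapped = new RacyFuture<>();
            onSuccess(v -> mapped.complete(fn.apply(v)));
            return mapped;
        }

        private void emit(List<Consumer<T>> targets, T v) {
            if (onEventLoop) {
                targets.forEach(h -> h.accept(v));                           // same thread: run inline
            } else {
                eventLoopTasks.add(() -> targets.forEach(h -> h.accept(v))); // other thread: schedule
            }
        }
    }

    private static void drainEventLoop() {
        onEventLoop = true;
        while (!eventLoopTasks.isEmpty()) {
            eventLoopTasks.poll().run();
        }
    }

    // withMap = true applies the map(result -> result) workaround.
    static List<String> run(boolean withMap) {
        List<String> order = new ArrayList<>();
        RacyFuture<String> blocking = new RacyFuture<>(); // stands in for the executeBlocking future
        RacyFuture<String> target = blocking;
        if (withMap) {
            target = blocking.map(r -> r);
        }

        onEventLoop = true;
        target.onSuccess(v -> order.add("first"));  // 1/ event loop adds a listener

        onEventLoop = false;
        blocking.complete("result");                // 2/ worker completes, notify task is queued

        onEventLoop = true;
        target.onSuccess(v -> order.add("second")); // 3/ event loop adds another listener

        drainEventLoop();                           // 4/ the notify task runs
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [second, first]
        System.out.println(run(true));  // [first, second]
    }
}
```

In the first scenario the second listener sees the completed state in step 3 and runs inline, before the queued task delivers the first one.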

Julien

Julien Viet

Aug 21, 2022, 5:11:45 PM
to vert.x
I think that what solves the issue in your case with map is the fact that:

1/ the executeBlocking promise is racy
2/ using map creates a new future that will be completed on the
event loop, because it is a listener of the initial racy future (whose
onListener / complete are not executed on the same thread). Here, by racy I mean
relying on the onXXX registration order
3/ this new future is not racy with respect to the other future, because it is
completed on the event loop and the onXXX methods are executed on
the same thread

As I said in another email, I think we could improve the implementation
of the current future to record the intermediate "being completed"
state, so instead of having complete be:

1/ update the state to completed and swap the listeners array
2/ run a task to notify the listeners

it could be:

1/ update the state to completed
2/ run the task
3/ the task swaps the listeners array

In this case the onXXX order would work in the case you mention, I think.
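The proposed ordering can be sketched with the same kind of deterministic simulation. DeferredSwapModel, DeferredSwapFuture and the notifyPending flag are hypothetical names; this illustrates the idea described above, not the code from the linked PR, and it omits the synchronization a real multi-threaded implementation would need. While the notify task is pending, new listeners are still appended to the list, and the task swaps the list only when it actually runs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, deterministic model of the proposed change (not the code from the PR).
final class DeferredSwapModel {

    static final Deque<Runnable> eventLoopTasks = new ArrayDeque<>();
    static boolean onEventLoop = true;

    static final class DeferredSwapFuture<T> {
        private List<Consumer<T>> listeners = new ArrayList<>();
        private boolean completed;
        private boolean notifyPending; // completed, but the notify task has not swapped the list yet
        private T value;

        void onSuccess(Consumer<T> handler) {
            if (!completed || notifyPending) {
                listeners.add(handler);                      // keep appending until the task swaps
            } else if (onEventLoop) {
                handler.accept(value);                       // fully notified: run inline
            } else {
                T v = value;
                eventLoopTasks.add(() -> handler.accept(v)); // fully notified, other thread: schedule
            }
        }

        void complete(T result) {
            completed = true;                                // 1/ update the state to completed
            value = result;
            notifyPending = true;
            Runnable notifyTask = () -> {
                List<Consumer<T>> toNotify = listeners;      // 3/ the task swaps the listeners list
                listeners = new ArrayList<>();
                notifyPending = false;
                toNotify.forEach(h -> h.accept(result));
            };
            if (onEventLoop) {
                notifyTask.run();                            // 2/ run the task inline on the event loop
            } else {
                eventLoopTasks.add(notifyTask);              // 2/ or schedule it from another thread
            }
        }
    }

    // Same interleaving as the racy scenario: register, complete off-loop, register, drain.
    static List<String> run() {
        List<String> order = new ArrayList<>();
        DeferredSwapFuture<String> future = new DeferredSwapFuture<>();

        onEventLoop = true;
        future.onSuccess(v -> order.add("first"));

        onEventLoop = false;
        future.complete("result");

        onEventLoop = true;
        future.onSuccess(v -> order.add("second")); // appended: the notify task is still pending

        while (!eventLoopTasks.isEmpty()) {
            eventLoopTasks.poll().run();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [first, second]
    }
}
```

The only change from a detach-on-complete design is where the swap happens: deferring it to the notify task means a listener added in the window between completion and notification joins the same ordered list instead of running early.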

Julien

Julien Viet

Aug 21, 2022, 5:42:56 PM
to vert.x
Here is a PR that adds a test that fails with the current future implementation.

It seems that this test describes the behaviour you are running into:
https://github.com/eclipse-vertx/vert.x/pull/4462/files#diff-0cde568dc660ca6a7eb4a3fcbbfdbd7d7f9a17b5da949f51f830521be0f7f664R1018

The PR then improves the future so that it works when used from the event loop:

https://github.com/eclipse-vertx/vert.x/pull/4462