Best way to process a List one by one

133 views
Skip to first unread message

justinl...@gmail.com

unread,
Aug 17, 2017, 6:39:44 AM8/17/17
to vert.x
Hi,

i have search a way to process a list of element one by one (each process use async ), without blocking the main thread.

example :

List<String> evt = new ArrayList<String>();

evt.add("evt.step1");
evt.add("evt.step2");
evt.add("evt.step3");

//
vertx.eventBus().send(evt.get(0), "test", ar -> {
    vertx.eventBus().send(evt.get(1), "test", ar -> {
        vertx.eventBus().send(evt.get(2), "test", ar -> {
})
})
});


I have found vertx-async (https://github.com/simondean/vertx-async), but i can't use a list.
So i have done a little implementation to do that but i have the impression to reinvent the wheel :)

Maybe a simple solution exist with Future ?

whith my class we can do that :


List<String> evt = new ArrayList<String>();

evt
.add("evt.step1");
evt
.add("evt.step2");
evt
.add("evt.step3");

Async.create(evt)
.series((event, next) -> {
    vertx
.eventBus().send(event, "test", ar -> {
 
       next.handle(null);
    });

 
},
 
(error) -> {
 
    System.out.println("ending process list");
 
}
);


it uses the same syntax that the populat Async lib in nodejs.

justinl...@gmail.com

unread,
Aug 17, 2017, 6:41:10 AM8/17/17
to vert.x

the Async class
Async.java

Julien Viet

unread,
Aug 18, 2017, 8:16:01 AM8/18/17
to ve...@googlegroups.com
Hi,

your use case seems to be a tad specific to be provided out of the box by a library.

Julien

> On Aug 17, 2017, at 12:41 PM, justinl...@gmail.com wrote:
>
> <Async.java>

Jez P

unread,
Aug 18, 2017, 9:12:56 AM8/18/17
to vert.x
Personally I think I'd go a different way on this and build up a list of Supplier<CompletableFuture>, then use the streaming API to call reduce, taking advantage of the CompletableFuture's ability to call thenAccept/thenApply/thenCompose to act on the result of the future. I've had to do similar in the pac4j-async library I'm working on when we want to ensure that certain async operations are processed in serial rather than parallel.

justinl...@gmail.com

unread,
Aug 18, 2017, 10:34:35 AM8/18/17
to vert.x
Hi Julien,



your use case seems to be a tad specific to be provided out of the box by a library.



i don't think so, for me this method provide a generic way to process a list in serial, beacuse of you can provide any method to process any type of element.

justinl...@gmail.com

unread,
Aug 18, 2017, 10:48:50 AM8/18/17
to vert.x

Personally I think I'd go a different way on this and build up a list of Supplier<CompletableFuture>, then use the streaming API to call reduce, taking advantage of the CompletableFuture's ability to call thenAccept/thenApply/thenCompose to act on the result of the future. I've had to do similar in the pac4j-async library I'm working on when we want to ensure that certain async operations are processed in serial rather than parallel.


Thanks, could you show me how to convert my little example with Supplier<CompletableFuture> please ? i don't see how to do it :(

List<String> evt = new ArrayList<String>();

evt
.add("evt.step1");
evt
.add("evt.step2");
evt
.add("evt.step3");

//
vertx
.eventBus().send(evt.get(0), "test", ar -> {
 vertx
.eventBus().send(evt.get(1), "test", ar -> {
 vertx
.eventBus().send(evt.get(2), "test", ar -> {
 
})
 
})
});


 
On Friday, August 18, 2017 at 1:16:01 PM UTC+1, Julien Viet wrote:

Jez P

unread,
Aug 19, 2017, 10:17:21 AM8/19/17
to vert.x
See allInSequence in https://github.com/millross/pac4j-async/blob/master/pac4j-async-core/src/main/java/org/pac4j/async/core/future/FutureUtils.java

Note that in your case you get a Supplier<CompletableFuture<Void>>
by creating a method such as (not tested in IDE, but you should get the idea)
Supplier<CompletableFuture<Void>> sendAndReply(final String address) {
    return () -> {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        vertx.eventBus().send(address, "test, ar -> {
            if (ar.successful()) {
                future.complete(null);
            } else {
                future.completeExceptionally(ar.cause);
            }
        });
        return future;
    }
}

Note that it's pretty easy to turn that method into a general pattern to complete a completablefuture from an async result, and easily wrap sends where you do some additional processing prior to completing the future, and then you can use the reduce approach as shown in my links. 
Message has been deleted

justinl...@gmail.com

unread,
Aug 19, 2017, 5:25:08 PM8/19/17
to vert.x

See allInSequence in https://github.com/millross/pac4j-async/blob/master/pac4j-async-core/src/main/java/org/pac4j/async/core/future/FutureUtils.java

Note that in your case you get a Supplier<CompletableFuture<Void>>
by creating a method such as (not tested in IDE, but you should get the idea)
Supplier<CompletableFuture<Void>> sendAndReply(final String address) {
    return () -> {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        vertx.eventBus().send(address, "test, ar -> {
            if (ar.successful()) {
                future.complete(null);
            } else {
                future.completeExceptionally(ar.cause);
            }
        });
        return future;
    }
}

Note that it's pretty easy to turn that method into a general pattern to complete a completablefuture from an async result, and easily wrap sends where you do some additional processing prior to completing the future, and then you can use the reduce approach as shown in my links. 



Thank you very much, it's exactly what i needed to understand !

Jez P

unread,
Aug 19, 2017, 5:52:56 PM8/19/17
to vert.x
Don't know if vertx futures follow the same composition syntax - my work is using CompletableFutures because the API I work on isn't vertx-only, so I want to express the API as Java CompletableFutures. Effectively though you can apply the same pattern to any kind of asynchronous computation, not just to eventbus send/reply. 
Reply all
Reply to author
Forward
0 new messages