Composite future not completing before the component futures can complete


Hemanth Shekar

May 28, 2019, 10:37:55 PM
to ve...@googlegroups.com
Hi team,

We have an urgent requirement where we get multiple files in a folder.
We need to load the files into a database.

The design is that the Vert.x event loop makes an executeBlocking call for each file to pick it up and load it into the database.
I am using executeBlocking to run multiple blocking workers and composite futures to check whether all of them succeed.

The log shows that all the files are picked up, but before all the loads can complete, vertx.close() (the last line of the program) executes and the program fails.

What am I doing wrong? I was expecting CompositeFuture.join to wait until all the executeBlocking calls complete.

This is an urgent requirement. Any help would be greatly appreciated.

Below are the logs, and below that is the code.
Logs -
000000_2
Doing the job...
000000_1
Doing the job...
file_0
Doing the job...
000000_0
Doing the job...
000000_3
Doing the job...
000000_4
19/05/28 19:15:17 INFO csrtrmld.Loadmodule: inside try of loadOneFile :
Doing the job...
19/05/28 19:15:17 INFO csrtrmld.Loadmodule: Future size : 6
19/05/28 19:15:17 INFO csrtrmld.MainVerticle: END OF THE FUNCTION CALLING VERTX CLOSE
19/05/28 19:15:17 INFO csrtrmld.MainVerticle: Failed

Code-
I am trying to use composite future to 
I am attaching the code as I cannot give the repository link here.

while (t.hasNext()) {
    Path p = t.next().getPath();
    System.out.println(p.getName());

    FutureexecuteBackendWorker.add(executeBackendWorker(bucket, vertx, Seperator, LogDir, Interval, fs, p));
}
LOGGER.info("Future size : " + FutureexecuteBackendWorker.size());
CompositeFuture Compositefuture = CompositeFuture.join(FutureexecuteBackendWorker);

return Compositefuture;

private Future<String> executeBackendWorker(Bucket bucket, Vertx vertx, String Seperator, String LogDir, int Interval,
        FileSystem fs, Path p) {
    System.out.println("Doing the job...");
    Future<String> future = Future.future();
    vertx.executeBlocking(call -> {
        try {
            loadOneFile(bucket, Seperator, LogDir, Interval, fs, p);
            // A Vert.x future can be completed or failed only once,
            // so complete it on the success path only.
            future.complete("success");
        } catch (Exception ex) {
            MainVerticle.exitStatus = 1;
            future.fail("failed");
        }
        // Mark the blocking code as finished so the result handler below runs.
        call.complete();
    }, res -> {
        LOGGER.info("Loading a file completed" + p);
    });

    return future;
}


private void loadOneFile(Bucket bucket, String Seperator, String LogDir, int Interval, FileSystem fs, Path p)
        throws IOException {

    FSDataInputStream inputStream = fs.open(p);
    Scanner sc = new Scanner(new InputStreamReader(inputStream));
    try {
        LOGGER.info("inside try of loadOneFile :");
        String s;
        while (sc.hasNextLine()) {
            int i = 0;
            List<JsonDocument> documents = new ArrayList<JsonDocument>();

            while (i < Interval && sc.hasNextLine()) {
                String SingleDocLine = "";
                String Key = "";
                try {
                    SingleDocLine = sc.nextLine();
                    String[] split = SingleDocLine.split(Seperator);
                    String value = split[1];
                    Key = split[0];
                    JsonTranscoder trans = new JsonTranscoder();
                    JsonObject jobj = trans.stringToJsonObject(value);
                    documents.add(JsonDocument.create(Key, jobj));
                } catch (Exception E) {
                    LOGGER.error("ERROR PROCESSING KEY : " + Key);
                    appendStrToFile(LogDir + "/" + "ERROR" + modifiedDate, SingleDocLine + "\n");
                    MainVerticle.exitStatus = 1;
                    continue;
                }
                i = i + 1;
            }
            Observable
                .from(documents)
                .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
                    @Override
                    public Observable<JsonDocument> call(final JsonDocument document) {
                        return bucket.async().upsert(document)
                            .retryWhen(RetryBuilder
                                .anyOf(BackpressureException.class)
                                .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                                .max(10)
                                .build());
                    }
                })
                .last()
                .toBlocking()
                .single();

            countInt = countInt + i;
            LOGGER.info("PROCESSED RECORD COUNT : " + countInt);
        }
        if (sc.ioException() != null) {
            LOGGER.error("ONE OF THE THREAD FAILED WHILE LOADING");
            MainVerticle.exitStatus = 1;
            throw sc.ioException();
        }
    } finally {
        if (inputStream != null) {
            inputStream.close();
        }
        if (sc != null) {
            sc.close();
        }
    }
}


LoadModule.txt
mainVerticle.txt

Hemanth Shekar

May 29, 2019, 12:16:41 AM
to ve...@googlegroups.com
Hi team,

I tried changing from the composite future to a plain Future implementation in a loop.
It still fails:

Code changes:
List<Future<String>> FutureexecuteBackendWorker = new ArrayList<>();
while (t.hasNext()) {
    Path p = t.next().getPath();
    System.out.println(p.getName());

    FutureexecuteBackendWorker.add(executeBackendWorker(bucket, vertx, Seperator, LogDir, Interval, fs, p));
}
LOGGER.info("Future size : " + FutureexecuteBackendWorker.size());

for (Future<String> futureObj : FutureexecuteBackendWorker) {
    futureObj.setHandler(futureHandler -> {
        if (futureHandler.succeeded()) {
            LOGGER.info("completed loading " + futureHandler.result());
        } else {
            LOGGER.info("Failed loading " + futureHandler.cause());
        }
    });
}

Log output : 

000000_2
Doing the job...
000000_1
Doing the job...
file_0
Doing the job...
000000_0
Doing the job...
000000_3
Doing the job...
000000_4
Doing the job...
19/05/28 21:07:49 INFO csrtrmld.Loadmodule: Future size : 6
19/05/28 21:07:49 INFO csrtrmld.MainVerticle: END OF THE FUNCTION CALLING VERTX CLOSE
19/05/28 21:07:49 INFO csrtrmld.Loadmodule: inside try of loadOneFile :
19/05/28 21:07:49 INFO csrtrmld.Loadmodule: Failed loading io.vertx.core.impl.NoStackTraceThrowable: failed :maprfs:///user/data/testDir/000000_2
19/05/28 21:07:49 INFO csrtrmld.Loadmodule: Failed loading io.vertx.core.impl.NoStackTraceThrowable: failed :maprfs:///user/data/testDir/000000_1
19/05/28 21:07:49 INFO csrtrmld.Loadmodule: Failed loading io.vertx.core.impl.NoStackTraceThrowable: failed :maprfs:///user/data/testDir/file_0
19/05/28 21:07:49 INFO csrtrmld.Loadmodule: Failed loading io.vertx.core.impl.NoStackTraceThrowable: failed :maprfs:///user/data/testDir/000000_0
19/05/28 21:07:49 INFO csrtrmld.Loadmodule: Failed loading io.vertx.core.impl.NoStackTraceThrowable: failed :maprfs:///user/data/testDir/000000_3
19/05/28 21:07:49 INFO csrtrmld.Loadmodule: Failed loading io.vertx.core.impl.NoStackTraceThrowable: failed :maprfs:///user/data/testDir/000000_4

Jez P

May 29, 2019, 1:59:41 AM
to vert.x
As far as I can see your code isn’t waiting for the composite future to complete. Maybe I’m misreading, but it looks to me as though you set the composite future handler, and then call callexitstatus. So you start things off, set the handler, and then kill your vertx instance. This is exactly consistent with the logging you’re seeing.
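
To illustrate why the log comes out in that order, here is a minimal sketch that uses only the JDK. It assumes `CompletableFuture.allOf` as a stand-in for `CompositeFuture.join` so that it runs without Vert.x, and the class name and event strings are hypothetical. Registering a completion callback returns immediately, so the caller's next line runs while the loads are still in flight.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

public class HandlerDoesNotWait {

    /** Returns the order in which the three steps actually ran. */
    public static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        // Hypothetical stand-in for a slow file load: it finishes only when released.
        CountDownLatch slowLoad = new CountDownLatch(1);

        CompletableFuture<Void> load = CompletableFuture.runAsync(() -> {
            try {
                slowLoad.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Registering the callback does not block the calling thread.
        CompletableFuture<Void> handled = CompletableFuture.allOf(load)
                .thenRun(() -> events.add("loads finished"));
        events.add("handler set");

        // The caller falls straight through to its last line (vertx.close() in the thread).
        events.add("close called");

        slowLoad.countDown(); // only now can the load finish
        handled.join();       // wait here purely so the demo can report the order
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch "close called" is always recorded before "loads finished", which mirrors the "END OF THE FUNCTION CALLING VERTX CLOSE" line appearing before any load completes.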

Hemanth Shekar

May 29, 2019, 4:31:09 AM
to ve...@googlegroups.com
Hi Jez,

How do I wait for the composite future to complete?
Any pointers on this would be helpful.




--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/bf5c6819-8036-444a-bc69-218316211686%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Jez P

May 29, 2019, 6:36:57 AM
to vert.x
Well, bluntly, I am not sure I'd run up a vert.x instance for this at all. What's wrong with using an executor with a pool of threads? I don't see that you're getting any benefit from using vert.x, given that everything you're using it for is blocking i/o, and you are running it up specially for the job and destroying it once the job is done. What's the rationale for involving vert.x here at all?



Jez P

May 29, 2019, 6:38:03 AM
to vert.x
(i.e. as you've currently described the task, I would go about it very differently from the start. It feels to me like you're forcing vert.x in where you don't really want non-blocking i/o.)

Jez P

May 29, 2019, 6:43:57 AM
to vert.x
If you really must do it this way, then I would, prior to deploying your main verticle, create a countdown latch and pass it into the main verticle, and on your main (non-vert.x) thread, call await on the latch.

The composite future completion handler should then call the countdown method on the latch. This will release your main thread to continue when the future is complete. Then you close your vert.x instance.
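
The latch approach described here can be sketched roughly as follows. This is a JDK-only illustration under stated assumptions, not the poster's code: `CompletableFuture.allOf` stands in for the composite future, a fixed thread pool stands in for the Vert.x worker pool, and the names `LatchWait` and `loadAndWait` are hypothetical. In a real Vert.x program the `await` call belongs on the main (non-vert.x) thread only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchWait {

    /** Starts fileCount fake loads and returns how many had finished when the latch released. */
    public static int loadAndWait(int fileCount) {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        CountDownLatch done = new CountDownLatch(1); // created before any work starts
        AtomicInteger loaded = new AtomicInteger();

        List<CompletableFuture<Void>> loads = new ArrayList<>();
        for (int i = 0; i < fileCount; i++) {
            // Each fake load just bumps a counter on a worker thread.
            loads.add(CompletableFuture.runAsync(loaded::incrementAndGet, workers));
        }

        // Completion handler: count down on success and on failure alike,
        // otherwise a failed load would leave the main thread waiting forever.
        CompletableFuture.allOf(loads.toArray(new CompletableFuture[0]))
                .whenComplete((ok, err) -> done.countDown());

        try {
            done.await(); // the main thread parks here until the handler fires
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdown(); // the point where vertx.close() would go
        }
        return loaded.get();
    }

    public static void main(String[] args) {
        System.out.println("loaded " + loadAndWait(6) + " files before shutdown");
    }
}
```

The shutdown call sits after `await`, so it cannot run until every load has reported in.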

However, this all feels like overkill for what you seem to be trying to do, IMO. This feels like you're using vert.x to use vert.x. Java has perfectly good executor APIs for the kind of "load lots of files and process them in parallel, doing blocking work" scenario you appear to be trying to operate. Instead you're jumping through hoops to fit it into the vert.x non-blocking approach. I think vert.x is awesome, but I think this approach is leading you to over-complex code for zero benefit, which means you should consider doing it another way. Don't use vert.x just for the sake of using vert.x.
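
For comparison, a plain executor version might look something like the sketch below. It is only an outline under assumptions: the `loadOneFile` here is a hypothetical stand-in that takes a file name and fails for names starting with "bad", and the class name, pool size, and failure counting are illustrative choices rather than anything from the original code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorLoader {

    /** Hypothetical stand-in for the real load; fails for names starting with "bad". */
    static String loadOneFile(String name) throws Exception {
        if (name.startsWith("bad")) {
            throw new IllegalStateException("cannot load " + name);
        }
        return name;
    }

    /** Loads every file on a fixed pool and returns the number of failed loads. */
    public static int loadAll(List<String> names, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        int failures = 0;
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String name : names) {
                tasks.add(() -> loadOneFile(name));
            }
            // invokeAll blocks until every task has finished, successfully or not.
            for (Future<String> result : pool.invokeAll(tasks)) {
                try {
                    result.get();
                } catch (ExecutionException e) {
                    failures++; // the task threw; e.getCause() holds the original exception
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for loads", e);
        } finally {
            pool.shutdown();
        }
        return failures;
    }

    public static void main(String[] args) {
        int failures = loadAll(Arrays.asList("000000_0", "000000_1", "bad_2"), 2);
        System.exit(failures == 0 ? 0 : 1); // exit status reflects whether any load failed
    }
}
```

Because `invokeAll` returns only after all tasks are done, no handler, latch, or explicit wait is needed before shutting the pool down.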

Jez P

May 29, 2019, 6:45:18 AM
to vert.x
I consider keeping things simple particularly important where you have an "urgent requirement" by the way. If the requirement is urgent, then do it the easy way, not the hard way. 

Hemanth Shekar

May 29, 2019, 10:08:11 AM
to ve...@googlegroups.com
Thanks Jez.
This is very helpful.
It needs to go to the next environment by this week after testing.
Thanks for your inputs.
