Hi Team,
We have an urgent requirement where we receive multiple files in a folder and need to load them into a database.
The design is that the Vert.x event loop makes an executeBlocking call to pick up each file and load it into the database.
I am using executeBlocking to run each load on a worker thread and a CompositeFuture to check whether all of them succeed.
The log shows that all the files are picked up, but before all the loads can complete, vertx.close() (the last line of the program) executes and the program fails.
What am I doing wrong? I was expecting CompositeFuture.join to wait until all the executeBlocking calls complete.
This is an urgent requirement; any help would be greatly appreciated.
Below are the logs, followed by the code.
Logs -
000000_2
Doing the job...
000000_1
Doing the job...
file_0
Doing the job...
000000_0
Doing the job...
000000_3
Doing the job...
000000_4
19/05/28 19:15:17 INFO csrtrmld.Loadmodule: inside try of loadOneFile :
Doing the job...
19/05/28 19:15:17 INFO csrtrmld.Loadmodule: Future size : 6
19/05/28 19:15:17 INFO csrtrmld.MainVerticle: END OF THE FUNCTION CALLING VERTX CLOSE
19/05/28 19:15:17 INFO csrtrmld.MainVerticle: Failed
Code -
I am trying to use CompositeFuture.join to wait for all the loads to finish. I am pasting the code inline as I cannot share the repository link here.
while (t.hasNext()) {
    Path p = t.next().getPath();
    System.out.println(p.getName());
    FutureexecuteBackendWorker.add(executeBackendWorker(bucket, vertx, Seperator, LogDir, Interval, fs, p));
}
LOGGER.info("Future size : " + FutureexecuteBackendWorker.size());
CompositeFuture Compositefuture = CompositeFuture.join(FutureexecuteBackendWorker);
return Compositefuture;
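For reference, `CompositeFuture.join` only aggregates the futures and returns immediately; it does not block the caller, which is why the line after it (and eventually `vertx.close()`) runs before the loads finish. Here is a minimal stand-alone sketch of the same composition pattern using `java.util.concurrent.CompletableFuture` instead of Vert.x so it runs without any dependencies; the `afterAll` helper is hypothetical, not part of my code:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class JoinDemo {

    // Aggregate many futures. The allOf call (like CompositeFuture.join)
    // returns immediately -- only the thenApply callback runs after all
    // of the individual futures have completed.
    static List<String> afterAll(List<CompletableFuture<String>> loads) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(loads.toArray(new CompletableFuture[0]));
        return all
                .thenApply(v -> loads.stream()
                        .map(CompletableFuture::join) // already completed here
                        .collect(Collectors.toList()))
                .join(); // block only at the very end, purely for the demo
    }
}
```

In the Vert.x version the equivalent fix would be to call `vertx.close()` from the composite future's completion handler rather than on the next line of the method.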
private Future<String> executeBackendWorker(Bucket bucket, Vertx vertx, String Seperator, String LogDir, int Interval,
        FileSystem fs, Path p) {
    System.out.println("Doing the job...");
    Future<String> future = Future.future();
    vertx.<String>executeBlocking(call -> {
        try {
            loadOneFile(bucket, Seperator, LogDir, Interval, fs, p);
            call.complete("success"); // complete the blocking future on success only
        } catch (Exception ex) {
            MainVerticle.exitStatus = 1;
            call.fail(ex); // fail it on error -- never both
        }
    }, res -> {
        LOGGER.info("Loading a file completed " + p);
        if (res.succeeded()) {
            future.complete(res.result());
        } else {
            future.fail(res.cause());
        }
    });
    return future;
}
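A note on the worker method, separate from the early-close question: my original version failed `future` inside the catch block and then completed it unconditionally afterwards, so a failing file touched the future twice, and the `call` future handed to executeBlocking was never completed at all. The shape I want is "complete or fail, exactly once". A dependency-free sketch of that shape with `CompletableFuture` and a hypothetical `loadOneFile` stand-in:

```java
import java.util.concurrent.CompletableFuture;

public class WorkerDemo {

    // Stand-in for the real loadOneFile; throws for a "bad" path.
    static String loadOneFile(String path) {
        if (path.startsWith("bad")) {
            throw new IllegalStateException("could not parse " + path);
        }
        return "loaded " + path;
    }

    // Offload the blocking work. supplyAsync completes the returned future
    // with either the result or the thrown exception -- never both.
    static CompletableFuture<String> executeBackendWorker(String path) {
        return CompletableFuture.supplyAsync(() -> loadOneFile(path));
    }
}
```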
private void loadOneFile(Bucket bucket, String Seperator, String LogDir, int Interval, FileSystem fs, Path p)
        throws IOException {
    FSDataInputStream inputStream = fs.open(p);
    Scanner sc = new Scanner(new InputStreamReader(inputStream));
    try {
        LOGGER.info("inside try of loadOneFile :");
        while (sc.hasNextLine()) {
            int i = 0;
            List<JsonDocument> documents = new ArrayList<JsonDocument>();
            // Read up to Interval lines into one batch of documents.
            while (i < Interval && sc.hasNextLine()) {
                String SingleDocLine = "";
                String Key = "";
                try {
                    SingleDocLine = sc.nextLine();
                    String[] split = SingleDocLine.split(Seperator);
                    String value = split[1];
                    Key = split[0];
                    JsonTranscoder trans = new JsonTranscoder();
                    JsonObject jobj = trans.stringToJsonObject(value);
                    documents.add(JsonDocument.create(Key, jobj));
                } catch (Exception E) {
                    LOGGER.error("ERROR PROCESSING KEY : " + Key);
                    appendStrToFile(LogDir + "/" + "ERROR" + modifiedDate, SingleDocLine + "\n");
                    MainVerticle.exitStatus = 1;
                    continue;
                }
                i = i + 1;
            }
            // Upsert the batch, retrying on backpressure, blocking until done.
            Observable
                .from(documents)
                .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
                    @Override
                    public Observable<JsonDocument> call(final JsonDocument document) {
                        return bucket.async().upsert(document)
                            .retryWhen(RetryBuilder
                                .anyOf(BackpressureException.class)
                                .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                                .max(10)
                                .build());
                    }
                })
                .last()
                .toBlocking()
                .single();
            countInt = countInt + i;
            LOGGER.info("PROCESSED RECORD COUNT : " + countInt);
        }
        if (sc.ioException() != null) {
            LOGGER.error("ONE OF THE THREADS FAILED WHILE LOADING");
            MainVerticle.exitStatus = 1;
            throw sc.ioException();
        }
    } finally {
        if (inputStream != null) {
            inputStream.close();
        }
        sc.close();
    }
}
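One more parsing detail, flagged as a guess since I cannot show the data here: `SingleDocLine.split(Seperator)` treats the separator as a regular expression and splits on every occurrence, so a JSON value that itself contains the separator character would be silently truncated when I take `split[1]`. Quoting the separator literally and passing a limit of 2 keeps the rest of the line intact; the `|` separator in this sketch is hypothetical:

```java
import java.util.regex.Pattern;

public class SplitDemo {

    // Split a "key<sep>value" line into exactly two parts: Pattern.quote
    // treats the separator literally (not as a regex), and the limit of 2
    // stops splitting after the first occurrence, so the value keeps any
    // embedded separator characters.
    static String[] keyValue(String line, String sep) {
        return line.split(Pattern.quote(sep), 2);
    }
}
```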