Read file line by line - how to use RecordParser to read an AsyncFile


Luis Lebolo

Apr 4, 2018, 5:33:45 PM
to vert.x
Hello,

I'm trying to read a file from the file system and process it line by line. From various docs, I think the way to do this is through an AsyncFile and a RecordParser. Ideally, I'd like to just pump the data, but RecordParser is not a WriteStream:

AsyncFile asyncFile = vertx.fileSystem().openBlocking(/*path and options*/);

RecordParser recordParser = RecordParser.newDelimited("\n", buffer -> {
  // Do something per line
});

Pump.pump(asyncFile, recordParser).start(); // Error - RecordParser cannot be converted to WriteStream

So I guess I have to do my own pumping, like

RecordParser recordParser = RecordParser.newDelimited("\n", buffer -> {
    // Do something per line, this gets called per line successfully
  })
  .exceptionHandler(cause -> {
    // Do I need this? What are the repercussions if I don't have this handler? Are exceptions just lost?
  })
  .endHandler(_void -> {
    // This never gets called!
  });

asyncFile.handler(recordParser);

While I can see the line handler being called, the recordParser.endHandler never gets called. What am I doing wrong? Is the file not being closed?

Even if I can get this to work, I'm still not handling situations that Pump would for me. Is there any way to use Pump in this scenario?

Thanks in advance!

Julien Viet

Apr 5, 2018, 3:29:21 AM
to ve...@googlegroups.com
Hi,

You are forgetting that the record parser is just a transformation step; it does not consume the final results.

So you should wrap the AsyncFile (a ReadStream) with the RecordParser, which gives you a new ReadStream.

From there you can apply back-pressure on that ReadStream itself (pause/resume) or pipe it to a destination that is a WriteStream (another AsyncFile, a TCP socket, etc.).
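A minimal sketch of the wrapping described above, assuming vertx-core 3.5.1 or later is on the classpath, where RecordParser.newDelimited has an overload that takes a ReadStream<Buffer>. The class LineReader and its readLines method are hypothetical names used only for illustration, and the OpenOptions shown are one plausible choice for read-only access:

```java
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.parsetools.RecordParser;

public class LineReader {

  // Hypothetical helper (not part of Vert.x): calls onLine once per "\n"-delimited record.
  public static void readLines(Vertx vertx, String path,
                               Handler<String> onLine,
                               Handler<Throwable> onError,
                               Handler<Void> onEnd) {
    // Assumption: open read-only and do not create the file if it is missing.
    OpenOptions options = new OpenOptions().setRead(true).setWrite(false).setCreate(false);
    AsyncFile asyncFile = vertx.fileSystem().openBlocking(path, options);

    // Wrapping the file gives a new ReadStream<Buffer> that emits one buffer per record.
    RecordParser lines = RecordParser.newDelimited("\n", asyncFile);

    // All handlers go on the wrapping stream, not on the AsyncFile.
    lines
      .exceptionHandler(cause -> {
        asyncFile.close();
        onError.handle(cause);
      })
      .endHandler(v -> {
        asyncFile.close();
        onEnd.handle(null);
      })
      .handler(buffer -> onLine.handle(buffer.toString()));
  }
}
```

Because the handlers are registered on the parser rather than on the file, the end handler is driven by the underlying file reaching its end. Calling lines.pause() and lines.resume() inside the line handler is one way to apply the back-pressure mentioned above, and the same wrapped stream could be handed to Pump.pump together with a WriteStream<Buffer>.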

HTH

Julien

--
You received this message because you are subscribed to the Google Groups "vert.x" group.
To unsubscribe from this group and stop receiving emails from it, send an email to vertx+un...@googlegroups.com.
Visit this group at https://groups.google.com/group/vertx.
To view this discussion on the web, visit https://groups.google.com/d/msgid/vertx/25e9b7e5-01c9-4694-ac80-bcbf500caec9%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Thomas SEGISMONT

Apr 5, 2018, 5:08:54 AM
to ve...@googlegroups.com

Hi,


Luis Lebolo

Apr 11, 2018, 8:20:20 AM
to vert.x
Thanks again for your reply. I wanted to follow up on error handling. So far, I have ended up with:

AsyncFile asyncFile = vertx.fileSystem().openBlocking(filename, fileOptions);
asyncFile
  .exceptionHandler(cause -> {
    // Would love to catch all exceptions, log them, fail a promise, close the file, etc.
    // but I can't throw a caught exception in my output handler and a runtime exception doesn't propagate to this handler
  })
  .endHandler(aVoid -> {
    asyncFile.close();
  });

RecordParser recordParser = RecordParser.newDelimited("\n");

asyncFile.handler(recordParser);

recordParser.setOutput(bufferedLine -> {
  // throw new Exception("foo"); // Java doesn't allow me to throw checked exception in a lambda
  // throw new RuntimeException("foo"); // Doesn't propagate to asyncFile.exceptionHandler (not recordParser.exceptionHandler)
});

How do I exit the output handler so that exceptions are caught by my exception handler? Otherwise, I have to catch various exceptions in the output handler and constantly fail a future, close the file and "return" so that the handler doesn't keep executing. Seems like I'm violating DRY.


Thomas SEGISMONT

Apr 11, 2018, 8:57:40 AM
to ve...@googlegroups.com
I'm not completely sure I understood your problem, but I think you should be fine if you write a class method for handling errors and invoke it wherever it's needed.
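One way to read that suggestion, as a sketch in plain Java with no Vert.x types: a single fail method owns the "fail the promise, close the file" logic, and both the per-line handler and the stream's exception handler call it. Everything here is hypothetical and for illustration only: the class LineJob, the LineHandler interface, the CompletableFuture standing in for the promise, and the Runnable standing in for asyncFile.close().

```java
import java.util.concurrent.CompletableFuture;

public class LineJob {

  /** Like a lambda-friendly handler, but allowed to throw checked exceptions. */
  @FunctionalInterface
  public interface LineHandler {
    void accept(String line) throws Exception;
  }

  private final CompletableFuture<Void> result = new CompletableFuture<>();
  private final LineHandler handler;
  private final Runnable closeFile; // stand-in for asyncFile.close()

  public LineJob(LineHandler handler, Runnable closeFile) {
    this.handler = handler;
    this.closeFile = closeFile;
  }

  // The one place where errors are handled: fail the result and close the file, once.
  public void fail(Throwable cause) {
    if (result.completeExceptionally(cause)) {
      closeFile.run();
    }
  }

  // Body of the per-line output handler: any exception is routed to fail().
  public void onLine(String line) {
    if (result.isDone()) {
      return; // already failed or finished, so ignore further lines
    }
    try {
      handler.accept(line);
    } catch (Exception e) {
      fail(e);
    }
  }

  // Body of the end handler: complete successfully and close the file, once.
  public void onEnd() {
    if (result.complete(null)) {
      closeFile.run();
    }
  }

  public CompletableFuture<Void> result() {
    return result;
  }
}
```

In the code from the earlier message, job::fail would presumably be what gets passed to asyncFile.exceptionHandler, and job.onLine would be called from the setOutput handler with the line converted to a String. The try/catch then lives in exactly one place, and user code can throw checked or runtime exceptions freely.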
