[Akka-Streams] Using Framing.delimiter where ByteString stream is part of a Tuple

Paul Brown

Sep 15, 2016, 2:30:51 AM
to Akka User List
Hi Akka Stream Users,

I'm fairly new to Akka Streams and am only just starting to understand some of its concepts.

One problem I'm having is understanding how tuples are handled in a stream. In my scenario I have a Source emitting Tuple2s of (ZipEntry, ByteString). The ZipEntry contains basic information (name, date created) about a file in the zip archive, and the ByteString contains that file's extracted text.

I'm comfortable delimiting a stream of ByteStrings with the following:
source.map(_._2).via(Framing.delimiter(ByteString("\n"), Int.MaxValue))

However, what I would like to be able to do is maintain the ZipEntry information alongside the ByteString for each new row created through the delimiter. That way I can process the stream further using groupBy etc. 

Example Source --> Sink

Source input:
(ZipEntry(name1.txt, date1), ByteString(58, 49, ..., 54, 58, ..., 48, 48, ...) )
(ZipEntry(name2.txt, date2), ByteString(49, 58, ..., 58, 54, ..., 48, 48, ...) )


Sink output:
(ZipEntry(name1.txt, date1), ByteString(58, 49, ...) )
(ZipEntry(name1.txt, date1), ByteString(54, 58, ...) )
(ZipEntry(name1.txt, date1), ByteString(48, 48, ...) )
...
(ZipEntry(name2.txt, date2), ByteString(49, 58, ...) )
(ZipEntry(name2.txt, date2), ByteString(58, 54, ...) )
(ZipEntry(name2.txt, date2), ByteString(48, 48, ...) )
...
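One common way to get this output (a sketch, not something proposed in this thread) is to give each tuple its own small inner stream with flatMapConcat: the ByteString goes through Framing.delimiter inside that inner stream, and every resulting line is paired with the ZipEntry still in scope. This assumes Akka 2.6.x, where an implicit ActorSystem provides the Materializer; the FrameWithEntry object, the framed helper, the sample file contents and the 1024-byte frame limit are all hypothetical.

```scala
import java.util.zip.ZipEntry

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object FrameWithEntry {
  // For every (entry, bytes) pair, run the bytes through a small inner stream
  // and tag each resulting line with the entry it came from.
  def framed(
      in: Source[(ZipEntry, ByteString), NotUsed]
  ): Source[(ZipEntry, ByteString), NotUsed] =
    in.flatMapConcat { case (entry, bytes) =>
      Source
        .single(bytes)
        // allowTruncation = true also emits a last line that has no trailing "\n";
        // 1024 is an arbitrary (hypothetical) maximum line length
        .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
        .map(line => (entry, line))
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("frame-with-entry")
    // Hypothetical input standing in for the extracted zip contents
    val in = Source(List(
      new ZipEntry("name1.txt") -> ByteString("a,1\nb,2\nc,3"),
      new ZipEntry("name2.txt") -> ByteString("d,4\ne,5\n")
    ))
    val lines = Await.result(framed(in).runWith(Sink.seq), 5.seconds)
    lines.foreach { case (entry, line) => println(s"${entry.getName}: ${line.utf8String}") }
    system.terminate()
  }
}
```

Because flatMapConcat drains one inner stream completely before starting the next, lines stay in input order and lines from different entries are never interleaved, so a later groupBy on the entry name sees them grouped as expected.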

Should I be looking at breaking it down into a RunnableGraph?
Any help/pointers to documentation on the correct approach would be appreciated.

Thanks
Paul

Johannes Rudolph

Sep 20, 2016, 6:35:53 AM
to Akka User List
Hi Paul,

a combinator to achieve something like this has been proposed several times, but I don't think there has been consensus on how exactly to implement it. The latest approach is discussed here:


Johannes

Paul Brown

Oct 5, 2016, 12:56:42 AM
to Akka User List
Thanks Johannes,

I'm very new to Akka and still trying to understand the concepts. Some of the discussion in issue 50 seems relevant to my problem; however, what I'm really looking for is guidance on an implementation (even if it's verbose and not yet available as a combinator). Part of the issue discusses carrying some context alongside tuple elements into and out of a flow, and some people appear to have implemented a solution using an inner flow/substream with tuple pattern matching, but I can't find any examples that would help me.

So, at a basic level, I'm trying to implement the following in Akka Streams, where the split would be handled by an inner flow using Framing.delimiter:

val source = List(("stringstuff", "ByteString\nThe\nquick\nbrown\nfox\n"))

source.flatMap {
  case (s, bs) => bs.split("\n").toList.map {
    newbs => (s, newbs)
  }
}

The flow would need to be able to receive an element as a tuple Flow[(String, ByteString)], then map the ByteString part of the tuple through a Framing.delimiter. Each delimited ByteString is then mapped back into a tuple (String, ByteString) with its original associated string value.
Because of the delimiter, the transformation flow effectively emits more tuples than it receives; however, the output tuples have the same format as the input tuples.
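A flow with that shape can be sketched as a reusable stage that takes the context type as a type parameter, so the same code works for a String, a ZipEntry or anything else. Under the same assumptions (Akka 2.6.x, implicit ActorSystem as the Materializer), ContextFraming and delimitWithContext are hypothetical names, not Akka API; internally each input tuple gets its own inner stream via flatMapConcat, and the context is re-attached with map.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Framing, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object ContextFraming {
  // Hypothetical helper (not part of Akka): frames the ByteString half of each
  // (context, bytes) pair and re-attaches the context to every frame.
  def delimitWithContext[C](
      delimiter: ByteString,
      maximumFrameLength: Int
  ): Flow[(C, ByteString), (C, ByteString), NotUsed] =
    Flow[(C, ByteString)].flatMapConcat { case (ctx, bytes) =>
      Source
        .single(bytes)
        // Each inner stream has its own framing buffer, so a frame never
        // spans two input tuples
        .via(Framing.delimiter(delimiter, maximumFrameLength, allowTruncation = true))
        .map(frame => (ctx, frame))
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("context-framing")
    // Same data as the collections example above
    val source = Source(List(("stringstuff", ByteString("ByteString\nThe\nquick\nbrown\nfox\n"))))
    val done = source
      .via(delimitWithContext[String](ByteString("\n"), maximumFrameLength = 1024))
      .map { case (s, frame) => (s, frame.utf8String) }
      .runWith(Sink.seq)
    Await.result(done, 5.seconds).foreach(println)
    system.terminate()
  }
}
```

The trade-off of this design is that a small inner stream is materialized per input tuple; for large or chunked inputs where a single file's bytes arrive as several ByteStrings, the framing would instead need to run across those chunks before the context is attached.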

Sorry, I've been trying various Flow/Graph combinations and keep getting stuck at outputting the new tuple from the Flow once the delimited ByteString has been produced, so any help or advice would be appreciated.

Thanks in advance.
Paul