How to write Flows which perform chunking and unchunking of large Strings?

Kevin Esler
Oct 27, 2015, 12:18:58 PM
to Akka User List
I am starting to work with akka-streams and have a problem which seems easy, but which I'm having trouble solving.

I have a Source[String, Unit] and the strings can be quite large. I'd like to split them into chunks of some maximum size such as 4k, transmit them and reassemble them at the other end.

The first thought I had was to create a Flow which:
  • assigns a unique reassembly key of type String to each original String,
  • outputs a stream of (String, String) pairs, understood as (reassemblyKey, chunk)

Then create a Flow which does the reassembly. So I coded these as follows, before having serious doubts about them:


import akka.stream.scaladsl.Flow

import java.util.concurrent.atomic.AtomicLong

object WithStringChunking {

  private val counter = new AtomicLong(0L)

  /**
   * Factory for a string-chunking flow.
   *
   * @param maxChunkSize the maximum size of each chunk
   * @return string-chunking flow to be connected to a `Source[String]`
   */
  def apply(maxChunkSize: Int): Flow[String, (String, String), Unit] =
    Flow[String].mapConcat { s =>
      // Generate a fresh key per incoming String (not once per flow),
      // so that different messages get different reassembly keys.
      val uniqueReassemblyKey = counter.getAndIncrement().toString
      s.grouped(maxChunkSize).map((uniqueReassemblyKey, _)).toList
    }
}
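
For reference, here's how I'd exercise the chunking flow on its own. This is just a sketch: the ActorSystem/ActorMaterializer setup is assumed boilerplate, and the exact keys depend on the counter's state.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

implicit val system = ActorSystem("chunk-demo")
implicit val materializer = ActorMaterializer()

Source(List("hello world"))
  .via(WithStringChunking(maxChunkSize = 4))
  .runForeach(println)
// prints something like ("0", "hell"), ("0", "o wo"), ("0", "rld")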


import scala.concurrent.ExecutionContext

import akka.stream.Materializer
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

object WithStringUnchunking {

  /**
   * Factory for a string-unchunking flow.
   *
   * @return string-unchunking flow to be connected to a `Source[(String, String)]`
   * where the string pairs consist of:
   *   - reassemblyKey (all related chunks have the same key)
   *   - chunk
   *
   * BUG: I think this will buffer too much.
   * Need to take advantage of adjacency.
   */
  def apply()(implicit materializer: Materializer, ec: ExecutionContext)
      : Flow[(String, String), String, Unit] =
    Flow[(String, String)]
      // Group together the chunks with the same reassembly key.
      .groupBy { case (key, chunk) => key }
      .mapAsync(1) { case (_, pairsSource) =>
        // Discard the keys.
        val chunksSource: Source[String, Unit] = pairsSource.map { _._2 }
        // Concatenate the chunks.
        for {
          chunks <- chunksSource.grouped(1000000).runWith(Sink.head)
        } yield chunks.fold("") { _ + _ }
      }
}
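
Wired end to end it would look something like this (again only a sketch, reusing the ActorSystem/materializer setup from above; import system.dispatcher supplies the implicit ExecutionContext that apply() wants):

import system.dispatcher

Source(List("a" * 10000, "b" * 5000))
  .via(WithStringChunking(maxChunkSize = 4096))
  .via(WithStringUnchunking())
  .runForeach(s => println(s.length))
// expect 10000 then 5000, but only once the whole stream completes (see below)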


This code compiles but I think the unchunking part will always buffer "too much", since a groupBy substream only completes when the whole upstream does: chunks pile up until end-of-stream instead of being emitted as soon as their message's last chunk arrives, and most messages will be much smaller than the grouped(1000000) limit anyway.


I really want the unchunking part to:

  • read and buffer chunks until the reassembly key changes
  • concatenate the chunks and emit the original message

How do I express this using the akka-streams API?


Or maybe a more tractable encoding would be to convert the original Source[String, Unit] to another Source[String, Unit] which consists of:

  • a string which, interpreted as an integer, gives N, the number of chunks that follow and are to be reassembled into a single message
  • the N chunks
  • ....etc (the encoding side is sketched just below)
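
The encoding side of that might look like this (only a sketch; countPrefixed is just a name I made up, and the decoding side would still need a stateful stage):

import akka.stream.scaladsl.Flow

// Emit each message's chunk count as a String, followed by the chunks.
def countPrefixed(maxChunkSize: Int): Flow[String, String, Unit] =
  Flow[String].mapConcat { s =>
    val chunks = s.grouped(maxChunkSize).toList
    chunks.length.toString :: chunks
  }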

Kevin Esler
Oct 27, 2015, 2:52:25 PM
to Akka User List
Just found the section entitled "Chunking up a stream of ByteStrings into limited size ByteStrings" in the stream cookbook here: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-cookbook.html

So I think the answer must be to write a PushPullStage for the unchunking.

Reading further....

Kevin Esler
Oct 27, 2015, 2:52:27 PM
to Akka User List
My 2nd attempt. It type checks so it should work :-)

import akka.stream.scaladsl.Flow
import akka.stream.stage.Context
import akka.stream.stage.PushPullStage
import akka.stream.stage.SyncDirective
import akka.stream.stage.TerminationDirective

object WithStringUnchunking {

  type StringPair = (String, String)

  /**
   * Factory for a string-unchunking flow.
   *
   * @return string-unchunking flow to be connected to a `Source[(String, String)]`
   * where the string pairs consist of:
   *   - reassemblyKey (all related chunks have the same key)
   *   - chunk
   */
  def apply(): Flow[(String, String), String, Unit] =
    Flow[(String, String)].transform(() => new Unchunker)

  private class Unchunker extends PushPullStage[StringPair, String] {

    private var currentKey: Option[String] = None
    private val currentMsg = new StringBuilder()

    override def onPush(pair: (String, String), ctx: Context[String]): SyncDirective = {
      val (key, chunk) = pair

      if (currentKey.isEmpty) {
        // Case: 1st time.
        currentKey = Some(key)
        currentMsg ++= chunk
        ctx.pull()
      } else if (currentKey == Some(key)) {
        // Case: new chunk for current message.
        currentMsg ++= chunk
        ctx.pull()
      } else {
        // Case: chunk for next message arrived. Push out previous message.
        val result = ctx.push(currentMsg.mkString)
        currentMsg.clear()
        currentKey = Some(key)
        currentMsg ++= chunk
        result
      }
    }

    override def onPull(ctx: Context[String]): SyncDirective =
      if (!ctx.isFinishing) ctx.pull()
      else {
        if (currentMsg.nonEmpty) ctx.pushAndFinish(currentMsg.mkString)
        else ctx.finish()
      }

    // Absorb termination so the last buffered message is flushed by onPull's
    // isFinishing branch instead of being lost when upstream completes, as in
    // the cookbook's Chunker example.
    override def onUpstreamFinish(ctx: Context[String]): TerminationDirective =
      ctx.absorbTermination()
  }
}
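
A quick sanity check I have in mind (just a sketch, reusing the ActorSystem/materializer setup from my earlier post):

import akka.stream.scaladsl.Source

Source(List(("0", "hel"), ("0", "lo"), ("1", "wor"), ("1", "ld")))
  .via(WithStringUnchunking())
  .runForeach(println)
// expect "hello" then "world"

Note that the final "world" is only flushed because onUpstreamFinish absorbs termination, letting onPull's isFinishing branch push it out before the stage finishes.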


