Then create a Flow which does the reassembly. So I coded these as follows, before having serious doubts about them:
import akka.stream.scaladsl.Flow
import java.util.concurrent.atomic.AtomicLong
object WithStringChunking {
  private val counter = new AtomicLong(0L)

  /**
   * Factory for a string-chunking flow.
   *
   * @param maxChunkSize the maximum size of each chunk
   * @return string-chunking flow to be connected to a `Source[String]`
   */
  def apply(maxChunkSize: Int): Flow[String, (String, String), Unit] = {
    val uniqueReassemblyKey = counter.getAndIncrement().toString
    Flow[String].mapConcat { s =>
      s.grouped(maxChunkSize).map((uniqueReassemblyKey, _)).toList
    }
  }
}
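For illustration, here is roughly how the chunking flow can be driven (the ActorSystem/ActorMaterializer boilerplate and the sample input are placeholders, not part of the real application):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

object ChunkingDemo extends App {
  implicit val system = ActorSystem("chunking-demo")
  implicit val materializer = ActorMaterializer()

  // Splits each input string into (reassemblyKey, chunk) pairs of at most 4 characters.
  Source(List("hello world"))
    .via(WithStringChunking(maxChunkSize = 4))
    .runWith(Sink.foreach(println))  // e.g. (0,hell), (0,o wo), (0,rld)
}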
import scala.concurrent.ExecutionContext
import akka.stream.Materializer
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
object WithStringUnchunking {
  /**
   * Factory for a string-unchunking flow.
   *
   * @return string-unchunking flow to be connected to a `Source[(String, String)]`
   *         where the string pairs consist of:
   *         - reassemblyKey (all related chunks have the same key)
   *         - chunk
   *
   * BUG: I think this will buffer too much.
   * Need to take advantage of adjacency.
   */
  def apply()(implicit materializer: Materializer, ec: ExecutionContext)
      : Flow[(String, String), String, Unit] = {
    val result = Flow[(String, String)]
      // Group together the chunks with the same re-assembly key.
      .groupBy { case (key, chunk) => key }
      .mapAsync(1) { case (_, pairsSource) =>
        // Discard the keys.
        val chunksSource: Source[String, Unit] = pairsSource.map { _._2 }
        // Concatenate the chunks.
        for {
          chunks <- chunksSource.grouped(1000000).runWith(Sink.head)
        } yield chunks.fold("") { _ + _ }
      }
    result
  }
}
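For illustration, here is roughly how this first attempt can be driven (the setup and the sample key/chunks are placeholders); note that grouped(1000000) collects every chunk of a key in memory before anything is emitted downstream:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

object UnchunkingDemo extends App {
  implicit val system = ActorSystem("unchunking-demo")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher  // ExecutionContext required by the flow's mapAsync

  // Pre-chunked input: all chunks of one logical message carry the same key.
  val chunkedInput = Source(List(("42", "hel"), ("42", "lo "), ("42", "wor"), ("42", "ld")))

  chunkedInput
    .via(WithStringUnchunking())
    .runWith(Sink.foreach(println))  // prints "hello world"
}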
I really want the unchunking part to avoid buffering more than one message's worth of chunks, by taking advantage of the fact that all chunks for a given message arrive adjacently.

How can I express this using the akka-streams API?

Or maybe a more tractable encoding would be to convert the original Source[String, Unit] into another Source[String, Unit] which consists of:

Here is another attempt at the unchunking flow, this time implemented with a PushPullStage:
import akka.stream.scaladsl.Flow
import akka.stream.stage.Context
import akka.stream.stage.PushPullStage
import akka.stream.stage.SyncDirective
import akka.stream.stage.TerminationDirective
object WithStringUnchunking {
  type StringPair = (String, String)

  /**
   * Factory for a string-unchunking flow.
   *
   * @return string-unchunking flow to be connected to a `Source[(String, String)]`
   *         where the string pairs consist of:
   *         - reassemblyKey (all related chunks have the same key)
   *         - chunk
   */
  def apply(): Flow[(String, String), String, Unit] =
    Flow[(String, String)].transform(() => new Unchunker)
  private class Unchunker extends PushPullStage[StringPair, String] {
    private var currentKey: Option[String] = None
    private val currentMsg = new StringBuilder()

    override def onPush(pair: StringPair, ctx: Context[String]): SyncDirective = {
      val (key, chunk) = pair
      if (currentKey.isEmpty) {
        // Case: very first chunk.
        currentKey = Some(key)
        currentMsg ++= chunk
        ctx.pull()
      } else if (currentKey == Some(key)) {
        // Case: new chunk for the current message.
        currentMsg ++= chunk
        ctx.pull()
      } else {
        // Case: chunk for the next message arrived. Push out the previous message.
        val completedMsg = currentMsg.mkString
        currentMsg.clear()
        currentKey = Some(key)
        currentMsg ++= chunk
        ctx.push(completedMsg)
      }
    }

    override def onPull(ctx: Context[String]): SyncDirective =
      if (!ctx.isFinishing) ctx.pull()
      else {
        // Upstream has finished: flush the last buffered message, if any.
        if (currentMsg.nonEmpty) ctx.pushAndFinish(currentMsg.mkString)
        else ctx.finish()
      }

    // Absorb upstream completion so that onPull is still called (with isFinishing = true)
    // and the final buffered message is not lost.
    override def onUpstreamFinish(ctx: Context[String]): TerminationDirective =
      ctx.absorbTermination()
  }
}
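Putting the two pieces together with this PushPullStage version, a round trip for a single message looks roughly like this (again, the setup and sample input are placeholders):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

object RoundTripDemo extends App {
  implicit val system = ActorSystem("round-trip-demo")
  implicit val materializer = ActorMaterializer()

  // Chunk a message into (key, chunk) pairs, then reassemble it with the Unchunker.
  Source(List("hello world"))
    .via(WithStringChunking(maxChunkSize = 4))
    .via(WithStringUnchunking())
    .runWith(Sink.foreach(println))  // prints "hello world"
}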