Hello,
I’m currently using Akka Streams to stream some big files, and it works quite well :)
But in the FlowGraph I want to add compression and decompression steps.
Compression seems to work correctly, but I get offset errors when the decompression runs.
Here is the code:
On the producer:
// ByteIterator implements Iterable[Byte] and reads the file byte by byte
val byteStream = new ByteIterator(buffStream)

FlowGraph { implicit builder =>
  val broadcast = Broadcast[ByteString]
  val in = IteratorSource(byteStream.grouped(chunkSize).map(StreamOps.toByteString))
  val compress = FlowFrom[ByteString].map(StreamOps.compress)
  val out = SubscriberSink(outputStream)

  in ~> broadcast ~> compress ~> out
}.run()

val toByteString = { bytes: Seq[Byte] =>
  val b = new ByteStringBuilder
  b.sizeHint(bytes.length)
  b ++= bytes
  b.result()
}
On the subscriber:
val writeToFile = ForeachSink { data: ByteString =>
  channel.write(data.asByteBuffer)
}

val in = PublisherSource(publisher)

FlowGraph { implicit builder =>
  val decompress = FlowFrom[ByteString].map(StreamOps.decompress)
  val broadcast = Broadcast[ByteString]

  in ~> decompress ~> broadcast ~> writeToFile
}.run()
Compression and decompression are based on the LZ4 API:
def compress(inputBuff: Array[Byte]): Array[Byte] = {
  val inputSize = inputBuff.length
  val lz4 = lz4factory.fastCompressor
  val maxOutputSize = lz4.maxCompressedLength(inputSize)
  val outputBuff = new Array[Byte](maxOutputSize + 4)
  val outputSize = lz4.compress(inputBuff, 0, inputSize, outputBuff, 4, maxOutputSize)
  // 4-byte little-endian header holding the uncompressed size
  outputBuff(0) = (inputSize & 0xff).toByte
  outputBuff(1) = (inputSize >> 8 & 0xff).toByte
  outputBuff(2) = (inputSize >> 16 & 0xff).toByte
  outputBuff(3) = (inputSize >> 24 & 0xff).toByte
  outputBuff.take(outputSize + 4)
}
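The 4-byte little-endian header written by compress can be checked in isolation. This sketch reproduces just the header encode/decode bit arithmetic (the same operations compress uses to write the size and decompress uses to read it back), with no LZ4 involved:

```scala
// Little-endian 4-byte length header, matching the bit arithmetic of the
// compress (write) and decompress (read) functions above. Pure Scala.
object HeaderSketch {
  def writeHeader(size: Int, buf: Array[Byte]): Unit = {
    buf(0) = (size & 0xff).toByte
    buf(1) = (size >> 8 & 0xff).toByte
    buf(2) = (size >> 16 & 0xff).toByte
    buf(3) = (size >> 24 & 0xff).toByte
  }

  def readHeader(buf: Array[Byte]): Int =
    (buf(0).toInt & 0xff) |
    (buf(1).toInt & 0xff) << 8 |
    (buf(2).toInt & 0xff) << 16 |
    (buf(3).toInt & 0xff) << 24

  def main(args: Array[String]): Unit = {
    val buf = new Array[Byte](4)
    writeHeader(123456789, buf)
    println(readHeader(buf))
  }
}
```

The header round-trips for any non-negative Int, so the framing arithmetic itself is not the source of the offset errors — which points at what bytes actually arrive at decompress.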
def decompress(inputBuff: Array[Byte]): Array[Byte] = {
  // Read the little-endian uncompressed size back from the 4-byte header
  val size: Int = (inputBuff(0).toInt & 0xff) |
    (inputBuff(1).toInt & 0xff) << 8 |
    (inputBuff(2).toInt & 0xff) << 16 |
    (inputBuff(3).toInt & 0xff) << 24
  val lz4 = lz4factory.fastDecompressor()
  val outputBuff = new Array[Byte](size)
  lz4.decompress(inputBuff, 4, outputBuff, 0, size)
  outputBuff
}
My guess is that decompress is being handed a byte array that does not have the same size as the one compress produced…
Do you have any clues about this?
Thanks for your help.
You received this message because you are subscribed to the Google Groups "Akka User List" group.
val lz4 = lz4factory.fastDecompressor()
val outputBuff = new Array[Byte](size)
lz4.decompress(inputBuff, 4, outputBuff, 0, size)
If it is stateful then you shouldn't create a new instance every time. If it is not stateful, then I don't know what this library does -- does it need the whole file to be in memory to decompress?
I don't know the library you used, but shouldn't this line

  lz4.decompress(inputBuff, 4, outputBuff, 0, size)

return the number of bytes it actually read from the compressed input? Looking at this (https://github.com/jpountz/lz4-java/blob/master/src/java/net/jpountz/lz4/LZ4JNIFastDecompressor.java) it does, which means you cannot assume each incoming array holds exactly one compressed block.
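That return value (for lz4-java's fast decompressor it is the number of bytes read from the compressed input) is what lets a consumer walk a buffer containing several concatenated frames. Here is an illustrative sketch of that idea — again with the JDK's Deflater/Inflater standing in for LZ4, using Inflater's consumed-byte accounting in place of the LZ4 return value; this is not the poster's code:

```scala
import java.util.zip.{Deflater, Inflater}

// Walking a buffer of concatenated compressed frames by advancing the
// offset by how many input bytes each decompression actually consumed.
// Assumes each frame decompresses to at most 1 KB (sketch-sized limit).
object MultiFrameSketch {
  def deflate(input: Array[Byte]): Array[Byte] = {
    val d = new Deflater()
    d.setInput(input)
    d.finish()
    val buf = new Array[Byte](input.length * 2 + 64)
    val n = d.deflate(buf)
    d.end()
    buf.take(n)
  }

  def inflateAll(buf: Array[Byte]): List[Array[Byte]] = {
    var offset = 0
    var frames = List.empty[Array[Byte]]
    while (offset < buf.length) {
      val inf = new Inflater()
      inf.setInput(buf, offset, buf.length - offset)
      val out = new Array[Byte](1024)
      val n = inf.inflate(out)
      // bytes consumed = bytes offered minus bytes still unread
      val consumed = (buf.length - offset) - inf.getRemaining
      inf.end()
      require(consumed > 0, "no progress; malformed input")
      offset += consumed
      frames = frames :+ out.take(n)
    }
    frames
  }
}
```

The same pattern applied to LZ4 would mean either using the decompress return value to advance through the buffer, or buffering incoming chunks until a complete frame is available.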
If you can package up a small failing sample (not just snippets), we can then see whether this is a bug or not.
-Endre
writeToFile
  .future(someMaterializer)
  .andThen {
    case _ => out.close()
  }
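Separate from the offset problem, the cleanup pattern in that last snippet (close the resource once the sink's completion future resolves, whether it succeeded or failed) can be sketched with plain scala.concurrent; `Resource` here is a hypothetical stand-in for the output stream:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object CleanupSketch {
  // Hypothetical stand-in for the output stream in the original snippet.
  final class Resource {
    @volatile var closed = false
    def close(): Unit = closed = true
  }

  // Run some work, then close the resource on success OR failure,
  // mirroring writeToFile.future(...).andThen { case _ => out.close() }.
  // andThen's future completes only after the callback has run.
  def runAndClose(work: => Int, res: Resource): Future[Int] =
    Future(work).andThen { case _ => res.close() }

  def main(args: Array[String]): Unit = {
    val res = new Resource
    val result = Await.result(runAndClose(42, res), 5.seconds)
    println(result)
    println(res.closed)
  }
}
```

Using `andThen` rather than `foreach`/`onComplete` keeps the original result while guaranteeing the side effect has happened before anything downstream observes completion.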