Akka stream compression / decompression


Nicolas Jozwiak

Oct 7, 2014, 4:53:22 PM
to akka...@googlegroups.com

Hello,

   I’m currently using akka streams to stream some big files and it works quite well :)

   But in the FlowGraph stream I want to add some compression and decompression steps.

   Compression seems to work correctly, but I get some offset errors when the decompression is executed.

To help, here is the code:

On the Producer:

 // ByteIterator implements Iterable[Byte] and reads the file byte by byte

 val byteStream = new ByteIterator(buffStream)

 FlowGraph { implicit builder =>

    val broadcast = Broadcast[ByteString]

    val in = IteratorSource(byteStream.grouped(chunkSize).map(StreamOps.toByteString))

    val compress = FlowFrom[ByteString].map(StreamOps.compress)

    val out = SubscriberSink(outputStream)

    in ~> broadcast ~> compress ~> out

  }.run()


  val toByteString = { bytes: Seq[Byte] =>

    val b = new ByteStringBuilder

    b.sizeHint(bytes.length)

    b ++= bytes

    b.result()

  }


On the Subscriber:

 val writeToFile = ForeachSink {

    data: ByteString =>

      channel.write(data.asByteBuffer)

  }

  val in = PublisherSource(publisher)

  FlowGraph { implicit builder =>

    val decompress = FlowFrom[ByteString].map(StreamOps.decompress)

    val broadcast = Broadcast[ByteString]

    in ~> decompress ~> broadcast ~> writeToFile

  }.run()

Compression and decompression are based on the LZ4 API:

def compress(inputBuff: Array[Byte]): Array[Byte] = {

    val inputSize = inputBuff.length

    val lz4 = lz4factory.fastCompressor

    val maxOutputSize = lz4.maxCompressedLength(inputSize)

    val outputBuff = new Array[Byte](maxOutputSize + 4)

    val outputSize = lz4.compress(inputBuff, 0, inputSize, outputBuff, 4, maxOutputSize)


    outputBuff(0) = (inputSize & 0xff).toByte

    outputBuff(1) = (inputSize >> 8 & 0xff).toByte

    outputBuff(2) = (inputSize >> 16 & 0xff).toByte

    outputBuff(3) = (inputSize >> 24 & 0xff).toByte

    outputBuff.take(outputSize + 4)

  }


  def decompress(inputBuff: Array[Byte]): Array[Byte] = {

    val size: Int = (inputBuff(0).asInstanceOf[Int] & 0xff) |

      (inputBuff(1).asInstanceOf[Int] & 0xff) << 8 |

      (inputBuff(2).asInstanceOf[Int] & 0xff) << 16 |

      (inputBuff(3).asInstanceOf[Int] & 0xff) << 24

    val lz4 = lz4factory.fastDecompressor()

    val outputBuff = new Array[Byte](size)

    lz4.decompress(inputBuff, 4, outputBuff, 0, size)

    outputBuff

  }
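As a side note (my own illustration, not part of the original message): the 4-byte little-endian length header packed by hand above can also be written and read with java.nio.ByteBuffer, which avoids manual shift-and-mask sign mistakes. A plain-Java sketch of just the framing, with no LZ4 calls:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class Framing {
    // Prepend a 4-byte little-endian length header to a payload,
    // matching the byte order used by the compress method above.
    static byte[] frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length)
                .order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Read the length back out of the header.
    static int headerLength(byte[] framed) {
        return ByteBuffer.wrap(framed, 0, 4)
                .order(ByteOrder.LITTLE_ENDIAN).getInt();
    }

    public static void main(String[] args) {
        byte[] framed = frame(new byte[20000]);
        System.out.println(headerLength(framed)); // 20000
    }
}
```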

In my mind, the decompression process is executed with a byte array which does not have the same size as the compressed one…

Do you have any clues on that?

Thanks for your help. 

Martynas Mickevičius

Oct 8, 2014, 4:34:17 AM
to akka...@googlegroups.com
Hello Nicolas,

is this the code you are running, or is this a simplified example?

I am asking because you do not need to use FlowGraph in a linear use case like this. The Broadcast junction that you create and use only once is meant to split a stream into two or more legs.

Another issue in this code, on the publisher side, is that you are grouping and mapping on the iterator and not on the Flow. I think you should move these operations to the Flow.

The producer side, with the FlowGraph creation removed and the combinator operations moved to the Flow, would then look like:

// warning, not compiled
IteratorSource(byteStream).
grouped(chunkSize).map(StreamOps.toByteString).produceTo(outputStream)

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Martynas Mickevičius
Typesafe – Reactive apps on the JVM

Nicolas Jozwiak

Oct 8, 2014, 5:08:19 AM
to akka...@googlegroups.com
Hi Martynas,

   Yes, it's a simplified example, because we use another branch to do some digest processing.
    Here is the complete code:

    FlowGraph { implicit builder =>
      val broadcast = Broadcast[ByteString]

      // streamDigester extends Subscriber and updates a MessageDigest object
      val digester = SubscriberSink(streamDigester)

      val in = IteratorSource(byteStream.grouped(chunkSize).map(StreamOps.toByteString))

      val compress = FlowFrom[ByteString].map(StreamOps.compress)
      val out = SubscriberSink(outputStream)

      in ~> broadcast ~> compress ~> out
      broadcast ~> digester

    }.run()

Nicolas.

Martynas Mickevičius

Oct 8, 2014, 5:14:53 AM
to akka...@googlegroups.com
Okay. That makes sense then.

What kind of errors are you getting? Are the ByteStrings received in a different order?

Have you tested compress/decompress method implementation?

Nicolas Jozwiak

Oct 8, 2014, 5:34:27 AM
to akka...@googlegroups.com
The compress/decompress implementation works well (tested with a simple example without streams).

The ByteStrings I receive are not the same size as the ones produced by compression. For example, with a chunk size of 20000, a chunk is 19065 bytes after compression, but on decompression I receive 19070 bytes to decompress…

Here is the stack trace:

net.jpountz.lz4.LZ4Exception: Error decoding offset 10 of input buffer
at net.jpountz.lz4.LZ4JNIFastDecompressor.decompress(LZ4JNIFastDecompressor.java:33) ~[lz4-1.2.0.jar:na]

Nicolas.

Martynas Mickevičius

Oct 8, 2014, 5:37:42 AM
to akka...@googlegroups.com
Have you tried it with a smaller chunk size?

Nicolas Jozwiak

Oct 8, 2014, 9:45:44 AM
to akka...@googlegroups.com
I've just tried with a smaller chunk size (1000), and now I have a different file size at the end => (Expected file length: '114541', actual: '95541')
Weird...

Nicolas

Endre Varga

Oct 8, 2014, 9:48:42 AM
to akka...@googlegroups.com
Do you properly close/flush the outputstream?

-Endre

Nicolas Jozwiak

Oct 8, 2014, 9:57:30 AM
to akka...@googlegroups.com
Yes we close the outputstream at the end.

I've continued doing some tests with a small chunk size and it passes only once... When I retry I get the above error => (Expected file length: '114541', actual: '95541') or a Java OutOfMemoryError...

Nicolas.

Endre Varga

Oct 8, 2014, 10:13:24 AM
to akka...@googlegroups.com
Nicolas, I am not sure the code you wrote does what you want. 

First of all, reading something in byte-by-byte is highly inefficient; you should have your source as a stream of blocks, for example Source[ByteString]. The .grouped() will not help at all, because you will then get a Seq[Byte], which means a huge amount of allocations.
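To illustrate the stream-of-blocks idea (a plain-Java sketch of my own, reading from an in-memory source, not the akka-streams API): read the input in fixed-size blocks instead of byte-by-byte, so each read fills one buffer rather than allocating per byte.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class BlockReader {
    // Read an InputStream as a sequence of fixed-size byte blocks;
    // only the final block may be shorter than blockSize.
    static List<byte[]> readBlocks(InputStream in, int blockSize) throws IOException {
        List<byte[]> blocks = new ArrayList<>();
        byte[] buf = new byte[blockSize];
        int n;
        while ((n = in.read(buf)) != -1) {
            byte[] block = new byte[n];
            System.arraycopy(buf, 0, block, 0, n);
            blocks.add(block);
        }
        return blocks;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[4500];
        List<byte[]> blocks = readBlocks(new ByteArrayInputStream(data), 2000);
        System.out.println(blocks.size()); // 3 blocks: 2000 + 2000 + 500
    }
}
```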

Using a compressor stage simply as a map seems suspicious, too. Are you certain that your compress or decompress function gives back something every time it is called? I.e. are there cases when the compressor/decompressor cannot provide any output because it still needs more input?

Also, isn't this lz4 below stateful?

val lz4 = lz4factory.fastDecompressor()

    val outputBuff = new Array[Byte](size)

    lz4.decompress(inputBuff, 4, outputBuff, 0, size)


If it is stateful then you shouldn't create a new instance every time. If it is not stateful, then I don't know what this library does -- does it need the whole file to be in memory to decompress?


I don't know the library you used, but shouldn't this line


 lz4.decompress(inputBuff, 4, outputBuff, 0, size)


return the count of the bytes actually written? Looking at this (https://github.com/jpountz/lz4-java/blob/master/src/java/net/jpountz/lz4/LZ4JNIFastDecompressor.java) it does, which means you cannot just return the whole Array.


If you can package up a small failing sample (not just snippets) we can then see whether this is a bug or not.


-Endre

Nicolas Jozwiak

Oct 8, 2014, 11:41:22 AM
to akka...@googlegroups.com
One thing I didn't mention is that on my local computer it works perfectly.

The problem arises when I test over the network. How does akka stream manage TCP fragments?

Roland Kuhn

Oct 8, 2014, 2:57:17 PM
to akka-user
Hi Nicolas,

this means that you are relying on “message boundaries”, which I put in quotes because TCP does not support this concept: you will receive bytes in arbitrary chunks, not in the chunks that you sent. Coupled with the lz4 state issue that Endre pointed out this should explain what is going on.

So, in summary: you’ll need to use the same lz4 instance throughout the full compression, and you’ll need to also use one lz4 instance throughout the whole decompression. If the lz4 implementation actually works then this will do it.
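As an illustration of this point (my own sketch, not from the thread): if each compressed block is prefixed with its own length, the receiver can buffer arbitrary TCP fragments and hand only complete blocks to the decompressor. Note the header would then have to carry the compressed payload length; the code earlier in the thread stores the uncompressed size, so this assumes a changed header. A minimal plain-Java reassembler, with no LZ4 calls:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FrameAssembler {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Feed one arbitrary TCP fragment; returns every complete
    // length-prefixed frame payload that is now available.
    List<byte[]> feed(byte[] fragment) {
        pending.write(fragment, 0, fragment.length);
        List<byte[]> frames = new ArrayList<>();
        byte[] buf = pending.toByteArray();
        int pos = 0;
        while (buf.length - pos >= 4) {
            int len = ByteBuffer.wrap(buf, pos, 4)
                    .order(ByteOrder.LITTLE_ENDIAN).getInt();
            if (buf.length - pos - 4 < len) break; // payload incomplete, wait for more
            frames.add(Arrays.copyOfRange(buf, pos + 4, pos + 4 + len));
            pos += 4 + len;
        }
        // keep the unconsumed tail for the next fragment
        pending.reset();
        pending.write(buf, pos, buf.length - pos);
        return frames;
    }

    public static void main(String[] args) {
        FrameAssembler fa = new FrameAssembler();
        // one frame: 4-byte little-endian length header + 5 payload bytes
        byte[] framed = ByteBuffer.allocate(9).order(ByteOrder.LITTLE_ENDIAN)
                .putInt(5).put(new byte[]{1, 2, 3, 4, 5}).array();
        // deliver it split across two "TCP fragments"
        System.out.println(fa.feed(Arrays.copyOfRange(framed, 0, 3)).size()); // 0
        System.out.println(fa.feed(Arrays.copyOfRange(framed, 3, 9)).size()); // 1
    }
}
```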

Regards,

Roland


Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn


Christopher Hunt

Oct 8, 2014, 7:22:32 PM
to akka...@googlegroups.com
...but your example doesn't close the output stream right?

Don't you want something like this:

writeToFile
  .future(someMaterializer)
  .andThen {
    case _ => out.close()
  }

Nicolas Jozwiak

Oct 9, 2014, 3:56:19 AM
to akka...@googlegroups.com
Hi Roland,

  Thanks for your response. I modified my example to manage the lz4 state but I still have the issue… But since you say that "TCP does not support this concept", I think I have to manage the fragments myself…

Nicolas Jozwiak

Oct 9, 2014, 6:19:22 AM
to akka...@googlegroups.com
@Christopher

   I didn't include everything here, to keep things simple, but yes, I definitely close the output stream at the end ;)

Nicolas.