Streaming deflate - can't reliably read chunks as they arrive instead of the entire stream

A Keeton

Jul 13, 2016, 11:12:29 AM
to golang-nuts
I'm trying to decompress messages arriving over websockets that are using permessage-deflate.

I've run into a couple of issues using flate. 

Assume: 

    buf := new(bytes.Buffer)
    flater := flate.NewReader(buf)

Each new websockets message is written into buf and then I call Read on flater. It works for 1 or more messages (sometimes several right up until I fill the 32k window). Eventually the Read fails and flate says there is a corrupt byte before some offset. Even after more data has been written to buf, successive reads fail. I presume there's some need to reset the flater to a previous state, but Reset hasn't gotten me anywhere.

However, the data decompresses fine if I save the entire stream to a file and then decompress the file.

What is the correct way to decompress chunks of data as they become available?

(A past post suggests calling io.ReadFull, but that does not work in this case.)

Nigel Tao

Jul 13, 2016, 11:00:53 PM
to A Keeton, golang-nuts
On Thu, Jul 14, 2016 at 12:35 AM, A Keeton <ajke...@gmail.com> wrote:
> I'm trying to decompress messages arriving over websockets that are using
> permessage-deflate.

You say that you're using "per message" deflate...


> Each new websockets message is written into buf and then I call Read on

...but then it sounds like you're concatenating all of the messages
together, writing them all to the one buffer, so you're treating it as
"per stream" deflate instead of "per message".


> flater. It works for 1 or more messages (sometimes several right up until I
> fill the 32k window). Eventually the Read fails and flate says there is a
> corrupt byte before some offset. Even after more data has been written to
> buf, successive reads fail. I presume there's some need to reset the flater
> to a previous state, but Reset hasn't gotten me anywhere.

Calling Reset is probably the way to go, but it's hard to give more
specific advice without seeing your code, preferably a self-contained
example and one that's as small and as simple as possible.

Adam Keeton

Jul 14, 2016, 7:15:20 PM
to Nigel Tao, golang-nuts
> but then it sounds like you're concatenating all of the messages together,

I should have mentioned that I reset the websockets buffer after reading from the deflater. 

A small self-contained example with this code would be tricky with all of the websocket handling stuff in the way.

Regardless, with permessage-deflate, the sliding window must be maintained across messages unless no_context_takeover is being used. Creating a new deflater for each message doesn't work. 

I'm not entirely clear on how to work with Reset, specifically the use of dictionaries. Passing 'nil' as a dictionary hasn't gotten me anywhere. I'm guessing that might be the key detail for maintaining the sliding window?

Nigel Tao

Jul 14, 2016, 9:31:13 PM
to Adam Keeton, golang-nuts
On Fri, Jul 15, 2016 at 9:14 AM, Adam Keeton <ajke...@gmail.com> wrote:
> A small self-contained example with this code would be tricky with all of
> the websocket handling stuff in the way.

You could possibly capture the bytes of each compressed message, and
then construct a new websocket-free program that starts with those
bytes and knows what each decompressed message should be.


> I'm not entirely clear with how to work with Reset, specifically the use of
> dictionaries. Passing 'nil' as a dictionary hasn't gotten me anywhere. I'm
> guessing that might be the key detail for maintaining the sliding window?

You might need new API on a flate decompressor to e.g. ack and clear
an explicit mypkg.EndOfMessage error that you repeatedly thread
through the underlying reader. The compress/flate package, like all
io.Reader implementations, expects to read and deliver streams of
bytes, not delimited messages, but websocket (IIUC) isn't really a
stream (two messages "abc" and "def" aren't semantically equivalent to
a single message "abcdef"), and the two models don't quite fit well.

That new API would need to be like the flate.Resetter dance, due to
backwards compat on flate.NewReader returning an io.ReadCloser.

Adam Keeton

Jul 15, 2016, 4:04:00 PM
to Nigel Tao, golang-nuts
I see. Thanks for the help!

> You could possibly capture the bytes of each compressed message, and
> then construct a new websocket-free program that starts with those
> bytes and knows what each decompressed message should be.

I went ahead and wrote some example code. It consists of two implementations. One decompresses the entire compressed stream in one go, the expected way. The second tries to decompress the payload of each message as it arrives.

The data was dumped from a live websockets session between Chrome and a Node.js server using Socket.io.

I added a delimiter between each message in the log file so I could separate them out in the test code to simulate a series of individual messages arriving.

Here's the common code for the two proofs of concept:

    f, err := os.Open("flate_log.d")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    buf := bytes.NewBuffer(nil)

    _, err = buf.ReadFrom(f)
    if err != nil {
        panic(err)
    }

    // magic_seperator is the delimiter written between messages in the log file.
    messages := bytes.Split(buf.Bytes(), magic_seperator)
    compData := bytes.NewBuffer(nil)
    deflater := flate.NewReader(compData)


This works as expected, so the data must be sound:

    // Unsplit into a single contiguous stream
    for _, msg := range messages {
        // msg in this case is a single websockets message
        compData.Write(msg)
    }

    decompressed, err := ioutil.ReadAll(deflater)
    fmt.Printf("Err: %v. Decompressed:\n%s\n\n", err, hex.Dump(decompressed))


The following example only works for the first few messages before flate starts complaining about corrupt input. It breaks the streaming model, as you said. But, AFAIK, some variant of it needs to work to correctly support permessage-deflate with websockets.

    for _, msg := range messages {
        // msg in this case is a single websockets message

        compData.Write(msg)
        decompressed, err := ioutil.ReadAll(deflater)

        fmt.Printf("Err: %v. Decompressed:\n%s\n\n", err, hex.Dump(decompressed))

        // Interestingly, ignoring an unexpected EOF allowed more of the messages to be decompressed
        // provided we use Reset here.
        // Without this conditional, things break sooner.
        if err == io.ErrUnexpectedEOF {
            deflater.(flate.Resetter).Reset(compData, nil)
            continue
        }

        if err != nil {
            break
        }

        compData.Reset()
    }

Is there a different implementation of deflate that you think I should try?

Nigel Tao

Jul 15, 2016, 6:27:59 PM
to Adam Keeton, golang-nuts
On Sat, Jul 16, 2016 at 6:03 AM, Adam Keeton <ajke...@gmail.com> wrote:
> Is there a different implementation of deflate that you think I should try?

I don't know of any. Sorry.

Tamás Gulácsi

Jul 16, 2016, 2:04:59 AM
to golang-nuts
What if you create a new deflater for each message? And use a new Buffer, too! Maybe the deflater reads too far into the following data.
If this works, you can look into why Reset is not enough.

A Keeton

Jul 18, 2016, 11:00:51 AM
to golang-nuts
I initially tried that. It turns out the decompressor has to keep the same sliding window across messages when context takeover is in use, that is, unless no_context_takeover has been negotiated. See https://tools.ietf.org/html/rfc7692#section-7

joe...@google.com

Jul 20, 2016, 11:51:50 AM
to golang-nuts
>     // Unsplit into a single contiguous stream
>     for _, msg := range messages {
>         // msg in this case is a single websockets message
>         compData.Write(msg)
>     }
>     decompressed, err := ioutil.ReadAll(deflater)
>     fmt.Printf("Err: %v. Decompressed:\n%s\n\n", err, hex.Dump(decompressed[0:len(decompressed)]))

I'm actually surprised that the above logic works with all the messages concatenated. The "permessage-deflate" scheme for websockets uses the same mechanism as RFC 1979, which dictates that the transmitter always performs a sync flush (flate.Writer.Flush in Go) and that the framing protocol must remove the 0x0000ffff marker (see RFC 1979, section 2.1, on Data) that comes at the end of a flushed DEFLATE block. Without manually reinserting this 0x0000ffff marker, it is highly unlikely that this represents a valid DEFLATE stream.

With that said, it is possible to actually use the standard library compress/flate to decompress each websocket message, message-by-message.

See the following example:

Adam Keeton

Jul 20, 2016, 1:36:11 PM
to joe...@google.com, golang-nuts
You're right. It wouldn't have worked without the 0x0000ffff. However, the test data was being dumped from an existing codebase handling websockets that was already appending that to each message. 

For reference (and since other people have asked me for it), here's the entire sample code with a link to the test file I used: https://gist.github.com/ajkeeton/87352777b8f458b05f363fe8cd73803c
