A design to extract data from a data-chunk stream.


Zyxwvu S

Nov 22, 2014, 11:38:03 PM11/22/14
to lu...@googlegroups.com
I want to share this design with luvit developers because it takes advantage of Lua's strengths when parsing a stream. The design is very flexible and has worked very well in one of my projects.

* Stream

A stream is a series of ordered data-chunks, like what uv_read_cb provides. We just push those data-chunks into the reader; the reader concatenates them into a buffer and passes the buffer to the decoder.

* Reader

The Reader is the core of this design. It is a Lua object: you push data chunks from a stream into it and read Lua values out of it. When there is not enough data, it yields the current coroutine; once enough data has been decoded, it resumes that coroutine.

Note: the Reader can only be read from one coroutine at a time. If a queue were implemented this limit would disappear, but the queue has never seemed necessary. Data chunks can be pushed from any coroutine.

* Decoder

A decoder is a Lua function that receives the Reader's buffer as a string and returns a Lua value, plus the rest of the buffer if any. If the buffer does not contain enough data, it returns nil. If it encounters a malformed buffer, it should throw a Lua error.

If we want to optimize the program, we can simply implement the decoder in C.
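For illustration, here is a hypothetical decoder for newline-delimited lines that follows this contract (decodeLine is made up for this example, not part of the design):

```
-- A minimal decoder obeying the contract: return a value plus the rest of
-- the buffer, or nil when more data is needed.
local function decodeLine(buffer)
    local pos = buffer:find("\n", 1, true)   -- plain-text search
    if not pos then return nil end           -- not enough data yet
    return buffer:sub(1, pos - 1), buffer:sub(pos + 1)
end
```

`decodeLine("foo\nbar")` yields `"foo"` with `"bar"` left over, while `decodeLine("foo")` yields nil, asking for more data.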

The code below makes this concrete.

```
local crunning, cyield = coroutine.running, coroutine.yield

-- Resume a coroutine, turning a resume-time error into a real error.
local function SafeResume(co, ...)
    local ok, err = coroutine.resume(co, ...)
    if not ok then error(err, 0) end
end

local MT_Reader = {}
MT_Reader.__index = MT_Reader

-- Tell the reader no more data will be pushed for now.
-- If the reader wants to continue reading, it will call onrestore first.
-- Pause is used to avoid buffering too much data.
function MT_Reader:Pause(onrestore)
    assert(not self.paused, "already paused")
    self.paused = onrestore
    if self.decoder then
        SafeResume(self.readco, nil, "paused")
    end
end

-- Push a chunk.
-- If str is nil, no more data will be pushed to this reader and all read
-- requests will return nil plus "stopped".
-- When str is nil, an optional err will be returned by the pending Read
-- instead of "stopped".
-- Returns the length of the buffer, which can be used to decide whether
-- to pause reading.
function MT_Reader:Push(str, err)
    if not str then
        self.stopped = true
        if self.decoder then
            SafeResume(self.readco, nil, err or "stopped")
        end
    elseif self.buffer then
        self.buffer = self.buffer .. str
        if self.decoder then
            local s, result, rest = pcall(self.decoder, self.buffer)
            if not s then
                SafeResume(self.readco, nil, result)
            elseif result then
                if rest and #rest > 0 then
                    self.buffer = rest
                else
                    self.buffer = nil
                end
                SafeResume(self.readco, result)
            end
        end
    else
        if self.decoder then
            local s, result, rest = pcall(self.decoder, str)
            if not s then
                self.buffer = str
                SafeResume(self.readco, nil, result)
            elseif result then
                if rest and #rest > 0 then
                    self.buffer = rest
                end
                SafeResume(self.readco, result)
            else
                self.buffer = str
            end
        else
            self.buffer = str
        end
    end
    if self.buffer then return #self.buffer else return 0 end
end

-- Read data with the decoder.
function MT_Reader:Read(decoder)
    assert(not self.decoder, "already reading")
    if self.buffer then
        local s, result, rest = pcall(decoder, self.buffer)
        if not s then
            return nil, result
        elseif result then
            if rest and #rest > 0 then
                self.buffer = rest
            else
                self.buffer = nil
            end
            return result
        end
    end
    if self.stopped then return nil, "stopped" end
    if self.paused then self.paused(self) self.paused = false end
    self.readco, self.decoder = crunning(), decoder
    local result, err = cyield()
    self.readco, self.decoder = nil, nil
    return result, err
end

-- Read exactly len bytes.
function MT_Reader:Get(len)
    return self:Read(function(buffer)
        if #buffer == len then
            return buffer
        elseif #buffer > len then
            return buffer:sub(1, len), buffer:sub(len + 1, -1)
        end
    end)
end

-- This is equivalent to:
--    Reader:Read(function(buffer) return buffer end)
-- but works faster if data is already buffered.
-- If len is provided, it can be used to limit the length of the read
-- data, but that path is not optimized.
function MT_Reader:Peek(len)
    if len then
        return self:Read(function(buffer)
            if #buffer <= len then
                return buffer
            else
                return buffer:sub(1, len), buffer:sub(len + 1, -1)
            end
        end)
    end
    assert(not self.decoder, "already reading")
    if self.buffer then
        local buffer = self.buffer
        self.buffer = nil
        return buffer
    end
    if self.stopped then return nil, "stopped" end
    if self.paused then self.paused(self) self.paused = false end
    self.readco, self.decoder = crunning(), function(buffer)
        return buffer
    end
    local result, err = cyield()
    self.readco, self.decoder = nil, nil
    return result, err
end

return function() return setmetatable({}, MT_Reader) end
```
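To show how the pieces fit together, here is a self-contained, stripped-down sketch of the same suspend/resume pattern (no pause, stop, or error handling; the names here are mine, not the module's):

```
-- Read yields the current coroutine when the decoder returns nil;
-- Push resumes it once enough data has arrived.
local buffer, readco, pending = "", nil, nil

local function Read(decoder)
    local value, rest = decoder(buffer)
    if value then
        buffer = rest or ""
        return value
    end
    readco, pending = coroutine.running(), decoder
    return coroutine.yield()                 -- suspend until data arrives
end

local function Push(chunk)
    buffer = buffer .. chunk
    if readco then
        local value, rest = pending(buffer)
        if value then
            buffer = rest or ""
            local co = readco
            readco, pending = nil, nil
            coroutine.resume(co, value)
        end
    end
end

-- Decoder for newline-terminated lines.
local function line(b)
    local p = b:find("\n", 1, true)
    if p then return b:sub(1, p - 1), b:sub(p + 1) end
end

local got = {}
coroutine.resume(coroutine.create(function()
    got[1] = Read(line)                      -- suspends: no newline yet
    got[2] = Read(line)                      -- satisfied from the buffer
end))
Push("hel")                                  -- still not enough data
Push("lo\nworld\n")                          -- resumes the reader
```

After the last Push, `got` holds `"hello"` and `"world"`: the first Read suspended until a full line arrived, the second was satisfied straight from the leftover buffer.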

An HTTP header decoder, which reads an HTTP request head as a table:
```
local function decodeHead(buffer)
    local l, r = buffer:find("\r?\n\r?\n")
    if l and r then
        local head = buffer:sub(1, l - 1)
        local result, firstLine = { headers = {} }, true
        for line in head:gmatch("([^\r\n]+)") do
            if firstLine then
                local verb, resource = line:match("^([A-Z]+) ([^%s]+) HTTP/1%.[01]$")
                assert(verb and resource, "bad request")
                result.method, result.resource_orig = verb, resource
                firstLine = false
            else
                local k, v = line:match("^([A-Za-z0-9%-]+):%s?(.+)$")
                assert(k and v, "bad request")
                result.headers[k:lower()] = v
            end
        end
        return result, buffer:sub(r + 1, -1)
    end
end
```
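As a self-contained sanity check, the two patterns used above behave like this on fabricated inputs:

```
-- Request-line pattern from decodeHead:
local verb, resource = ("GET /index.html HTTP/1.1"):match("^([A-Z]+) ([^%s]+) HTTP/1%.[01]$")
-- verb == "GET", resource == "/index.html"

-- Header-line pattern from decodeHead:
local k, v = ("Content-Length: 42"):match("^([A-Za-z0-9%-]+):%s?(.+)$")
-- k == "Content-Length", v == "42"
```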

A FastCGI decoder, which reads a FastCGI record:
```
local function decodeFCGI(buffer)
    if #buffer < 8 then return nil end
    local dl, pl = buffer:byte(5) * 0x100 + buffer:byte(6), buffer:byte(7)
    if #buffer >= dl + pl + 8 then
        local result = { buffer:byte(2), buffer:sub(9,  8 + dl) }
        return result, buffer:sub(9 + dl + pl, -1)
    end
end
```
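The decoder relies on the FastCGI record layout: an 8-byte header of version, type, requestId (2 bytes), contentLength (2 bytes, big-endian), paddingLength, and a reserved byte, followed by content and padding. A self-contained check of that length math on a fabricated record:

```
-- A fabricated FCGI_STDOUT record (type 6): 5 content bytes, 3 padding.
local body = "hello"
local rec = string.char(1, 6, 0, 1, 0, #body, 3, 0) .. body .. "\0\0\0"

local rtype = rec:byte(2)                      -- record type
local dl = rec:byte(5) * 0x100 + rec:byte(6)   -- contentLength
local pl = rec:byte(7)                         -- paddingLength
-- rtype == 6, dl == 5, pl == 3, #rec == 8 + dl + pl
```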

A WebSocket frame decoder:
```
local band, bxor = bit.band, bit.bxor   -- LuaJIT's bit library
local schar, unpack = string.char, unpack

function decodeWSF(block)
    if #block < 2 then return end
    local b1, b2 = block:byte(1, 2)
    local skip, len = 2, band(b2, 0x7F)
    if len == 126 then
        if #block < 4 then return end
        local l1, l2 = block:byte(3, 4)
        len, skip = l1 * 0x100 + l2, 4
    elseif len == 127 then
        if #block < 10 then return end
        local l1, l2, l3, l4, l5, l6, l7, l8 = block:byte(3, 10)
        len = l1 * 0x100000000000000 + l2 * 0x1000000000000 +
              l3 * 0x10000000000 + l4 * 0x100000000 +
              l5 * 0x1000000 + l6 * 0x10000 + l7 * 0x100 + l8
        skip = 10
    end
    local mask = band(b2, 0x80) == 0x80
    if mask then
        if #block < skip + 4 then return end
        skip = skip + 4
        mask = { block:byte(skip - 3, skip) }
    end
    if #block < skip + len then return end
    local data = block:sub(skip + 1, skip + len)
    if mask then
        local new = {}
        for i = 1, #data do
            new[i] = bxor(data:byte(i, i), mask[(i - 1) % 4 + 1])
        end
        data = schar(unpack(new))
    end
    local result = { band(b1, 0xF), data }
    result.FIN = band(b1, 0x80) == 0x80
    return result, block:sub(skip + len + 1, -1)
end
```
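The decoder above assumes a bit library (LuaJIT's, here). On stock Lua 5.1 without one, the flag/length splits in the first two header bytes can be done arithmetically instead; a self-contained sketch:

```
-- 0x81 = FIN set + text opcode (1); 0x85 = mask bit set + length 5.
local b1, b2 = 0x81, 0x85
local fin    = b1 >= 0x80          -- high bit, same as band(b1, 0x80) == 0x80
local opcode = b1 % 0x10           -- low nibble, same as band(b1, 0xF)
local masked = b2 >= 0x80
local len    = b2 % 0x80           -- same as band(b2, 0x7F)
-- fin == true, opcode == 1, masked == true, len == 5
```

These identities hold because each field occupies fixed bit positions within a single byte; only the per-byte XOR unmasking genuinely needs a bit library.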

Tim Caswell

Nov 24, 2014, 2:48:06 PM11/24/14
to lu...@googlegroups.com
This is a pretty good design.  I especially like that the reader simply suspends when it needs more data, which keeps things simple and out of callback craziness.

For comparison, have you seen the channel style I've been starting to use in the new luvi-up branch of luvit?

Basically an app, codec, or whatever is implemented as a coroutine with a blocking read and write function.  Read will return the next chunk from its source, and write will send data to its output.  If there is nothing to read, it will suspend.  It will also suspend on write if the thing you're writing to isn't ready for more data.


```
function (read, write)
end
```

The thing I like about your decoder being a separate function is that it doesn't involve any coroutines or callbacks.  It's clever how it handles too much data and not enough data.  Though I do worry about bad performance when input chunks are huge and contain a large number of output events.  That fear is of course based on gut feeling, not actual use or benchmarks.  With tail calls in Lua and no coroutine API calls, this could be quite fast in LuaJIT.



imzy...@gmail.com

Nov 24, 2014, 9:12:36 PM11/24/14
to lu...@googlegroups.com
I am learning about channels these days. 

This reader design doesn't dictate how we write, because blocking writes sometimes cause trouble. For example, if we were making an IRC chatroom, we would have to start lots of coroutines just to run many send operations at the same time. So writing cannot be limited to blocking, unlike reading; I can't find a situation where non-blocking reading is necessary.

As for performance problems, the main source is string concatenation. That's why there's a Peek method that lets the reader hand the pushed chunk straight through, avoiding concats. When we already know how many bytes to read, for example from an HTTP content-length, we can use it.
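The idea behind that known-length fast path can be sketched standalone: consume exactly `want` bytes from incoming chunks without repeatedly concatenating a growing buffer (`collect` is a made-up name for this illustration):

```
-- Gather exactly `want` bytes from an ordered list of chunks, keeping the
-- pieces in a table and concatenating only once at the end.
local function collect(want, chunks)
    local parts, got = {}, 0
    for _, chunk in ipairs(chunks) do
        local need = want - got
        if #chunk >= need then
            parts[#parts + 1] = chunk:sub(1, need)
            got = want
            break                          -- any leftover would stay buffered
        end
        parts[#parts + 1] = chunk
        got = got + #chunk
    end
    return table.concat(parts)
end

local body = collect(9, { "hel", "lo wo", "rld!!" })
-- body == "hello wor"
```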

Tim Caswell

Nov 25, 2014, 9:26:22 AM11/25/14
to lu...@googlegroups.com
I wrote my latest parser (for a new binary syncing protocol I'm designing) in your decoder style.  I really enjoyed it.  I did have two questions though:

 - How do you signify errors in the decoder?  What if the input data is malformed?
 - Also, is there a way to signify end of stream?  I normally emit nil for EOS, but here nil means more input is wanted.

imzy...@gmail.com

Nov 25, 2014, 12:35:28 PM11/25/14
to lu...@googlegroups.com
1. When a malformed stream is passed to the decoder, the decoder should throw a Lua error; Read will then return nil plus the error. If there is not enough data, just return nil.

2. When a stream ends, call Push with nil and an optional error message if the stream ended with an error. The blocked Read will then return nil plus the error message (or 'stopped'), and the flag reader.stopped will be true. If there is still data in the buffer, Read keeps working until the buffer is empty or the decoders start reporting that there is not enough data. (Look at the code of Read: it returns nil plus 'stopped' in this situation.)
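The malformed-input contract in isolation: the decoder raises, and the Reader's pcall turns that into a nil-plus-error return. A self-contained illustration with a made-up decoder:

```
-- Decodes a single leading digit; raises on anything else.
local function decodeDigit(buffer)
    if #buffer == 0 then return nil end       -- not enough data
    local d = buffer:match("^%d")
    if not d then error("bad input", 0) end   -- malformed stream
    return tonumber(d), buffer:sub(2)
end

-- This mirrors what Push/Read do internally with pcall:
local ok, err = pcall(decodeDigit, "x1")
-- ok == false, err == "bad input"
local v, rest = decodeDigit("42")
-- v == 4, rest == "2"
```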