bytestream lines

Tran Ma

Aug 12, 2015, 3:13:13 AM
to Haskell Pipes
Hi all,

I'm delimiting a bytestream like this:

mkLines :: Monad m => Producer ByteString m () -> Producer ByteString m ()
mkLines = PipesGroup.concats . view PipesByteString.lines

I thought this should behave like a regular `lines` function, e.g. delimiting "foo,bar\nfizz,buzz\n" into [ "foo,bar", "fizz,buzz" ], regardless of how `hGetSome` chunks the original stream, but this isn't the case. For example, running `fmap length . PP.toListM . mkLines . PB.fromHandle` on a 25000-line file gives 25569.

Should I be peeking at each ByteString to break on a "\n" character myself? That is already what `lines` in pipes-bytestring is doing though.

Cheers,

Michael Thompson

Aug 12, 2015, 10:09:03 AM
to Haskell Pipes
Right, `concats` is just erasing the `FreeT` boundaries. 
The operations of `pipes-bytestring` don't guarantee anything about 
the underlying chunks -- except that they don't get bigger (there
are a couple of exceptions, like concatMap), so that 'streaming' is
maintained. 

If you want to make a producer of strict bytestrings, one for each line,
you need other combinators. This is a little surprising at first. 
See Gabriel's discussion here, for example


Michael Thompson

Aug 12, 2015, 10:44:49 AM
to Haskell Pipes
It won't be a problem here, but it looks like the standard solution
mentioned by Gabriel there is subtly quadratic for the reasons that ndmitchell
brings up here, a little obscurely https://github.com/snoyberg/conduit/pull/209
(the lines function in question is similar to your mkLines)

If that's right, then to avoid this one needs to do like ndmitchell and emulate


a simple way around this would be to do something like

   L.purely PG.folds (fmap mconcat L.list) :: Monad m => PB.FreeT (Producer ByteString m) m r -> Producer ByteString m r

The left fold collecting a list is a little strange; there are other ways of going about it, e.g.
mapping each bytestring chunk to a builder, using L.mconcat, etc.

Note that ByteString implements mconcat specially, not as `foldr mappend mempty` but with the
`concat` I linked above. Maybe `Control.Foldl.ByteString` should have its own concatenating
`Fold ByteString ByteString`. This might pass through some strict list type to collect
the fragments before building the bytestring, though there is probably no problem with
the solution mentioned above.
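
To make the cost difference concrete, here is a small sketch using plain lists of strict bytestrings rather than pipes. The names `appendEach` and `concatOnce` are made up for illustration and are not part of any library; both functions return the same bytes and differ only in how much copying they do.

```haskell
import qualified Data.ByteString.Char8 as B
import Data.List (foldl')

-- Left-nested appends: every B.append copies everything accumulated so
-- far, so the total work grows quadratically with the number of chunks.
appendEach :: [B.ByteString] -> B.ByteString
appendEach = foldl' B.append B.empty

-- Collect the chunks first, then concatenate once: B.concat computes the
-- total length up front and copies each chunk a single time.
concatOnce :: [B.ByteString] -> B.ByteString
concatOnce = B.concat
```

Collecting a list with `L.list` and then applying `mconcat` corresponds to the second shape, which is why it avoids the quadratic behaviour.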


Michael Thompson

Aug 12, 2015, 11:07:04 AM
to Haskell Pipes
Sorry for spamming the list. The `Data.ByteString.Internal.concat` function I meant to link is 


A `Control.Foldl.ByteString` replica might e.g. take a maximum size argument to bring the question
of safety to the user's attention.
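
As a rough sketch of what a size-capped concatenation could look like, here is a list-based stand-in. The function `concatUpTo` and its `Either` result are hypothetical and only illustrate the idea; a real version would presumably be written as a `Fold`.

```haskell
import qualified Data.ByteString.Char8 as B

-- Hypothetical helper: concatenate chunks, but give up as soon as the
-- running total exceeds `limit` bytes. `Left n` reports the size reached
-- when the limit was exceeded; `Right bs` is the concatenated result.
concatUpTo :: Int -> [B.ByteString] -> Either Int B.ByteString
concatUpTo limit = go 0 []
  where
    -- `acc` holds the chunks seen so far, most recent first.
    go _ acc [] = Right (B.concat (reverse acc))
    go n acc (c : cs)
      | n' > limit = Left n'
      | otherwise  = go n' (c : acc) cs
      where
        n' = n + B.length c
```

The caller has to pick a limit and handle the `Left` case, which is the point: the unbounded-memory risk becomes visible in the type.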

Gabriel Gonzalez

Aug 16, 2015, 11:12:17 AM
to haskel...@googlegroups.com, ma.ngo...@gmail.com
To make an analogy to `ByteString` operations, what you did was essentially equivalent to:

    ByteString.concat . ByteString.lines

In other words, your code just deleted all the newlines.

The intuition going into the `pipes-bytestring`/`pipes-text` libraries is that each element of the stream is one chunk of unspecified size that doesn't necessarily align to line boundaries.  `pipes-bytestring`/`pipes-text` will sometimes slice these chunks into finer chunks but it will (almost) never combine them into larger chunks, in order to guarantee that all operations use a bounded amount of memory.  The exception to this rule is the `chunksOf'` function.

To illustrate what happened in your code, I will use a "list of lists" notation, where the outer list is the `FreeT` and the inner list is each `Producer` group within that `FreeT` and each element is one value emitted by a `Producer`.

So let's imagine that you had a text file that looked like this:

```
ABCDEF
GHIJKLMNO
PQR
```

However, when you read it in as a `Producer` of unaligned chunks, you might get a stream of chunks like this:

    [ "ABC", "DEF\nGHI", "JKL", "MNO\nPQR\n"]

Note that the chunk boundaries don't necessarily correspond to newline boundaries.

Now, when you use `view Pipes.ByteString.lines`, you transform it into this:

    [["ABC", "DEF"], ["GHI", "JKL", "MNO"], ["PQR"]]

Each inner list corresponds to one line (represented as a `Producer`), possibly emitting a stream of multiple chunks.  The outer list is the `FreeT`.

When you follow up with `Pipes.Group.concats` you just concatenate the inner lists together again:

    ["ABC", "DEF", "GHI", "JKL", "MNO", "PQR"]

This is probably not what you wanted.  The elements of the concatenated stream don't represent lines.  They just represent chunks from the original stream with all newlines deleted and turned into chunk boundaries.

This is why the number of elements you counted was greater than the number of lines.  You introduced one new chunk boundary per newline plus whatever natural chunk boundaries existed beforehand.
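
The list-of-lists picture above can be played with directly in plain Haskell. The following is only a model, assuming `String` chunks and ordinary lists in place of `Producer` and `FreeT`; the names `linesGroups`, `erased` and `wholeLines` are invented for this sketch and are not `pipes` functions.

```haskell
-- One chunk is a String; a "producer" is modelled as a list of chunks.
type Chunk = String

-- Model of `view lines`: regroup the chunks by line and drop the
-- newlines. Chunks are only ever split at '\n', never merged.
linesGroups :: [Chunk] -> [[Chunk]]
linesGroups = go []
  where
    -- `acc` holds the chunks of the current line, most recent first.
    go acc [] = [reverse acc | not (null acc)]
    go acc (c : cs) = case break (== '\n') c of
      (pre, [])       -> go (keep pre acc) cs
      (pre, _ : rest) -> reverse (keep pre acc) : go [] (keep rest [] ++ cs)
    -- Skip empty pieces so the model matches the notation used above.
    keep piece acc = if null piece then acc else piece : acc

-- Model of `concats`: erase the group boundaries, keep the chunks.
erased :: [Chunk] -> [Chunk]
erased = concat . linesGroups

-- Model of folding each group with `mconcat`: one element per line.
wholeLines :: [Chunk] -> [Chunk]
wholeLines = map concat . linesGroups
```

With the example stream `["ABC", "DEF\nGHI", "JKL", "MNO\nPQR\n"]`, `erased` yields six elements while `wholeLines` yields three, one per line.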

So if you actually want each element of the `Producer` to be one line long, the trick to do this is:

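    import qualified Control.Foldl as L  -- qualified: L.mconcat would clash with Prelude's mconcat
    import Lens.Family (view)            -- or Control.Lens
    import Pipes.Group (folds)

    -- Fold the chunks of each line into a single strict ByteString
    L.purely folds L.mconcat . view Pipes.ByteString.lines
        :: Monad m => Producer ByteString m r -> Producer ByteString m r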

I make this deliberately hard to discover in order to encourage people to do things in a proper streaming fashion.  The reason why is that there is no upper bound on how long a line may be, so if you do this then you risk unbounded space usage.

The more idiomatic approach is to preserve streaming by using `pipes-group` idioms.  For example, if you wanted to map a function over each line, you would write something like this:

    -- Append an exclamation mark to the end of each line
    over (Pipes.ByteString.lines . Pipes.Group.individually) (<* yield "!")

That would run in constant space no matter how large each line is.  The general type (which I've simplified a bit), would be:

    over (Pipes.ByteString.lines . Pipes.Group.individually)
        :: Monad m
        => (forall x . Producer ByteString m x -> Producer ByteString m x)
        -- ^ Function to process each line
        -> Producer ByteString m r -> Producer ByteString m r

If you provide more specific details about what you want to do with each line I can direct you to the appropriate `pipes-group` or `pipes-bytestring` utility that preserves streaming.