Messages delimited by multiple newlines?


David M

Aug 18, 2016, 2:41:39 PM
to Haskell Pipes
I have a protocol in which each message consists of several header lines, each terminated by a newline, followed by an extra newline that signals the end of the headers.  It is like the headers of an email, except that the stream of messages goes on forever.

header: value\n
header: value\n
\n
newmessage\n
header: value\n
anotherheader: value\n
\n
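
For reference, when the whole input is already in memory, the framing above can be modelled with plain `Data.Text` functions. This is only a sketch to pin down the expected result: the name `messages` is hypothetical, it assumes the `text` package, and it deliberately ignores the streaming concern that motivates the question.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import           Data.Text (Text)
import qualified Data.Text as T

-- | Split a fully buffered input into messages, each given as its list of
-- header lines. A blank line ("\n\n" in the raw text) ends a message.
-- Hypothetical helper for illustration; not part of any pipes library.
messages :: Text -> [[Text]]
messages = map T.lines . filter (not . T.null) . T.splitOn "\n\n"
```

Loaded into GHCi with OverloadedStrings enabled, the sample input gives:

    >>> messages "header: value\nheader: value\n\nnewmessage\nheader: value\nanotherheader: value\n\n"
    [["header: value","header: value"],["newmessage","header: value","anotherheader: value"]]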


To process the stream message by message, I ended up writing my own lens, below, based on Pipes.Parse.span.  It is a bit ugly.  Is there a more idiomatic way to do this using Pipes.Text.line(s), Pipes.Group.groupsBy, something else in Pipes.Parse, or something else along those lines?

endline :: Monad m => Lens' (Producer Text m a) (Producer Text m (Producer Text m a))
endline k p0 = fmap join (k (go p0 ""))
  where
      go :: Monad m => Producer Text m a -> Text -> Producer Text m (Producer Text m a)
      go p accum = do
        x <- lift (next p)
        case x of
              Left   r        -> return (return r)
              Right ("", p')  -> go p' accum   -- skip empty chunks but keep any pending newline
              Right (txt, p') -> do
                let
                 (prefix, suffix) = T.breakOn "\n\n" (T.append accum txt)
                 prefixnotrailing = T.dropWhileEnd (== '\n') prefix
                 trailingeol = T.takeWhileEnd (== '\n') prefix
                if (not . T.null $ prefixnotrailing)
                  then yield prefixnotrailing
                  else return ()
                if T.null suffix
                  then go p' trailingeol
                  else return (yield (T.drop 2 (T.append trailingeol suffix)) >> p')


Michael Thompson

Aug 18, 2016, 8:13:04 PM
to Haskell Pipes
I think that, as a lens, this should reinsert the missing "\n\n":

     endline' :: Monad m => Lens' (Producer Text m a) (Producer Text m (Producer Text m a))
     endline' k p0 = fmap (>>= (yield "\n\n" >>)) (k (go p0 ""))   -- instead of just `join`

     >>> :set -XOverloadedStrings
     >>> Text.toLazyM $ over endline id $ yield "hello\nworld\n\ngoodbye\nworld"
     "hello\nworldgoodbye\nworld"
     >>> Text.toLazyM $ over endline' id $ yield "hello\nworld\n\ngoodbye\nworld"
     "hello\nworld\n\ngoodbye\nworld"

The latter is more the desired behavior.

Note that as it stands this silently accumulates everything 
before the double newline. One might try to avoid this, but if 
these are not foreign files it might not be worth worrying about.
If you don't want to accumulate, the thing that is really missing 
is a pair of functions like `Data.Text.breakOn` and `Data.Text.splitOn` 
that could break a text stream on a given pattern, here "\n\n". 
I remember trying to implement these, but it is surprisingly 
difficult to do in a non-plodding way. `text` uses an extremely 
complicated, but fast, method that collects a list of all indices 
at which the match text begins.

One thing I wondered: are you going to repeat this across the 
length of the file? If so, and accumulating lines isn't an issue, 
then one might approach the problem by first accumulating lines:

     >>> :t PG.folds mappend mempty id . view Text.lines   -- I was using Pipes.Group = PG; Pipes.Text = Text
     PG.folds mappend mempty id . view Text.lines
      :: Monad m => Producer Text m r -> Producer Text m r

Now we have a producer of separate accumulated lines and can break on an empty line.

    >>> let accumLines = PG.folds mappend mempty id . view Text.lines
    >>> let txt = yield "hello\nworld\n\ngoodbye"
    >>> runEffect $ accumLines txt >-> P.print
    "hello"
    "world"
    ""
    "goodbye"

Now we are missing something like a `split :: a -> Producer a m r -> FreeT (Producer a m) m r`,
which should be in Pipes.Group, I think. If all of the above is not completely wrong-headed,
we could try to write one. It should be pretty simple given `Pipes.Parse.span`. But note 
that with `Pipes.Parse.span` we are already close to the effect you wanted:

    >>> rest <- runEffect $ accumLines txt  ^. PP.span (/= mempty) >-> P.print
    "hello"
    "world"
    >>> runEffect $ rest >-> P.print
    ""
    "goodbye"

    >>> runEffect $ rest >-> P.drop 1 >-> P.print
    "goodbye"


You can collect the lines of the first record with `P.toListM'`:

     >>> (rec1,rest) <- P.toListM' $ accumLines txt ^. PP.span (/= mempty)
     >>> rec1
     ["hello","world"]


Like I said, this may all be wrong-headed and uncomprehending; I'm partly just testing
ideas to see what you are intending.

Michael Thompson

Aug 18, 2016, 8:42:10 PM
to Haskell Pipes
Oh, I should have said, when I suggested we might propose a `split` 
to Gabriel for Pipes.Group, that it should be a lens, thus with the type

    split :: (Monad m)  => (a -> Bool) -> Lens' (Producer a m r) (FreeT (Producer a m) m r)

or maybe 

    split :: (Monad m, Eq a) => a -> Lens' (Producer a m r) (FreeT (Producer a m) m r)

It occurs to me that `Pipes.Group.groupsBy` permits things like this:

     >>> let cmp a b = a /= mempty && b /= mempty
     >>> let kludge p = PG.folds (\a b -> a <> "\n" <> b) mempty id (accumLines p ^. PG.groupsBy cmp)
     >>> runEffect $ kludge txt >-> P.filter (/= "\n") >-> P.print
     "\nhello\nworld"
     "\ngoodbye"



Michael Thompson

Aug 18, 2016, 8:48:15 PM
to Haskell Pipes
One more correction: it seems the type should be  

   split :: (Monad m, Eq a) => a -> Lens' (Producer a m r) (FreeT (Producer a m) m r)

since the lens should reinsert the thing we split on. If we use a 
predicate, we don't know what it was.
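
A minimal sketch of what such a `split` might look like, built on `Pipes.Parse.span` in the same way `groupsBy` is. The names `splitOn`, `split`, `demo` and `roundTrip` are hypothetical and nothing here exists in Pipes.Group; it assumes the `pipes`, `pipes-parse`, `pipes-group` and `free` packages. The lens type is written out in van Laarhoven form so no lens library is needed.

```haskell
import           Control.Monad.Trans.Free (FreeF (Free), FreeT (..))
import           Data.Functor.Const       (Const (..))
import           Pipes
import           Pipes.Group              (folds, intercalates)
import qualified Pipes.Parse              as PP
import qualified Pipes.Prelude            as P

-- | Split a stream at every element equal to `sep`, dropping the separators.
-- Hypothetical sketch; Pipes.Group has no such function.
splitOn :: (Monad m, Eq a) => a -> Producer a m r -> FreeT (Producer a m) m r
splitOn sep = go
  where
    go p = FreeT $ return $ Free $ do
        -- Stream everything before the next separator as the current group.
        rest <- getConst (PP.span (/= sep) Const p)
        x    <- lift (next rest)
        case x of
            Left r           -> return (return r)   -- input exhausted
            Right (_, rest') -> return (go rest')   -- drop separator, continue

-- | The same thing as a lens: putting the groups back reinserts `sep`
-- between them, which is why the separator is a value, not a predicate.
split :: (Monad m, Eq a, Functor f)
      => a
      -> (FreeT (Producer a m) m r -> f (FreeT (Producer a m) m r))
      -> Producer a m r -> f (Producer a m r)
split sep k p = fmap (intercalates (yield sep)) (k (splitOn sep p))

-- | Groups of a small stream, each collected into a list for inspection.
demo :: [[Int]]
demo = P.toList (folds (\xs x -> xs ++ [x]) [] id (splitOn 0 (each [1, 2, 0, 3, 0])))

-- | Splitting and rejoining gives back the original stream.
roundTrip :: [Int]
roundTrip = P.toList (intercalates (yield 0) (splitOn 0 (each [1, 2, 0, 3, 0])))
```

In GHCi:

    >>> demo
    [[1,2],[3],[]]
    >>> roundTrip
    [1,2,0,3,0]

A trailing separator produces a final empty group, as with `splitOn` in `Data.List.Split`; that is what lets the rejoin reproduce the input exactly. Applied to the `accumLines txt` producer from earlier, `splitOn ""` would separate records at the blank line.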

David M

Aug 19, 2016, 9:38:10 AM
to Haskell Pipes
This is really good stuff.  Nice catch on missing the re-yielding of \n\n.  I'm sure that might have already bitten me as I experimented.  I should have just used the lens iso function to create my lens instead of trying to do it manually.


>Note that as it stands this silently accumulates everything before the double newline.

I was hoping that actually would not be the case, since in my function I'm immediately yielding everything I can before looping.  If it is accumulating data as it runs, that's exactly what I was hoping to avoid by writing this.

Your musings with groupsBy and PG.folds were definitely along the lines of what I was looking for.  Reading what you wrote, I see why it is such a hard problem.  You basically have to split the bytestring up into chunks that can be consumed by other combinators, but it is hard because all of the existing pipes functions for bytestrings assume breaking or spanning on a single character, rather than several of them in a row.


split :: (Monad m, Eq a) => a -> Lens' (Producer a m r) (FreeT (Producer a m) m r)
I guess what we really need is exactly what you specified, but it won't work for bytestrings in the general case because the delimiter may not be in its own chunk.

There would have to be bytestring- and text-specific combinators in pipes-bytestring and pipes-text, like
split :: (Monad m) => ByteString -> Lens' (Producer ByteString m r) (FreeT (Producer ByteString m) m r)
that take the delimiter as an argument and do the splitting and recombining.

I only have breakOnText :: Monad m => Text -> Lens' (Producer Text m a) (Producer Text m (Producer Text m a)), and so it is not quite good enough to submit a pull request for.

Michael Thompson

Aug 19, 2016, 2:39:58 PM
to Haskell Pipes
Ah, indeed: I saw the recursive `T.append`s, then noticed the appending in ghci, and was off and running ... But it is in fact just a little reshuffling at the points where we hold part of the matching text, here "\n\n".  For the general `breakOnText`, the biggest text in memory would, I guess, be the length of the text we are matching minus 1, plus the length of the biggest incoming chunk?

Do you have the more general `breakOnText` written? I think it should be pretty simple to write the corresponding `splitOnText` as well.  I haven't tried it in a while, but I remember there being an unpleasant but straightforward procedure for going about it; maybe something new will come up here, though. They are highly desirable functions, I would think. A general pipes `splitOn` like this one would have a signature like `[a] -> Producer a m r -> FreeT (Producer a m) m r`, on the model of the `splitOn` in `Data.List.Split`; or rather, again, the corresponding lens.
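
The memory estimate can be made concrete with a rough sketch of a general break function that holds back only the last (needle length minus 1) characters between chunks. The names `breakOn` and `demo` are hypothetical; this is not a pipes-text function and not necessarily how either of us would finally write it. It assumes the `pipes` and `text` packages and a non-empty needle (`T.breakOn` rejects an empty one).

```haskell
{-# LANGUAGE OverloadedStrings #-}
import           Control.Monad         (unless)
import           Data.Functor.Identity (runIdentity)
import           Data.Text             (Text)
import qualified Data.Text             as T
import           Pipes
import qualified Pipes.Prelude         as P

-- | Stream everything before the first occurrence of a non-empty `needle`;
-- the returned producer is the remainder, starting with the needle.
-- Hypothetical sketch, not part of pipes-text.
breakOn :: Monad m => Text -> Producer Text m r -> Producer Text m (Producer Text m r)
breakOn needle = go T.empty
  where
    keep = T.length needle - 1   -- most chars of a match that can be pending
    go carry p = do
        x <- lift (next p)
        case x of
            Left r -> do
                unless (T.null carry) (yield carry)   -- flush the held-back tail
                return (return r)
            Right (chunk, p') -> do
                let buf             = T.append carry chunk
                    (before, after) = T.breakOn needle buf
                if T.null after
                    then do
                        -- No match yet: emit all but the last `keep` chars,
                        -- which might be the start of a match.
                        let (out, carry') = T.splitAt (T.length buf - keep) buf
                        unless (T.null out) (yield out)
                        go carry' p'
                    else do
                        unless (T.null before) (yield before)
                        return (yield after >> p')

-- | A needle that straddles two chunks is still found.
demo :: ([Text], [Text])
demo = runIdentity $ do
    (before, rest) <- P.toListM' (breakOn "\n\n" (each ["hello\nworld\n", "\ngoodbye"]))
    after          <- P.toListM rest
    return (before, after)
```

In GHCi:

    >>> demo
    (["hello\nworld"],["\n\ngoodbye"])

The buffer `buf` is never longer than `keep` plus one incoming chunk, which matches the bound guessed above.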

Michael Thompson

Aug 20, 2016, 8:54:51 AM
to Haskell Pipes
Hi David,

I made some progress (http://sprunge.us/iPPE). Defining `splitOn` in terms of `breakOn` is
pretty simple. Ideally one would like to use `T.splitOn`, since it is so fast and would speed
up operations when we are taking in large text chunks, but that would be a totally different structure.

The two formulations of `breakOn` follow your method and
just pass along what could be an initial segment of a match: the first just keeps the last bit
that is one Char shorter than the needle (irrespective of content); the other inspects this chunk.
The second is nicer, but I was worried the apparatus of inspection was going to slow things
more than it was worth. It doesn't seem worse from some initial tests. Do they seem wrong, or clunky?
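
To illustrate the difference between the two, here is one way the "inspecting" variant could decide what to hold back: only the longest suffix of the unmatched buffer that is actually a prefix of the needle. This is a guess at the idea, not a copy of the linked code; `carryOver` is a hypothetical name, and it assumes the `text` package and a buffer that contains no complete match.

```haskell
import           Data.List  (find)
import           Data.Maybe (fromMaybe)
import           Data.Text  (Text)
import qualified Data.Text  as T

-- | The longest suffix of `buf` that is a proper prefix of `needle`, i.e. the
-- only part of an unmatched buffer that could still begin a match.
-- Hypothetical helper; the linked paste may do this differently.
carryOver :: Text -> Text -> Text
carryOver needle buf =
    fromMaybe T.empty (find (`T.isPrefixOf` needle) candidates)
  where
    -- Suffixes of the last (length needle - 1) chars, longest first; the
    -- final candidate is the empty text, which is a prefix of anything.
    candidates = T.tails (T.takeEnd (T.length needle - 1) buf)
```

In GHCi, with OverloadedStrings enabled:

    >>> carryOver "\n\n" "hello\n"
    "\n"
    >>> carryOver "\n\n" "hello"
    ""
    >>> carryOver "abc" "xxab"
    "ab"

The first formulation would hold back "o" in the second case; the inspecting one holds back nothing, at the cost of a few prefix tests per chunk.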

Michael
