ANN: streaming, streaming-bytestring


Michael Thompson

Aug 26, 2015, 12:15:25 PM
to Haskell Pipes
I uploaded two packages to Hackage that may interest readers of this list.



It's probably a terrible idea!

`streaming` is an attempt to implement `FreeT` in the style of `Pipes.Internal`, with a zillion more
associated functions. There is a Prelude especially for the fundamental 'Producer' case, `Stream ((,) a) m r`, and
its iterations, `Stream (Stream ((,) a) m) m r`. Functor-general operations are in `Streaming` and use a pipes-like
nomenclature of using an `s` to express functor-generality, e.g. maps, splitsAt, folds, etc.
The `Streaming.Prelude` uses regular Prelude names and replicates `Pipes.Prelude` and `Pipes.Group` as far as
possible, but turns the pipes into functions, as you would expect.
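A minimal sketch of what "`FreeT` in the style of `Pipes.Internal`" amounts to, under stated assumptions: this is not the library's source, and the constructor names `Effect` and `Return` and the helpers `yield` and `toList` are illustrative. The stream is a free monad over a step functor `f` with effects interleaved; taking `f = ((,) a)` gives the Producer case mentioned above.

```haskell
-- A minimal sketch of a FreeT-like stream in the style of Pipes.Internal.
-- Constructor and helper names are illustrative assumptions, not the library's API.
data Stream f m r
  = Step (f (Stream f m r))    -- one layer of the step functor f
  | Effect (m (Stream f m r))  -- a monadic action producing the rest of the stream
  | Return r                   -- the stream has finished with result r

instance (Functor f, Monad m) => Functor (Stream f m) where
  fmap g = go
    where
      go (Return r) = Return (g r)
      go (Effect m) = Effect (fmap go m)
      go (Step fs)  = Step (fmap go fs)

instance (Functor f, Monad m) => Applicative (Stream f m) where
  pure = Return
  sf <*> sx = sf >>= \g -> fmap g sx

-- Bind walks the structure and grafts the continuation onto the final Return.
instance (Functor f, Monad m) => Monad (Stream f m) where
  s >>= k = go s
    where
      go (Return r) = k r
      go (Effect m) = Effect (fmap go m)
      go (Step fs)  = Step (fmap go fs)

-- The Producer case: the step functor is the pair ((,) a).
yield :: a -> Stream ((,) a) m ()
yield a = Step (a, Return ())

-- Run a Producer-like stream, collecting its elements and its final result.
toList :: Monad m => Stream ((,) a) m r -> m ([a], r)
toList = go id
  where
    go acc (Return r)       = return (acc [], r)
    go acc (Effect m)       = m >>= go acc
    go acc (Step (a, rest)) = go (acc . (a :)) rest
```

Because `>>=` only rebuilds the spine, `yield 'a' >> yield 'b' >> return 42` is an ordinary monadic sequence, and its result `42` is still available when `toList` reaches the end.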

`streaming-bytestring` is just the obviously correct implementation of an effectful `Data.ByteString.Lazy`
(but with the same `Pipes.Internal` maneuver). It tries to follow the API of the bytestring library
as far as possible, with some use of typical pipes language. Here

     Producer ByteString m r

as it is used in `Pipes.ByteString`, passes over into the monadic

     ByteString m r

I'm not sure I've succeeded yet in hiding the implementation in either case; it is only
in the much more general `streaming` case that there may be some genuine trouble I am overlooking.
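A rough sketch of the monadic `ByteString m r`: a lazy bytestring whose chunk spine may contain effects and which ends in a result `r`. It is modeled on the general shape of streaming-bytestring's internal type, but the constructor names `Empty`, `Chunk` and `Go` and the helpers `fromChunks`, `toStrict` and `length'` should be treated as assumptions for illustration.

```haskell
import qualified Data.ByteString.Char8 as B

-- Like Data.ByteString.Lazy, but the spine may contain effects (Go)
-- and the end carries a return value (Empty r). Names are assumptions.
data ByteString m r
  = Empty r                               -- end of input, with result r
  | Chunk !B.ByteString (ByteString m r)  -- a strict chunk, then the rest
  | Go (m (ByteString m r))               -- an effect that produces the rest

-- Build an effect-free streaming bytestring from strict chunks.
fromChunks :: [B.ByteString] -> ByteString m ()
fromChunks = foldr Chunk (Empty ())

-- Run all effects, concatenating the chunks and keeping the final result.
toStrict :: Monad m => ByteString m r -> m (B.ByteString, r)
toStrict = go []
  where
    go acc (Empty r)      = return (B.concat (reverse acc), r)
    go acc (Chunk c rest) = go (c : acc) rest
    go acc (Go m)         = m >>= go acc

-- Total length in bytes, returned alongside the stream's result.
length' :: Monad m => ByteString m r -> m (Int, r)
length' = go 0
  where
    go n (Empty r)      = return (n, r)
    go n (Chunk c rest) = let n' = n + B.length c in n' `seq` go n' rest
    go n (Go m)         = m >>= go n
```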

Strangely, I had hit on the idea of naming the strict pair `Of a b` before seeing the similar attempt
in ertes' `fuse`; it is almost inevitable once you re-express 

     Producer a m r

as 

    Stream (Of a) m r

but I adopted his constructor, `a :> b`.
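For illustration, a hedged sketch of the strict pair and the re-expression `Producer a m r ~ Stream (Of a) m r`. The minimal `Stream` type, `each` and `toList` are assumptions, not the library's code; the bang on the left field means each element is evaluated as it is emitted, while the rest of the stream stays lazy.

```haskell
-- Strict in the element, lazy in the rest of the stream.
data Of a b = !a :> b
  deriving (Eq, Show)
infixr 5 :>

-- Of a is a Functor, which is what lets it serve as a step functor.
instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

-- Minimal Pipes.Internal-style stream (constructor names are assumptions).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

-- Producer a m r ~ Stream (Of a) m r: stream the elements of a list.
each :: [a] -> Stream (Of a) m ()
each = foldr (\a rest -> Step (a :> rest)) (Return ())

-- Collect the elements, pairing them with the stream's result in an Of.
toList :: Monad m => Stream (Of a) m r -> m (Of [a] r)
toList = go id
  where
    go acc (Return r)         = return (acc [] :> r)
    go acc (Effect m)         = m >>= go acc
    go acc (Step (a :> rest)) = go (acc . (a :)) rest
```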

I conceived this scheme ages ago, but was bent on using fancy optimization schemes. When it occurred to
me just to follow Gabriel's method in `Pipes.Internal`, and that `Data.ByteString.Lazy` already incorporated
highly optimized versions of the natural Prelude of functions, it was mostly mechanical. I was amazed by
the speed of the `ByteString m r` operations. (In some places I don't have the well-thought-out
material from Data.ByteString.Lazy to work with, so there are no doubt some really bad operations in there!)

Anyway, part of the interest is that it de-pipes (and de-lensifies) some of the material in 
Pipes.Prelude, Pipes.Group and Pipes.ByteString so that you can see what 
Gabriel is thinking more clearly. Pipes is incapable of expressing the distinction between

     ByteString m r
     Stream (Of B.ByteString) m r

and uses the latter to implement the former, which is the basis of much of the difficulty people
have with the library, for example the chronic difficulty with the type of `lines`, which here appears as

     ByteString m r -> Stream (ByteString m) m r

exactly corresponding to the type in Data.ByteString.Lazy

    LB.ByteString -> [LB.ByteString]

The pipes user naturally expects the equivalence

     Producer ByteString m r     ~ Stream (Of B.ByteString) m r

-- since after all that's what it is! -- but Gabriel is systematically forcing the equivalence

    Producer ByteString m r    ~ ByteString m r

The pipes-group/pipes-bytestring correspondence

    ([a],[a])                 ~   Stream (Of a) m (Stream (Of a) m r)   ~   Producer a m (Producer a m r)
    (ByteString, ByteString)  ~   ByteString m (ByteString m r)         ~   Producer ByteString m (Producer ByteString m r)
    [[a]]                     ~   Stream (Stream (Of a) m) m r          ~   FreeT (Producer a m) m r
    [ByteString]              ~   Stream (ByteString m) m r             ~   FreeT (Producer ByteString m) m r

emerges very naturally from the material. (In ertes' library, FreeT is called `List`, which is perhaps better.)
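The first row of the correspondence can be seen in a `splitAt` whose return value is the rest of the stream, the effectful counterpart of `splitAt :: Int -> [a] -> ([a], [a])`. A self-contained sketch, with a hypothetical minimal `Stream`, `Of`, `each` and `toList`; the library's own `splitAt` may differ in detail.

```haskell
import Prelude hiding (splitAt)

data Of a b = !a :> b deriving (Eq, Show)
infixr 5 :>

-- Minimal stream type (constructor names are assumptions).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

each :: [a] -> Stream (Of a) m ()
each = foldr (\a rest -> Step (a :> rest)) (Return ())

toList :: Monad m => Stream (Of a) m r -> m (Of [a] r)
toList = go id
  where
    go acc (Return r)         = return (acc [] :> r)
    go acc (Effect m)         = m >>= go acc
    go acc (Step (a :> rest)) = go (acc . (a :)) rest

-- Analogue of ([a], [a]): the first n elements are streamed,
-- and the remainder of the stream becomes the return value.
splitAt :: Functor m => Int -> Stream (Of a) m r -> Stream (Of a) m (Stream (Of a) m r)
splitAt n s
  | n <= 0    = Return s
  | otherwise = case s of
      Return r         -> Return (Return r)
      Effect m         -> Effect (fmap (splitAt n) m)
      Step (a :> rest) -> Step (a :> splitAt (n - 1) rest)
```

Nothing in the remainder is run until the first part has been consumed, which is what the nested type expresses.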

I implemented some of the shell-like examples from the io-streams tutorial here: https://gist.github.com/michaelt/6c6843e6dd8030e95d58
The Streaming.Prelude module could use a tutorial, but the little ghci examples in the haddocks might be of use.

Again, properly arranged, they might operate as a sort of preliminary tutorial for pipes-group and pipes-bytestring; I don't know.

yours Michael



Gabriel Gonzalez

Aug 26, 2015, 12:37:55 PM
to haskel...@googlegroups.com, practica...@gmail.com
I'm really happy about this!  I've been wanting a package like this for some time, for two main reasons:

* It greatly simplifies the inferred types and error messages of the `pipes-group`/`pipes-bytestring`/`pipes-text` packages
* It's more efficient (because it uses the same trick as `Pipes.Internal`)
--
You received this message because you are subscribed to the Google Groups "Haskell Pipes" group.
To unsubscribe from this group and stop receiving emails from it, send an email to haskell-pipe...@googlegroups.com.
To post to this group, send email to haskel...@googlegroups.com.

John Wiegley

Aug 26, 2015, 1:20:27 PM
to haskel...@googlegroups.com
>>>>> Gabriel Gonzalez <gabri...@gmail.com> writes:

> I'm really happy about this! I've been wanting a package like this for some
> time, for two main reasons:

This looks like a great library, Michael. I had started down a similar road
with http://hackage.haskell.org/package/fusion, and working on a companion
library pipes-fusion (not yet released), based on work done by Snoyman for
conduit. But I like the generality of Stream, and its resulting ability to
replace FreeT, while fusion's streams are more fixed to the Stream (Of a)
construction. Plus, I had not yet demonstrated that stream fusion still occurs
for really large pipelines, where the inliner may become overwhelmed.

Looking forward to further developments!

John

Daniel Díaz

Aug 26, 2015, 6:05:43 PM
to Haskell Pipes
How do you feed one of these Streams to a `Fold` from foldl?

Gabriel Gonzalez

Aug 26, 2015, 6:07:59 PM
to haskel...@googlegroups.com, diaz.c...@gmail.com
I can tell that Michael anticipated this question, because the type of `Streaming.Prelude.fold'` is exactly the right type for the `Control.Foldl.purely` function, so you would just write:

    Control.Foldl.purely fold' :: Fold a b -> Stream (Of a) m r -> m (Of b r)
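To see why the types line up, here is a sketch that needs only base. `Fold` and `purely` are redefined locally with the same shape as in `Control.Foldl` (step function, initial accumulator, extraction function, with the accumulator type hidden); `fold'`, `sumF`, `each` and the minimal `Stream` type are illustrative assumptions modeled on the description in this thread. Because `fold'` takes the three fold components first, `purely fold'` gets the type given above.

```haskell
{-# LANGUAGE ExistentialQuantification, RankNTypes #-}

data Of a b = !a :> b deriving (Eq, Show)
infixr 5 :>

-- Minimal stream type (constructor names are assumptions).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

each :: [a] -> Stream (Of a) m ()
each = foldr (\a rest -> Step (a :> rest)) (Return ())

-- Same shape as Control.Foldl.Fold: step, initial state, extraction.
data Fold a b = forall x. Fold (x -> a -> x) x (x -> b)

-- Same shape as Control.Foldl.purely: hand the unpacked components to a
-- consumer that is polymorphic in the hidden accumulator type x.
purely :: (forall x. (x -> a -> x) -> x -> (x -> b) -> r) -> Fold a b -> r
purely k (Fold step begin done) = k step begin done

-- A strict left fold that also hands back the stream's return value.
fold' :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Of a) m r -> m (Of b r)
fold' step begin done = go begin
  where
    go x s = x `seq` case s of
      Return r         -> return (done x :> r)
      Effect m         -> m >>= go x
      Step (a :> rest) -> go (step x a) rest

sumF :: Num a => Fold a a
sumF = Fold (+) 0 id
```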

Michael Thompson

Aug 26, 2015, 6:58:12 PM
to Haskell Pipes, diaz.c...@gmail.com
Right, everything is tailored for use with `foldl`, though I haven't documented this yet. 
In general everything marked with a final single quote is tailored for use with `mapsM` 
-- which is like `maps` but allows a monadic return type:

     maps     :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r     -- as in pipes group

     mapsM  :: (Monad m, Functor f) =>   (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r


thus
    
       mapsM S.sum'  :: (Monad m, Num a) => Stream (Stream (Of a) m) m r -> Stream (Of a) m r

but more importantly

       \fld -> mapsM (L.purely S.fold' fld)   :: Monad m => L.Fold a b -> Stream (Stream (Of a) m) m r -> Stream (Of b) m r

       \fld -> mapsM (L.purely S.fold' fld) . S.group  
           :: (Eq a, Monad m) => L.Fold a b -> Stream (Of a) m r -> Stream (Of b) m r

and better yet

       \fld -> mapsM (L.impurely S.foldM' fld) . chunksOf 20   
           :: Monad m =>  L.FoldM m a b -> Stream (Of a) m r -> Stream (Of b) m r

and so on.
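A self-contained sketch of the `maps`/`mapsM` pattern, with a functor-general `splitsAt` and a `chunksOf` to build a stream of streams. The signatures of `maps` and `mapsM` follow the ones quoted above; the bodies, the minimal `Stream` type and the helper `sum'` are assumptions, not the library's code. Any function of type `forall x. Stream (Of a) m x -> m (Of b x)`, such as a primed fold, can be mapped over every substream because it hands the rest of the outer stream back as `x`.

```haskell
{-# LANGUAGE RankNTypes #-}

data Of a b = !a :> b deriving (Eq, Show)
infixr 5 :>

instance Functor (Of a) where
  fmap f (a :> b) = a :> f b

-- Minimal stream type (constructor names are assumptions).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

instance (Functor f, Monad m) => Functor (Stream f m) where
  fmap g = go
    where
      go (Return r) = Return (g r)
      go (Effect m) = Effect (fmap go m)
      go (Step fs)  = Step (fmap go fs)

each :: [a] -> Stream (Of a) m ()
each = foldr (\a rest -> Step (a :> rest)) (Return ())

toList :: Monad m => Stream (Of a) m r -> m (Of [a] r)
toList = go id
  where
    go acc (Return r)         = return (acc [] :> r)
    go acc (Effect m)         = m >>= go acc
    go acc (Step (a :> rest)) = go (acc . (a :)) rest

-- Transform every step with a natural transformation.
maps :: (Monad m, Functor f) => (forall x. f x -> g x) -> Stream f m r -> Stream g m r
maps phi = go
  where
    go (Return r) = Return r
    go (Effect m) = Effect (fmap go m)
    go (Step fs)  = Step (phi (fmap go fs))

-- Like maps, but the transformation may run effects in m.
mapsM :: (Monad m, Functor f) => (forall x. f x -> m (g x)) -> Stream f m r -> Stream g m r
mapsM phi = go
  where
    go (Return r) = Return r
    go (Effect m) = Effect (fmap go m)
    go (Step fs)  = Effect (fmap Step (phi (fmap go fs)))

-- Functor-general splitAt: the remainder becomes the return value.
splitsAt :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m (Stream f m r)
splitsAt n s
  | n <= 0    = Return s
  | otherwise = case s of
      Return r -> Return (Return r)
      Effect m -> Effect (fmap (splitsAt n) m)
      Step fs  -> Step (fmap (splitsAt (n - 1)) fs)

-- Break a stream into substreams of at most n steps (n is clamped to at least 1).
chunksOf :: (Monad m, Functor f) => Int -> Stream f m r -> Stream (Stream f m) m r
chunksOf n = go
  where
    k = max 1 n
    go (Return r) = Return r
    go (Effect m) = Effect (fmap go m)
    go s          = Step (fmap go (splitsAt k s))

-- A primed fold: consume the elements, keep the return value.
sum' :: (Monad m, Num a) => Stream (Of a) m r -> m (Of a r)
sum' = go 0
  where
    go acc s = acc `seq` case s of
      Return r         -> return (acc :> r)
      Effect m         -> m >>= go acc
      Step (a :> rest) -> go (acc + a) rest
```

For example, `mapsM sum' (chunksOf 2 (each [1 .. 5]))` streams the sums of `[1,2]`, `[3,4]` and `[5]`.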

The use of the single quote, and the duplication, are a little obnoxious, of course. 
(The same device is used in the bytestring modules.) I was thinking of dropping the folds 
that just return things like `m Int` in favor of the others, which are `mapsM`-able, 
and fit better with the idea of the package. 

But the simple ones looked so good in ghci when I was testing examples...

It is also easy to write accumulating folds in various ways. I am trying to think of a way of
presenting this that is easy to take in, so one doesn't get lost in the rank 2 arguments and so on.

Michael Thompson

Aug 27, 2015, 11:18:05 AM
to Haskell Pipes, jo...@newartisans.com

Hi John, I missed this in between. I had come across this on GitHub, and stole 
the benchmarking module you were using at some point. I hadn't noticed it
was on Hackage. I will study it! What kept defeating me with these 
optimization schemes is that these libraries inevitably involve users
writing recursive definitions themselves. I don't know how well the fancy
conduit system does if you put a user-defined function in the 
pipeline.

Michael Thompson

Aug 27, 2015, 11:21:48 AM
to Haskell Pipes, jo...@newartisans.com
By the way, `streaming` should now build with ghc-7.6 through 7.10, and `streaming-bytestring` with ghc-7.8 through 7.10. 
I dropped the attoparsec and http modules since they interfered with making repairs indirectly
via Travis.

Pierre Radermecker

Aug 27, 2015, 11:26:14 AM
to haskel...@googlegroups.com
Out of curiosity, why are the benchmarks for "simple-conduit" and "fusion" commented out?

--
Pierre

Michael Thompson

Aug 27, 2015, 11:40:31 AM
to Haskell Pipes
No reason; I was using it for internal purposes, adding io-streams. I will try them all again now that I see fusion is on Hackage.

Michael Thompson

Aug 27, 2015, 12:09:53 PM
to Haskell Pipes
I updated John's benchmark https://gist.github.com/michaelt/7f89dc8b366b30bb6acc
Here are the results I got. As I said in the readme, this is the perfect test for conduit's
fusion system. The difference from Pipes is just the avoidance of piping; one could also
write direct functions, like 

     filter :: Monad m => (a -> Bool) -> Producer a m r -> Producer a m r

Apart from that, the only difference would be that between

    Step (a :> rest)

and

    Respond a (\() -> rest)

... right?  Step has an extra constructor, Respond hides the 'rest' behind a trivial function.
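A small sketch of the two encodings side by side. `ProducerP` is a hypothetical, Producer-only simplification of pipes' `Proxy` type (the real one has more type parameters and a `Request` constructor), and `Stream`/`Of` are the minimal versions assumed elsewhere in this thread's sketches. The conversions are one-to-one: the only difference is whether the rest sits directly inside `a :> rest` or behind `\() -> rest`.

```haskell
data Of a b = !a :> b deriving (Eq, Show)
infixr 5 :>

-- Minimal stream type (constructor names are assumptions).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

-- Hypothetical Producer-only simplification of pipes' Proxy type.
data ProducerP a m r
  = Respond a (() -> ProducerP a m r)  -- the rest hidden behind a trivial function
  | M (m (ProducerP a m r))
  | Pure r

toStream :: Functor m => ProducerP a m r -> Stream (Of a) m r
toStream (Pure r)      = Return r
toStream (M m)         = Effect (fmap toStream m)
toStream (Respond a k) = Step (a :> toStream (k ()))

fromStream :: Functor m => Stream (Of a) m r -> ProducerP a m r
fromStream (Return r)         = Pure r
fromStream (Effect m)         = M (fmap fromStream m)
fromStream (Step (a :> rest)) = Respond a (\() -> fromStream rest)

-- Collect a ProducerP's output, to check that round trips preserve it.
collectP :: Monad m => ProducerP a m r -> m ([a], r)
collectP (Pure r)      = return ([], r)
collectP (M m)         = m >>= collectP
collectP (Respond a k) = do
  (xs, r) <- collectP (k ())
  return (a : xs, r)
```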


    filter (\x -> x `mod` 2 == 0) . map (+1) . drop 1000 . map (+1) . filter even . fromList

    basic tests
      passes tests

    Finished in 0.0007 seconds
    1 example, 0 failures
    benchmarking basic/stream
    time                 83.72 ms   (74.88 ms .. 89.07 ms)
                         0.990 R²   (0.984 R² .. 0.998 R²)
    mean                 84.65 ms   (81.77 ms .. 90.04 ms)
    std dev              6.301 ms   (3.204 ms .. 9.272 ms)
    variance introduced by outliers: 19% (moderately inflated)

    benchmarking basic/iostreams
    time                 259.1 ms   (230.2 ms .. 288.6 ms)
                         0.996 R²   (0.989 R² .. 1.000 R²)
    mean                 259.6 ms   (251.4 ms .. 265.8 ms)
    std dev              8.216 ms   (3.490 ms .. 11.14 ms)
    variance introduced by outliers: 16% (moderately inflated)

    benchmarking basic/pipes
    time                 226.0 ms   (198.4 ms .. 246.4 ms)
                         0.988 R²   (0.954 R² .. 1.000 R²)
    mean                 240.8 ms   (228.6 ms .. 262.2 ms)
    std dev              21.00 ms   (6.224 ms .. 29.73 ms)
    variance introduced by outliers: 16% (moderately inflated)

    benchmarking basic/conduit
    time                 99.89 ms   (89.77 ms .. 107.4 ms)
                         0.986 R²   (0.957 R² .. 0.997 R²)
    mean                 94.71 ms   (82.12 ms .. 100.8 ms)
    std dev              12.84 ms   (5.137 ms .. 21.08 ms)
    variance introduced by outliers: 43% (moderately inflated)

    benchmarking basic/simple-conduit
    time                 201.3 ms   (178.7 ms .. 213.6 ms)
                         0.994 R²   (0.978 R² .. 1.000 R²)
    mean                 196.2 ms   (187.7 ms .. 200.1 ms)
    std dev              6.333 ms   (3.048 ms .. 7.932 ms)
    variance introduced by outliers: 14% (moderately inflated)

    benchmarking basic/fusion
    time                 124.7 ms   (112.6 ms .. 131.7 ms)
                         0.993 R²   (0.983 R² .. 0.999 R²)
    mean                 123.5 ms   (119.6 ms .. 132.0 ms)
    std dev              7.873 ms   (2.680 ms .. 12.60 ms)
    variance introduced by outliers: 12% (moderately inflated)
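For comparison, a sketch of the benchmarked pipeline written with direct stream-to-stream functions instead of pipes. The `filter`, `map`, `drop`, `fromList` and `toList` here are illustrative definitions over a minimal `Stream (Of a) m r`, not the benchmark's or the library's actual code.

```haskell
import Prelude hiding (drop, filter, map)

data Of a b = !a :> b deriving (Eq, Show)
infixr 5 :>

-- Minimal stream type (constructor names are assumptions).
data Stream f m r
  = Step (f (Stream f m r))
  | Effect (m (Stream f m r))
  | Return r

fromList :: [a] -> Stream (Of a) m ()
fromList = foldr (\a rest -> Step (a :> rest)) (Return ())

filter :: Functor m => (a -> Bool) -> Stream (Of a) m r -> Stream (Of a) m r
filter p = go
  where
    go (Return r) = Return r
    go (Effect m) = Effect (fmap go m)
    go (Step (a :> rest))
      | p a       = Step (a :> go rest)
      | otherwise = go rest

map :: Functor m => (a -> b) -> Stream (Of a) m r -> Stream (Of b) m r
map f = go
  where
    go (Return r)         = Return r
    go (Effect m)         = Effect (fmap go m)
    go (Step (a :> rest)) = Step (f a :> go rest)

drop :: Functor m => Int -> Stream (Of a) m r -> Stream (Of a) m r
drop n s
  | n <= 0    = s
  | otherwise = case s of
      Return r         -> Return r
      Effect m         -> Effect (fmap (drop n) m)
      Step (_ :> rest) -> drop (n - 1) rest

toList :: Monad m => Stream (Of a) m r -> m (Of [a] r)
toList = go id
  where
    go acc (Return r)         = return (acc [] :> r)
    go acc (Effect m)         = m >>= go acc
    go acc (Step (a :> rest)) = go (acc . (a :)) rest

-- The benchmarked pipeline, composed from ordinary functions.
pipeline :: Functor m => [Int] -> Stream (Of Int) m ()
pipeline = filter (\x -> x `mod` 2 == 0) . map (+ 1) . drop 1000 . map (+ 1) . filter even . fromList
```

On `[1 .. 2010]` the pipeline keeps the 1005 evens, shifts them to the odd numbers 3 to 2011, drops the first 1000, and emits the remaining five values plus one.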

John Wiegley

Aug 27, 2015, 1:56:59 PM
to haskel...@googlegroups.com
>>>>> Michael Thompson <practica...@gmail.com> writes:

> I updated John's benchmark
> https://gist.github.com/michaelt/7f89dc8b366b30bb6acc Here are the results I
> got. As I said in the readme, this is the perfect test for conduit's fusion
> system.

Hi Michael,

One other thing you might want to benchmark: I found that under criterion,
conduit and fusion were equal in speed; but the timing of standalone
executables showed fusion to be 2-3x faster than conduit. I haven't yet
tracked down what the cause of so great a difference could be.

Also, we could automate the generation of Core for each example and compare
the number of functions, the average length of functions, and how many times each
library's core data constructors appear in the output.

John

John Wiegley

Aug 27, 2015, 1:59:42 PM
to haskel...@googlegroups.com
>>>>> Michael Thompson <practica...@gmail.com> writes:

> I don't know how the fancy conduit system does if you put a user defined
> function in the pipeline.

A recursive user-defined function will disrupt fusion in the pipeline, and
Michael Snoyman mentions this when he says that any direct use of yield or await will
abort fusion. However, a non-recursive, inlined function should work fine.
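For readers unfamiliar with the encoding under discussion, a pure (non-monadic) simplification of the stream-fusion representation: a stream is a step function plus a hidden state. Non-recursive stages like `mapS` and `filterS` only wrap the step function, so after inlining they can merge into the one recursive loop in the consumer (`sumS`); a user-written recursive producer cannot be inlined that way. The names are hypothetical, and libraries such as fusion and conduit use monadic variants of these types.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

-- One step of an unfold: stop, skip (change state without output), or yield.
data Step s a = Done | Skip s | Yield a s

-- A stream is a step function together with a hidden initial state.
data Stream a = forall s. Stream (s -> Step s a) s

-- A producer whose state is just an Int.
enumFromToS :: Int -> Int -> Stream Int
enumFromToS lo hi = Stream step lo
  where
    step i
      | i > hi    = Done
      | otherwise = Yield i (i + 1)

-- Non-recursive: it only wraps the step function.
mapS :: (a -> b) -> Stream a -> Stream b
mapS f (Stream step s0) = Stream step' s0
  where
    step' s = case step s of
      Done       -> Done
      Skip s'    -> Skip s'
      Yield a s' -> Yield (f a) s'

-- Also non-recursive: rejected elements become Skips.
filterS :: (a -> Bool) -> Stream a -> Stream a
filterS p (Stream step s0) = Stream step' s0
  where
    step' s = case step s of
      Done    -> Done
      Skip s' -> Skip s'
      Yield a s'
        | p a       -> Yield a s'
        | otherwise -> Skip s'

-- The only recursive loop lives in the consumer.
sumS :: Num a => Stream a -> a
sumS (Stream step s0) = go 0 s0
  where
    go acc s = acc `seq` case step s of
      Done       -> acc
      Skip s'    -> go acc s'
      Yield a s' -> go (acc + a) s'
```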

I also wonder if Gabriel's comment (in another e-mail) about exhausting the
inliner applies to conduit too, and what the actual limits are.

John

Gabriel Gonzalez

Aug 27, 2015, 3:36:34 PM
to haskel...@googlegroups.com
You only exhaust the inliner if the pipeline has lots of stages with branches (e.g. `take`). Stages like `map` won't exhaust the inliner.
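A hedged illustration of a "stage with branches", in the same simplified unfold encoding (hypothetical names, pure rather than monadic): `takeS` pairs the inner state with a counter and tests it on every step, while a `map`-like stage adds neither extra state nor a branch. Per the remark above, it is stacking many stages of the first kind that strains the inliner.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

data Step s a = Done | Skip s | Yield a s

data Stream a = forall s. Stream (s -> Step s a) s

-- An infinite producer with an Int state.
enumFromS :: Int -> Stream Int
enumFromS n = Stream (\i -> Yield i (i + 1)) n

-- take pairs the inner state with a counter and branches on it at every step.
takeS :: Int -> Stream a -> Stream a
takeS n (Stream step s0) = Stream step' (n, s0)
  where
    step' (k, s)
      | k <= 0    = Done
      | otherwise = case step s of
          Done       -> Done
          Skip s'    -> Skip (k, s')
          Yield a s' -> Yield a (k - 1, s')

toListS :: Stream a -> [a]
toListS (Stream step s0) = go s0
  where
    go s = case step s of
      Done       -> []
      Skip s'    -> go s'
      Yield a s' -> a : go s'
```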

Michael Thompson

Aug 27, 2015, 4:21:14 PM
to Haskell Pipes

> A recursive user-defined function will disrupt fusion in the pipeline

Right, I was thinking of cases where disrupting the pipeline might be a
problem. I shouldn't have mentioned only recursive definitions, but also the
use of the monad instance. In stream-fusion / co-Church-encoding sorts of
cases, the monad instance is often itself kind of dreadful: you do something
like pack the old stream as the hidden state of a new stream, rather than
using Int and StrictMaybe Int and sensible things like that for the stream
state. (This is why GHC rejected a stream-fusion Data.List, if I understand
correctly; the literature is always talking about concatMap.)
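A minimal sketch, in the simplified unfold encoding used in this thread (hypothetical names, pure rather than monadic), of what packing the old stream as the hidden state of a new stream looks like. `enumFromToS` has a plain `Int` state, but `concatMapS` must carry a whole inner `Stream`, existential state and all, inside its own state, so, roughly speaking, GHC has no statically known loop to fuse. A monadic bind faces the same problem, since the continuation's stream is only known at run time.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

data Step s a = Done | Skip s | Yield a s

data Stream a = forall s. Stream (s -> Step s a) s

-- Sensible state: just an Int.
enumFromToS :: Int -> Int -> Stream Int
enumFromToS lo hi = Stream step lo
  where
    step i
      | i > hi    = Done
      | otherwise = Yield i (i + 1)

-- The state is the outer state plus, possibly, a whole inner stream.
concatMapS :: (a -> Stream b) -> Stream a -> Stream b
concatMapS f (Stream stepO o0) = Stream step (o0, Nothing)
  where
    step (o, Nothing) = case stepO o of
      Done       -> Done
      Skip o'    -> Skip (o', Nothing)
      Yield a o' -> Skip (o', Just (f a))
    step (o, Just (Stream stepI i)) = case stepI i of
      Done       -> Skip (o, Nothing)
      Skip i'    -> Skip (o, Just (Stream stepI i'))
      Yield b i' -> Yield b (o, Just (Stream stepI i'))

toListS :: Stream a -> [a]
toListS (Stream step s0) = go s0
  where
    go s = case step s of
      Done       -> []
      Skip s'    -> go s'
      Yield a s' -> a : go s'
```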


But it seems strange to have a producer concept where you advise the user not
to make yield statements! It's sort of the whole point that they can ... or
maybe I'm wrong about that.


Gabriel's approach, if I understand it, tries to do as well by the compiler as
one can, *given* that practically everything the user will give it will be a
mass of recursive loops that employ the monad instance and so on. It isn't
like a fusion approach, which aims to produce a prelude the user won't depart
from.


But I don't know, that's the picture I ended up with.


Michael






Gabriel Gonzalez

Aug 27, 2015, 4:26:33 PM
to haskel...@googlegroups.com, practica...@gmail.com
The other reason I chose this approach (besides being more flexible from a user perspective) is that it's easier to maintain.  I figure if I just do the simple and obvious thing then performance will slowly improve as people work on the GHC optimizer, whereas if I try to be too tricky then I risk changes to the optimizer degrading my code's performance because I'm relying too much on specific quirks of GHC's optimization process.

John Wiegley

Aug 27, 2015, 6:52:56 PM
to haskel...@googlegroups.com
>>>>> Michael Thompson <practica...@gmail.com> writes:

> I shouldn't have just mentioned recursive definitions, but also the use of
> the monad instance.

Yes, exactly, I found this to be the case too. The problem is that we simply
can't know at compile-time what the next stage in the pipeline will be, once
the input stream is completed:

    -- Assumes {-# LANGUAGE LambdaCase #-}; (<&>) is flipped fmap
    -- (exported by Data.Functor in recent versions of base).
    instance (Monad m, Applicative m) => Monad (Stream a m) where
        return = Stream (return . Done)
        Stream step i >>= f = Stream step' (Left i)
          where
            -- Run the first stream; once it is Done, switch to f's stream.
            step' (Left s) = step s >>= \case
                Done r     -> switchStream (f r)
                Skip s'    -> return $ Skip (Left s')
                Yield s' a -> return $ Yield (Left s') a
            step' (Right s) = switchStream s

    -- The second stream is repacked whole as the new state (Right ...).
    switchStream :: Functor m
                 => Stream a m r -> m (Step (Either s (Stream a m r)) a r)
    switchStream (Stream step i) = step i <&> \case
        Done r     -> Done r
        Skip s'    -> Skip (Right (Stream step s'))
        Yield s' a -> Yield (Right (Stream step s')) a

As you said, this probably inlines and fuses the input stream, but fails to
fuse the stream that is returned by the bound function.

John