How can I do this conduit example with pipes?

Jacob Stanley

Jun 10, 2015, 8:53:19 PM
to haskel...@googlegroups.com
I have a situation which I found easy to solve with `conduit` but could not figure out with `pipes`.

{-# LANGUAGE ScopedTypeVariables #-}
import Data.Conduit (Conduit, yield, (=$=))
import qualified Data.Conduit.List as C

-- | Given a stream of key/value pairs, fold all the values associated with a
-- key into a single value. Assumes that the stream has already been
-- sorted/grouped by key.
foldValues :: forall m k v. (Monad m, Eq k) => (v -> v -> v) -> Conduit (k, v) m (k, v)
foldValues step = goM =$= C.catMaybes
  where
    -- C.mapAccum returns its final state once upstream is exhausted, so the
    -- last (still open) group can be yielded after the stream has ended.
    goM :: Conduit (k, v) m (Maybe (k, v))
    goM = do
        s <- C.mapAccum go Nothing
        yield s

    -- State: the group being folded. Output: a finished group, emitted only
    -- when the key changes.
    go :: (k, v) -> Maybe (k, v) -> (Maybe (k, v), Maybe (k, v))
    go (k, v) Nothing                    = (Just (k, v),        Nothing)
    go (k, v) (Just (k0, x)) | k == k0   = (Just (k, step x v), Nothing)
                             | otherwise = (Just (k, v),        Just (k0, x))

It seems to be related to the fact that with `pipes` I cannot observe that the stream has stopped emitting values. I had a quick look at `pipes-parse` and it seems like maybe that would help?

John Wiegley

Jun 10, 2015, 10:37:27 PM
to Jacob Stanley, haskel...@googlegroups.com
>>>>> Jacob Stanley <ja...@stanley.io> writes:

> It seems to be related to the fact that with `pipes` I cannot observe that
> the stream has stopped emitting values. I had a quick look at `pipes-parse`
> and it seems like maybe that would help?

In order to observe that the stream has stopped, you usually need to take the
producer as an argument, in which case you can continue operating after you've
exhausted it.

John
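A minimal sketch of John's suggestion, assuming the `pipes` library: because the function receives the upstream `Producer` as an argument, it can step it with `next`, which returns `Left r` once the producer is exhausted, so the last group can still be flushed before returning. The name `foldValuesNext` is hypothetical, not something from the thread.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical helper: fold consecutive values that share a key.
-- Taking the Producer as an argument lets us call 'next' on it, so the
-- end of the stream becomes observable (it shows up as 'Left r').
foldValuesNext
  :: (Monad m, Eq k)
  => (v -> v -> v) -> Producer (k, v) m r -> Producer (k, v) m r
foldValuesNext append = start
  where
    -- No group is open yet.
    start p = do
      e <- lift (next p)
      case e of
        Left r         -> return r
        Right (kv, p') -> go kv p'

    -- A group for key k is open, with accumulated value acc.
    go (k, acc) p = do
      e <- lift (next p)
      case e of
        Left r -> do
          yield (k, acc)   -- upstream is exhausted: flush the last group
          return r
        Right ((k', v), p')
          | k' == k   -> go (k, append acc v) p'
          | otherwise -> yield (k, acc) >> go (k', v) p'

-- Usage (prints ("hi",4), ("ho",4), ("hi",4)):
-- runEffect $ foldValuesNext (+) (each [("hi",2),("hi",2),("ho",2),("ho",2),("hi",2),("hi",2)]) >-> P.print
```

The trade-off is the one John describes: the result is a `Producer` transformer rather than a `Pipe`.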

Tran Ma

Jun 11, 2015, 3:05:45 AM
to haskel...@googlegroups.com
At some point I had to do something similar (summarise values by key, assuming duplicate keys appear in a contiguous run) and used pipes-group: https://hackage.haskell.org/package/pipes-group

Jacob Stanley

Jun 11, 2015, 4:19:21 PM
to haskel...@googlegroups.com
Thanks for the tips Tran and John, this is what I came up with:

import Pipes
import qualified Pipes.Prelude as P
import Pipes.Group (folds, groupsBy)
import Lens.Family (view)

foldValues :: (Monad m, Eq k) => (v -> v -> v) -> Producer (k, v) m r -> Producer (k, v) m r
foldValues append xs =
    P.concat <-< folds step Nothing id (view (groupsBy keyEq) xs)
  where
    keyEq (k, _) (k', _) = k == k'

    step Nothing        (k, v) = Just (k, v)
    step (Just (_, v0)) (k, v) = Just (k, v0 `append` v)

Is there any way I can get this to have the type `Pipe (k, v) (k, v) m r`?

Ideally I would like to be able to apply this operation to a consumer as well.

Michael Thompson

Jun 11, 2015, 5:22:45 PM
to haskel...@googlegroups.com
Something of type Pipe (k,v) (k,v) m r would have to miss the last group of values, no? Or maybe I'm not following. If upstream is following a quasi-conduit strategy where await yields a Maybe and Nothing signals eof, you could write, say: 

    pipeFoldValues :: (Monad m, Eq k) => (v -> v -> v) -> Pipe (Maybe (k,v)) (k, v) m ()
    pipeFoldValues append  = go Nothing where

      go (Just (k,v)) = do 
        a <- await
        case a of 
          Nothing -> yield (k,v)  
          Just (k',v') | k == k' -> go (Just (k,append v v'))
          Just (k',v') -> do 
            yield (k,v) 
            go (Just (k',v'))
         
      go Nothing = do
        a <- await
        case a of
          Nothing -> return ()   -- empty stream: nothing to emit
          _       -> go a


Or maybe a pipe that itself maintains this rule (signalling the end downstream with Nothing, forever) is better:

    pipeFoldValues :: (Monad m, Eq k) => (v -> v -> v) -> Pipe (Maybe (k,v)) (Maybe (k, v)) m r
    pipeFoldValues append  = go Nothing where

      go (Just (k,v)) = do 
        a <- await
        case a of 
          Nothing -> yield (Just (k,v)) >> forever (yield Nothing)
          Just (k',v') | k == k' -> go (Just (k,append v v'))
          Just (k',v') -> do 
            yield (Just (k,v)) 
            go (Just (k',v'))
 
      go Nothing = do
        a <- await
        case a of
          Nothing -> forever (yield Nothing)   -- empty stream: just pass the end on
          _       -> go a
 

John Wiegley

Jun 11, 2015, 5:25:14 PM
to Jacob Stanley, haskel...@googlegroups.com
>>>>> Jacob Stanley <ja...@stanley.io> writes:

> Is there any way I can get this to have the type `Pipe (k, v) (k, v) m r`?

Is this what you are looking for:

foldValues :: (Monad m, Eq k) => (v -> v -> v) -> Pipe (k, v) (k, v) m r
foldValues append = loop Nothing
  where
    loop mx = do
        (k, v) <- await
        yield (k, maybe v (`append` v) mx)
        loop (Just v)

John

John Wiegley

Jun 11, 2015, 5:29:59 PM
to Jacob Stanley, haskel...@googlegroups.com
>>>>> John Wiegley <jo...@newartisans.com> writes:

> Is this what you are looking for:

> foldValues :: (Monad m, Eq k) => (v -> v -> v) -> Pipe (k, v) (k, v) m r
> foldValues append = loop Nothing
>   where
>     loop mx = do
>         (k, v) <- await
>         yield (k, maybe v (`append` v) mx)
>         loop (Just v)

Ah, n/m, you wanted aggregation of values within key groups. You could add
the key as part of the "state" in mx, to determine whether to append or
restart tracking of v.

John
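A sketch of John's suggestion, assuming the `pipes` library: the loop state carries the key together with the accumulated value, so each incoming pair either extends the current group or closes it. Because a group is yielded only when the key changes, the final group is still dropped when upstream ends, which is the problem raised in the replies. `foldValuesKeyed` is a hypothetical name.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical: keep the current key in the loop state.
foldValuesKeyed :: (Monad m, Eq k) => (v -> v -> v) -> Pipe (k, v) (k, v) m r
foldValuesKeyed append = loop Nothing
  where
    loop Nothing = do
      kv <- await
      loop (Just kv)                          -- first pair opens a group
    loop (Just (k, acc)) = do
      (k', v) <- await
      if k' == k
        then loop (Just (k, append acc v))    -- same key: keep folding
        else do
          yield (k, acc)                      -- key changed: emit finished group
          loop (Just (k', v))
```

Run over the six pairs `("hi",2),("hi",2),("ho",2),("ho",2),("hi",2),("hi",2)` with `(+)`, it yields only `("hi",4)` and `("ho",4)`: the last `"hi"` group is still in `loop` when upstream returns.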

Michael Thompson

Jun 11, 2015, 5:30:13 PM
to haskel...@googlegroups.com, ja...@stanley.io, jo...@newartisans.com
John, that isn't inspecting the keys for equality, but isn't the main problem that, when upstream closes, the pipe will still have the last pair it received stored in `loop`?

Michael Thompson

Jun 11, 2015, 5:32:24 PM
to haskel...@googlegroups.com, jo...@newartisans.com, ja...@stanley.io
Sorry, emails crossed there.

John Wiegley

Jun 11, 2015, 5:33:22 PM
to Michael Thompson, haskel...@googlegroups.com, ja...@stanley.io
Right, the algorithm should only emit (k, v) when there is a change from k ->
k', so you always lose the sum of the last key when either side terminates. I
think the groupsBy based version is the correct one.

John

Tran Ma

Jun 11, 2015, 8:59:07 PM
to haskel...@googlegroups.com
Hi Jacob,

On Friday, 12 June 2015 06:19:21 UTC+10, Jacob Stanley wrote:
> Is there any way I can get this to have the type `Pipe (k, v) (k, v) m r`?

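I don't think it is possible to both retain the same semantics (emitting only the final folded values) and have the `Pipe` signature, because `Pipe` is a pull-based stream: when upstream runs out, the pipeline simply ends and the `Pipe` is never resumed, so it gets no chance to emit the group it is still accumulating. Even if you layer some sort of state in the monad underneath the `Pipe`, it still has to buffer the current group before emitting its final value, and that last buffered group is what gets lost.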


> Ideally I would like to be able to apply this operation to a consumer as well.

I'm not sure what you mean, could you give an example?
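A small illustration of Tran's pull-based point, assuming the `pipes` library: a `Pipe` only runs when it is pulled, and when upstream returns, `>->` ends the whole pipeline with that return value, so the `Pipe` is never resumed to emit anything it was holding. The hypothetical `sumFour` below awaits four values before yielding their sum; fed only three, it never yields.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical pipe: wait for four inputs, then emit their sum.
sumFour :: Monad m => Pipe Int Int m ()
sumFour = do
  a <- await
  b <- await
  c <- await
  d <- await
  yield (a + b + c + d)

-- P.toList (each [1, 2, 3, 4] >-> sumFour)  evaluates to [10]
-- P.toList (each [1, 2, 3]    >-> sumFour)  evaluates to []
--   (upstream returns during the fourth 'await', so the pipeline ends there)
```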

Michael Thompson

Jun 11, 2015, 10:36:13 PM
to haskel...@googlegroups.com
I meant to agree that the pipes-group method is the true path, but I think you can always produce this sort of effect if you make something like the io-streams rule explicit (the end of the stream is signalled in-band, e.g. by Nothing or by a Left carrying the return value); it's just a little tiresome. My impression was that there was no way to support it systematically without making a mess of everything, but see, e.g.:


    import Pipes
    import qualified Pipes.Prelude as P
    import Pipes.Group
    import Lens.Simple
    import Control.Monad

    foldValues :: (Monad m, Eq k) => (v -> v -> v) -> Producer (k, v) m r -> Producer (k, v) m r
    foldValues append xs = P.concat <-< folds step Nothing id (view (groupsBy keyEq) xs)
      where
      keyEq (k, _) (k', _) = k == k'

      step (Nothing)      (k, v) = Just (k, v)
      step (Just (_, v0)) (k, v) = Just (k, v0 `append` v)

    pipeFoldValues :: (Monad m, Eq k) => (v -> v -> v) -> Pipe (k,v) (k, v) m r
    pipeFoldValues append  = go Nothing where

      go (Just (k,v)) = do 
        a <- await
        case a of 
          (k',v') | k == k' -> go (Just (k,append v v'))
          (k',v') -> do 
            yield (k,v)
            go (Just (k',v'))

      go Nothing = do 
        a <- await
        go (Just a)
 
    pipeFoldValues' :: (Monad m, Eq k) => (v -> v -> v) -> Pipe (Maybe (k,v)) (Maybe (k, v)) m r
    pipeFoldValues' append  = go Nothing where

      go (Just (k,v)) = do 
        a <- await
        case a of 
          Nothing -> yield (Just (k,v)) >> forever (yield Nothing)
          Just (k',v') | k == k' -> go (Just (k,append v v'))
          Just (k',v') -> do 
            yield (Just (k,v)) 
            go (Just (k',v'))

      go Nothing = do
        a <- await
        case a of
          Nothing -> forever (yield Nothing)   -- empty stream: just pass the end on
          _       -> go a

    pipeFoldValues'' 
       :: (Monad m, Eq k) 
       => (v -> v -> v) -> Pipe (Either r (k,v)) (Either r (k, v)) m x
    pipeFoldValues'' append  = go Nothing where

      go (Just (k,v)) = do 
        e <- await
        case e of 
          Left r -> yield (Right (k,v)) >> forever (yield (Left r))
          Right (k',v') | k == k' -> go (Just (k,append v v'))
          Right (k',v') -> do 
            yield (Right (k,v)) 
            go (Just (k',v'))

      go Nothing = do 
        a <- await
        case a of
          Left r -> forever (yield (Left r))  
          Right (k,v) -> go (Just (k,v))
    
    -- dubious kit for Nothing or Left ending streams:

    eof p = do 
      p >-> P.map Just
      forever $ yield Nothing 

    uneof = do
      n <- await
      case n of 
        Nothing -> return ()
        Just a -> yield a >> uneof

    internalize p = do
      r <- p >-> P.map Right
      forever $ yield (Left r)
  
    externalize = do
      e <- await
      case e of 
        Left r -> return r
        Right a -> yield a >> externalize
    
    -- demo --

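    -- concrete type so the demo bindings below are not ambiguous in m
    q :: Producer (String, Int) IO ()
    q = each (zip ["hi","hi","ho","ho","hi","hi"] (repeat 2))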

    run p = runEffect $ p >-> P.print

    p1 = foldValues (+) q
    p2 = q >-> pipeFoldValues (+)
    p3 = eof q >-> pipeFoldValues' (+) >-> uneof
    p4 = internalize q  >-> pipeFoldValues'' (+) >-> externalize
    p4' = internalize (q >> return "hi") >-> (P.tee P.print) >-> pipeFoldValues'' (+) >-> externalize

    -- [*Main]
    -- > run p1
    -- ("hi",4)
    -- ("ho",4)
    -- ("hi",4)
    -- [*Main]
    -- > run p2
    -- ("hi",4)
    -- ("ho",4) -- <- missing last group
    -- [*Main]
    -- > run p3
    -- ("hi",4)
    -- ("ho",4)
    -- ("hi",4)
    -- [*Main]
    -- > run p4
    -- ("hi",4)
    -- ("ho",4)
    -- ("hi",4)
    -- [*Main]
    -- > run p4'
    -- Right ("hi",2)
    -- Right ("hi",2)
    -- Right ("ho",2)
    -- ("hi",4)
    -- Right ("ho",2)
    -- Right ("hi",2)
    -- ("ho",4)
    -- Right ("hi",2)
    -- Left "hi" 
    -- ("hi",4)
    -- "hi"



Jacob Stanley

Jun 12, 2015, 1:40:20 AM
to haskel...@googlegroups.com
Hi Tran

So with the foldValues implementation below, I need to have the Producer in order to apply the transformation.

But what if I only have a Consumer?

Can I transform a Consumer in the same, but opposite, way? That is, take a Consumer which expects folded (k,v) pairs and create a consumer which expects groups of (k,v) pairs which haven't been folded yet.

Jacob Stanley

Jun 12, 2015, 1:58:22 AM
to haskel...@googlegroups.com
Thanks, so it looks like the only way to write this as a Pipe is to explicitly use the io-streams rule.

Gabriel Gonzalez

Jun 13, 2015, 8:18:54 AM
to haskel...@googlegroups.com
Sorry for the delayed reply. The original pipes-group solution, with this type, is the correct one:

    Producer (k, v) m r -> Producer (k, v) m r

There is no way to transform a Consumer so that it pre-groups the values coming in (without using something like Maybe). That is why you cannot make this into a Pipe. An easy way to convince yourself of this is to ask what your consumer (`c`) would do if you connected it to:

    yield (k, v) >-> c

The grouping step would receive a single pair and then the producer would end, so it would never learn that the group is complete and could never hand the folded value to `c`.
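To see what "something like Maybe" buys, here is a sketch assuming an explicit end-of-stream marker: upstream sends `Just` pairs followed by `Nothing`. With that marker, a Consumer of folded pairs can be wrapped into a Consumer of raw pairs, because the grouping stage sees `Nothing` and can flush the last group. `groupInto` is a hypothetical name. Even so, the wrapped consumer can only finish when `c` returns or upstream returns; after the flush it just keeps awaiting, so an io-streams-style upstream that repeats `Nothing` forever, combined with a `c` that never returns, would spin forever.

```haskell
import Control.Monad (forever, replicateM)
import Pipes

-- Hypothetical: turn a Consumer of folded pairs into a Consumer of raw
-- pairs, using Nothing as an explicit end-of-stream marker.
groupInto
  :: (Monad m, Eq k)
  => (v -> v -> v) -> Consumer (k, v) m r -> Consumer (Maybe (k, v)) m r
groupInto append c = grouper >-> c
  where
    grouper = await >>= start

    start Nothing   = forever await       -- empty stream: nothing to flush
    start (Just kv) = go kv

    go (k, acc) = do
      mkv <- await
      case mkv of
        Nothing -> do
          yield (k, acc)                  -- end marker seen: flush last group
          forever await                   -- then just drain further input
        Just (k', v)
          | k' == k   -> go (k, append acc v)
          | otherwise -> yield (k, acc) >> go (k', v)

-- Example: a consumer that takes three folded pairs and returns them.
-- runEffect ((each input >> return []) >-> groupInto (+) (replicateM 3 await))
--   where input = map Just [...] ++ [Nothing]
```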

Alexey Raga

Aug 11, 2015, 7:45:53 AM
to Haskell Pipes
Assuming the stream is sorted by key, does something like this sound right?

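import Pipes
import qualified Pipes.Group as G
import Lens.Family (view)

-- Fold each run of consecutive pairs that share a key, dropping the key.
-- 'merge' folds one value into the accumulator; 'z' starts every group.
foldGroups :: (Eq k, Monad m) => (s -> v -> s) -> s -> Producer (k, v) m r -> Producer s m r
foldGroups merge z = G.folds (\s (_, a) -> merge s a) z id . view (G.groupsBy sameKey)
  where
    sameKey (k, _) (k', _) = k == k'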

Cheers,
Alexey.