pipes-concurrent - dispatching

Bartosz Przygoda

Oct 20, 2014, 4:23:31 AM
to haskel...@googlegroups.com
Hello,

The tutorial shows that if multiple pipes read from the same mailbox, the work is divided among all of them. However, is there a way to decide which value is read by which pipe?

Let's say that the mailbox contains messages with values paired with some kind of 'id', and I would like each concurrent effect to handle only the values destined for it, based on that 'id'. Is this doable, or is it an entirely wrong approach?

Alp Mestanogullari

Oct 20, 2014, 8:03:05 AM
to haskel...@googlegroups.com
Hi Bartosz,

Another idea would be to have one mailbox per "handler" so that they receive only the values they're supposed to handle and nothing more.

--
You received this message because you are subscribed to the Google Groups "Haskell Pipes" group.
To unsubscribe from this group and stop receiving emails from it, send an email to haskell-pipe...@googlegroups.com.
To post to this group, send email to haskel...@googlegroups.com.



--
Alp Mestanogullari

Bartosz Przygoda

Oct 20, 2014, 9:01:52 AM
to haskel...@googlegroups.com
I thought of that, but wouldn't my 'dispatcher' have to fork a thread for each such mailbox, each of which is then handled by another forked thread, doubling the number of threads for each such concurrent activity?


Gabriel Gonzalez

Oct 20, 2014, 9:56:56 AM
to haskel...@googlegroups.com, bprz...@gmail.com
This should be possible.  Remember that the read action is an STM action, so you should be able to just write:

    recvOnly :: Int -> Input (Int, a) -> STM (Maybe a)
    recvOnly n i = do
        x <- recv i
        case x of
            Nothing                 -> return Nothing
            -- compare the message's tag 'm' against the wanted id 'n'
            Just (m, a) | m == n    -> return (Just a)
                        | otherwise -> retry

Then just convert that `recvOnly` action into a `Producer` by looping the same way that `fromInput` does.
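
For readers following along, here is a minimal sketch of what that conversion might look like. It assumes the `pipes`, `pipes-concurrent`, and `stm` packages; `fromInputOnly` is a hypothetical name (it is not part of pipes-concurrent), and the pattern match compares the message tag against the requested id.

```haskell
import Control.Concurrent.STM (STM, atomically, retry)
import Pipes
import Pipes.Concurrent

-- Receive only messages tagged with the id 'n'.
-- 'retry' rolls the transaction back, so a message meant for another
-- worker stays in the mailbox and this thread blocks until it changes.
recvOnly :: Int -> Input (Int, a) -> STM (Maybe a)
recvOnly n i = do
    x <- recv i
    case x of
        Nothing -> return Nothing
        Just (m, a)
            | m == n    -> return (Just a)
            | otherwise -> retry

-- Hypothetical helper (not a library function): loops like 'fromInput',
-- but only yields values whose tag matches 'n'.
fromInputOnly :: MonadIO m => Int -> Input (Int, a) -> Producer a m ()
fromInputOnly n i = loop
  where
    loop = do
        ma <- liftIO (atomically (recvOnly n i))
        case ma of
            Nothing -> return ()        -- mailbox is sealed and drained
            Just a  -> yield a >> loop
```

One thing to keep in mind with this approach: a worker only makes progress while the message at the front of the mailbox carries its own id, so a message whose id has no worker would presumably stall every reader.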

Alp Mestanogullari

Oct 20, 2014, 10:20:04 AM
to haskel...@googlegroups.com
The "dispatcher" could just look at the "id" and then `send` the associated value to the appropriate mailbox. For example:

    import Control.Monad (when)
    import Pipes
    import Pipes.Concurrent

    -- Suppose we have a function that gives us a mailbox from an ID.
    -- In your program you could just store the mailboxes in some data
    -- structure and look them up, instead of having 'mailboxForId'.
    mailboxForId :: Int -> Output a
    mailboxForId = undefined  -- placeholder: supply your own lookup

    dispatcher :: Consumer (Int, a) IO ()
    dispatcher = loop
      where
        loop = do
            (key, value) <- await  -- 'key' avoids shadowing Prelude's 'id'
            alive <- liftIO . atomically $
                send (mailboxForId key) value
            when alive loop

I think Gabriel's suggestion would work better, though, if there are a lot of these workers.
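
As a rough illustration of the "store them in some data structure" idea, the sketch below keeps the mailboxes in a `Data.Map` keyed by id. The `Routes` type and the choice to skip messages with an unknown id are assumptions made for this example, not something prescribed by pipes-concurrent.

```haskell
import Control.Concurrent.STM (atomically)
import Control.Monad (when)
import qualified Data.Map.Strict as Map
import Pipes
import Pipes.Concurrent

-- Hypothetical routing table: one mailbox 'Output' per worker id.
type Routes a = Map.Map Int (Output a)

-- Forward each tagged value to the mailbox registered for its id.
dispatcher :: Routes a -> Consumer (Int, a) IO ()
dispatcher routes = loop
  where
    loop = do
        (key, value) <- await
        case Map.lookup key routes of
            -- Assumption: messages with an unknown id are skipped.
            Nothing  -> loop
            Just out -> do
                alive <- liftIO (atomically (send out value))
                -- Stop once the target mailbox has been sealed.
                when alive loop
```

With this layout the dispatcher itself is a single consumer running in one thread, so the total is one thread per worker plus one for the dispatcher rather than two per worker.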
  

Bartosz Przygoda

Oct 20, 2014, 10:35:50 AM
to haskel...@googlegroups.com
Yes, what I missed is the 'send' function. Thanks a lot.

@Gabriel - thanks for the hint. I am still missing basic knowledge of the internals (I have just started with the library), but now I know where to look.