Getting to grips with Pipes.Concurrent


Nick Partridge

Jul 7, 2014, 2:59:17 AM
to haskel...@googlegroups.com
Hi, I've been playing a bit with Pipes.Concurrent lately, and I seem to be stumbling into STM errors more than I'd like. Here's a recent example:

https://gist.github.com/nkpart/5c83011b10b33ebc5e35

The function takes a number of workers to spawn, a generator of work and the function to run on each worker thread. What I don't understand is why I needed to explicitly seal the output channels in this example.  Is it something to do with the fact I have 2 sets of channels in play? The examples in the docs don't illustrate anything like this and that seems to me to be the only real point of difference between this and what works.
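For readers who cannot open the gist, here is a minimal sketch of the general shape being described. It is not the gist's code: `runPool` and every name inside it are hypothetical, and it assumes the `pipes`, `pipes-concurrency` and `async` packages. It uses two channels (one for work, one for results) created with `spawn'`, and seals each one explicitly instead of relying on garbage collection.

```haskell
import Control.Concurrent.Async (async, wait)
import Control.Monad (replicateM)
import Data.List (sort)
import Pipes
import Pipes.Concurrent
import qualified Pipes.Prelude as P

-- Hypothetical helper: apply `f` to every job using `n` worker threads.
runPool :: Int -> [a] -> (a -> b) -> IO [b]
runPool n jobs f = do
    -- spawn' also returns an STM action that seals the channel.
    (workOut, workIn, sealWork)  <- spawn' (Bounded 1)
    (resOut, resIn, sealResults) <- spawn' Unbounded
    -- Each worker reads jobs until the work channel is sealed and drained.
    workers <- replicateM n $ async $ runEffect $
        fromInput workIn >-> P.map f >-> toOutput resOut
    -- The collector reads results until the result channel is sealed and drained.
    collector <- async $ P.toListM (fromInput resIn)
    runEffect $ each jobs >-> toOutput workOut
    atomically sealWork      -- no more work: lets the workers finish
    mapM_ wait workers
    atomically sealResults   -- no more results: lets the collector finish
    wait collector

main :: IO ()
main = do
    results <- runPool 3 [1 .. 10 :: Int] (* 2)
    -- Results arrive in a nondeterministic order, so sort before printing.
    print (sort results)
```

The point of the explicit seals is that termination no longer depends on when the garbage collector notices that an `Output` has become unreachable.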

Also in general, I find that I'm frequently able to get bits and pieces to compile, look okay, and maybe even run correctly some percentage of the time, and then blow up with STM exceptions occasionally as well. Usually I've missed the performGC, but the experience is disturbing to my Haskell brain.

Thanks,
Nick

Gabriel Gonzalez

Jul 7, 2014, 11:42:25 AM
to haskell-pipes, nkp...@gmail.com

You shouldn't ever get these errors, even if you forget performGC.  I've heard from Michael Xavier that he had this issue when he switched to ghc-7.8.  Are you by chance also using ghc-7.8?

Either way this is a bug since it violates the library's contract.  If you can throw together a minimal example that reproduces the error as a Github issue then I can dig into this.


Michael Thompson

Jul 7, 2014, 5:20:16 PM
to haskel...@googlegroups.com, nkp...@gmail.com
I'm not at all following this yet, but ghc-7.8 does seem to be involved. It seems that a nearly minimal example is already present in the tutorial -- presumably `work.hs` printed at the end of it used to work fine, but with ghc-7.8 I get: 

    1
    2
    3
    ... etc ..
    99
    100
    101
    102
    103
    104
    hi
    Worker #2: Processed 2
    Worker #1: Processed 1
    Worker #3: Processed 3
    105
    106
    107
    hoWorker #1: Processed 5
    Worker #2: Processed 4
    Worker #3: Processed 6
    work: thread blocked indefinitely in an STM transaction

(here I typed input, but the result is the same if I do nothing.)

By contrast, the simpler `main` in the corresponding part of the tutorial text itself (i.e. https://github.com/Gabriel439/Haskell-Pipes-Concurrency-Library/blob/master/src/Pipes/Concurrent/Tutorial.hs#L220 ) behaves as advertised in the demo that follows in the text. I will see if I can extract the salient difference.



Mark Wotton

Jul 7, 2014, 6:48:03 PM
to haskel...@googlegroups.com
I can confirm that I'm seeing this behaviour pretty frequently on 7.8.2.


Michael Thompson

Jul 7, 2014, 6:53:15 PM
to haskel...@googlegroups.com, nkp...@gmail.com
The bit about user input was nonsense; the example wasn't using the `user` it defined.  Here is a more minimal main (with the same `worker`):

    main = do
        (output, input) <- spawn (Bounded 1) 
        a1  <- async $ runEffect $ fromInput input  >-> worker 1
        a2  <- async $ runEffect $ yield 1 >->  toOutput output
        wait a1
        wait a2 

    $ ./work
    Worker #1: Processed 1
    work: thread blocked indefinitely in an STM transaction

It works if a higher `Bounded` capacity or `Unbounded` is chosen; it also works if the second `wait` is dropped, etc.
The most striking difference between the two was the use of `Unbounded` in the working version from the tutorial text and `Bounded` in the separate `work.hs` module.



Andrew Cowie

Jul 8, 2014, 2:20:13 AM
to haskel...@googlegroups.com
On Mon, 2014-07-07 at 08:42 -0700, Gabriel Gonzalez wrote:
> You shouldn't ever get these errors, even if you forget performGC.
> I've heard from Michael Xavier that he had this issue when he switched
> to ghc-7.8. Are you by chance also using ghc-7.8?

I'm not about to say that pipes-concurrent is the problem, but we have a
program where we used it and are seeing the

"thread blocked indefinitely in an STM transaction"

error message whenever multiple concurrent requests come in. We've only
just started debugging it, but we are indeed on GHC 7.8.2

AfC
Sydney



Gabriel Gonzalez

Jul 8, 2014, 10:17:16 AM
to haskell-pipes

I'm pretty sure `pipes-concurrent` is the problem because it is doing unsafe stuff under the hood (using `unsafeIOToSTM` to touch a weak `IORef`).

Fortunately, there is a safe alternative now, which is the newly added `mkWeakTVar` in the `stm` package.

I will fix this on the weekend but if somebody has time before then the above solution is the first thing I would try.
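As a rough illustration of the API mentioned above, the sketch below attaches a finalizer to a `TVar` with `mkWeakTVar` from the `stm` package. It is not the library's implementation: `sealOnCollect` and the variable names are hypothetical, and the sketch only shows how the finalizer is registered. Whether and when the finalizer actually runs is up to the garbage collector.

```haskell
import Control.Concurrent.STM
import System.Mem.Weak (Weak, deRefWeak)

-- Hypothetical helper: once `key` is garbage collected, set `sealed` to True.
sealOnCollect :: TVar Bool -> TVar a -> IO (Weak (TVar a))
sealOnCollect sealed key = mkWeakTVar key (atomically (writeTVar sealed True))

main :: IO ()
main = do
    sealed <- newTVarIO False
    key    <- newTVarIO ()
    weak   <- sealOnCollect sealed key
    -- While `key` is still reachable, the weak pointer can be dereferenced
    -- and the finalizer has not run.
    alive <- deRefWeak weak
    putStrLn (maybe "key collected" (const "key still reachable") alive)
    readTVarIO sealed >>= print
    -- Use the key afterwards so it stays reachable across the checks above.
    readTVarIO key
```

Unlike the `unsafeIOToSTM` approach, nothing here performs IO inside a transaction; the finalizer is an ordinary `IO` action that commits its own small transaction.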

Gabriel Gonzalez

Jul 13, 2014, 3:04:48 PM
to haskel...@googlegroups.com, nkp...@gmail.com, Michael Thompson
Alright, so even when I switched to the safe `mkWeakTVar` I still see the exact same issue.  I've also reduced the minimal reproducing program to:

    import Pipes.Concurrent
   
    main = do
        (_, input) <- spawn Unbounded
        atomically $ recv input

It seems like something has fundamentally changed about how weak references interact with STM transactions in ghc-7.8.  Right now I am trying to see if there is some other way to accomplish the same behavior without weak references.
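For context, the message in these reports is how the GHC runtime displays the `BlockedIndefinitelyOnSTM` exception from `Control.Exception`. The sketch below is independent of `pipes-concurrency` and only assumes the `stm` package; `blockedMessage` is a hypothetical name. It blocks the calling thread on a transaction that nothing can ever wake up and catches the resulting exception instead of letting it end the program.

```haskell
import Control.Concurrent.STM (atomically, retry)
import Control.Exception (BlockedIndefinitelyOnSTM, try)

-- Hypothetical helper: run a transaction that can never succeed and
-- report what the runtime does about it.
blockedMessage :: IO String
blockedMessage = do
    -- `retry` with no TVars read means no other thread can ever wake us,
    -- so the runtime raises BlockedIndefinitelyOnSTM in this thread.
    r <- try (atomically retry) :: IO (Either BlockedIndefinitelyOnSTM ())
    return (either show (const "transaction completed") r)

main :: IO ()
main = blockedMessage >>= putStrLn
```

In the reduced `recv` program the intended behaviour is different: the finalizer on the unreachable `Output` is supposed to seal the channel so that `recv` returns `Nothing` rather than the thread being declared blocked.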

Nick Partridge

Jul 14, 2014, 3:09:51 AM
to haskel...@googlegroups.com, nkp...@gmail.com, practica...@gmail.com
Thanks Gabriel. I'm on Mac OS X 10.9, and I just reproduced the minimal case using GHC 7.8.3. I'm not sure I can usefully help you out with any code, but let me know if you want some testing done.

Tony Day

Aug 16, 2014, 5:14:44 PM
to haskel...@googlegroups.com, nkp...@gmail.com, practica...@gmail.com
Any progress on this Gabe?

I was thinking of diving in and exploring weak references if no immediate solution has presented itself.

Knowing absolutely zero about them, to what extent could I apply equational reasoning to the minimum code, given the effects in there?

I took the liberty of adding this as an open issue on github.

Gabriel Gonzalez

Aug 16, 2014, 5:21:03 PM
to haskel...@googlegroups.com, nkp...@gmail.com, practica...@gmail.com
The only progress I made was to start this thread on the ghc-users mailing list:

http://www.haskell.org/pipermail/glasgow-haskell-users/2014-July/025107.html

The #1 issue I have is that it's not clear what changed in the transition from ghc-7.6 to ghc-7.8 that caused this problem.  I think if we understood that, we could better understand the potential solutions.

The worst case scenario (if the problem can't be resolved) is that I just change the type of `spawn` to:

    spawn :: Buffer a -> Managed (Output a, Input a, STM ())

... where that will automatically `seal` the action when done.  It's not as useful, but at least it will avoid the exception.
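A hedged sketch of the same idea without the `Managed` type: the scoped acquisition it would wrap can be written with `bracket` around `spawn'`. `withChannel` is a hypothetical name used only for illustration and is not claimed to be part of the library.

```haskell
import Control.Exception (bracket)
import Pipes.Concurrent

-- Hypothetical helper: create a channel, run `use`, and always seal the
-- channel afterwards, even if `use` throws an exception.
withChannel :: Buffer a -> ((Output a, Input a) -> IO r) -> IO r
withChannel buffer use =
    bracket
        (spawn' buffer)
        (\(_, _, seal) -> atomically seal)
        (\(output, input, _) -> use (output, input))

main :: IO ()
main = do
    r <- withChannel Unbounded $ \(output, input) -> do
        _ <- atomically $ send output "hello"
        atomically $ recv input
    print r
```

The trade-off is the one described above: the channel's lifetime is tied to a scope, so it cannot outlive the callback, but sealing no longer depends on the garbage collector.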

Tony Day

Aug 16, 2014, 10:24:49 PM
to haskel...@googlegroups.com, nkp...@gmail.com, practica...@gmail.com
Digging into the guts of spawn' and removing the pipes-concurrency shell, I boiled the behaviour change down to this:

    import Control.Concurrent.STM
    import Data.IORef

    main :: IO ()
    main = do
        sealed <- atomically $ newTVar False
        rRecv <- newIORef ()
        mkWeakIORef rRecv (atomically $ writeTVar sealed True)
        atomically $ do
            b <- readTVar sealed
            check b
            return ()

So this is definitely an interaction between weak `IORef` finalizers and `TVar`s (and has nothing to do with `unsafeIOToSTM`).