Getting to grips with Pipes.Concurrent

Showing 1-12 of 12 messages
Getting to grips with Pipes.Concurrent Nick Partridge 7/6/14 11:59 PM
Hi, I've been playing a bit with Pipes.Concurrent lately, and I seem to be stumbling into STM errors more than I'd like. Here's a recent example:

https://gist.github.com/nkpart/5c83011b10b33ebc5e35

The function takes a number of workers to spawn, a generator of work and the function to run on each worker thread. What I don't understand is why I needed to explicitly seal the output channels in this example.  Is it something to do with the fact I have 2 sets of channels in play? The examples in the docs don't illustrate anything like this and that seems to me to be the only real point of difference between this and what works.

Also in general, I find that I'm frequently able to get bits and pieces to compile, look okay, and maybe even run correctly some percentage of the time, and then blow up with STM exceptions occasionally as well. Usually I've missed the performGC, but the experience is disturbing to my Haskell brain.

Thanks,
Nick
Re: [haskell-pipes] Getting to grips with Pipes.Concurrent Gabriel Gonzalez 7/7/14 8:42 AM

You shouldn't ever get these errors, even if you forget performGC.  I've heard from Michael Xavier that he had this issue when he switched to ghc-7.8.  Are you by chance also using ghc-7.8?

Either way this is a bug since it violates the library's contract.  If you can throw together a minimal example that reproduces the error as a Github issue then I can dig into this.

--
You received this message because you are subscribed to the Google Groups "Haskell Pipes" group.
To unsubscribe from this group and stop receiving emails from it, send an email to haskell-pipe...@googlegroups.com.
To post to this group, send email to haskel...@googlegroups.com.
Re: [haskell-pipes] Getting to grips with Pipes.Concurrent Michael Thompson 7/7/14 2:20 PM
I'm not at all following this yet, but ghc-7.8 does seem to be involved. It seems that a nearly minimal example is already present in the tutorial -- presumably `work.hs` printed at the end of it used to work fine, but with ghc-7.8 I get: 

    1
    2
    3
    ... etc ..
    99
    100
    101
    102
    103
    104
    hi
    Worker #2: Processed 2
    Worker #1: Processed 1
    Worker #3: Processed 3
    105
    106
    107
    hoWorker #1: Processed 5
    Worker #2: Processed 4
    Worker #3: Processed 6
    work: thread blocked indefinitely in an STM transaction

(here I typed input, but the result is the same if I do nothing.)

By contrast the simpler `main` in the corresponding bit of the tutorial text itself (i.e. https://github.com/Gabriel439/Haskell-Pipes-Concurrency-Library/blob/master/src/Pipes/Concurrent/Tutorial.hs#L220 ) behaves as is advertised in the demo that follows in the text. I will see if I can extract the salient difference.



Re: [haskell-pipes] Getting to grips with Pipes.Concurrent Mark Wotton 7/7/14 3:48 PM
I can confirm that I'm seeing this behaviour pretty frequently on 7.8.2.

Sent from my iPhone
--
You received this message because you are subscribed to the Google Groups "Haskell Pipes" group.
To unsubscribe from this group and stop receiving emails from it, send an email to haskell-pipe...@googlegroups.com.
To post to this group, send email to haskel...@googlegroups.com.
Re: [haskell-pipes] Getting to grips with Pipes.Concurrent Michael Thompson 7/7/14 3:53 PM
The bit about user input was nonsense; the example wasn't using the `user` it defined.  Here is a more minimal main (with the same `worker`):

    main = do
        (output, input) <- spawn (Bounded 1) 
        a1  <- async $ runEffect $ fromInput input  >-> worker 1
        a2  <- async $ runEffect $ yield 1 >->  toOutput output
        wait a1
        wait a2 

      $ ./work
     Worker #1: Processed 1
     work: thread blocked indefinitely in an STM transaction

It works if a higher `Bounded` or if `Unbounded` is chosen; it works if the second `wait` is dropped, etc.
The most striking difference was the use of `Unbounded` in the good version and `Bounded` in the separate module.



Re: [haskell-pipes] Getting to grips with Pipes.Concurrent Andrew Cowie 7/7/14 11:20 PM
On Mon, 2014-07-07 at 08:42 -0700, Gabriel Gonzalez wrote:
> You shouldn't ever get these errors, even if you forget performGC.
> I've heard from Michael Xavier that he had this issue when he switched
> to ghc-7.8.  Are you by chance also using ghc-7.8?

I'm not about to say that pipes-concurrent is the problem, but we have a
program where we used it and are seeing the

        "thread blocked indefinitely in an STM transaction"

error message whenever multiple concurrent requests come in. We've only
just started debugging it, but we are indeed on GHC 7.8.2

AfC
Sydney



Re: [haskell-pipes] Getting to grips with Pipes.Concurrent Gabriel Gonzalez 7/8/14 7:17 AM

I'm pretty sure `pipes-concurrent` is the problem because it is doing unsafe stuff under the hood (using `unsafeIOToSTM` to touch a weak `IORef`).

Fortunately, there is a safe alternative now, which is the newly added `mkWeakTVar` in the `stm` package.

I will fix this on the weekend but if somebody has time before then the above solution is the first thing I would try.

--
You received this message because you are subscribed to the Google Groups "Haskell Pipes" group.
To unsubscribe from this group and stop receiving emails from it, send an email to haskell-pipe...@googlegroups.com.
To post to this group, send email to haskel...@googlegroups.com.
Re: [haskell-pipes] Getting to grips with Pipes.Concurrent Gabriel Gonzalez 7/13/14 12:04 PM
Alright, so even when I switched to the safe `mkWeakTVar` I still see the exact same issue.  I've also reduced the minimal reproducing program to:

    import Pipes.Concurrent
   
    main = do
        (_, input) <- spawn Unbounded
        atomically $ recv input

It seems like something has fundamentally changed about how weak references interact with STM transactions in ghc-7.8.  Right now I am trying to see if there is some other way to accomplish the same behavior without weak references.
--
You received this message because you are subscribed to the Google Groups "Haskell Pipes" group.
To unsubscribe from this group and stop receiving emails from it, send an email to haskell-pipe...@googlegroups.com.
To post to this group, send email to haskel...@googlegroups.com.

Re: [haskell-pipes] Getting to grips with Pipes.Concurrent Nick Partridge 7/14/14 12:09 AM
Thanks Gabriel. I'm on Max OS X 10.9, and I just reproduced the minimal case using GHC 7.8.3. I'm not sure I can usefully help you out with any code, but let me know if you want some some testing done.
Re: [haskell-pipes] Getting to grips with Pipes.Concurrent Tony Day 8/16/14 2:14 PM
Any progress on this Gabe?

I was thinking of diving in and exploring weak references if no immediate solution has presented itself.

Knowing absolutely zero about them, to what extent could I apply equational reasoning to the minimum code, given the effects in there?

I took the liberty of adding this as an open issue on github.
Re: [haskell-pipes] Getting to grips with Pipes.Concurrent Gabriel Gonzalez 8/16/14 2:21 PM
The only progress I made was to start this thread on the ghc-users mailing list:

http://www.haskell.org/pipermail/glasgow-haskell-users/2014-July/025107.html

The #1 issue I have is that it's not clear what changed in the transition from ghc-7.6 to ghc-7.8 that caused this problem.  I think if we understood that we can better understand the potential solutions.

The worst case scenario (if the problem is can't be resolved) is that I just change the type of `spawn` to:

    spawn :: Buffer a -> Managed (Output a, Input a, STM ())

... where that will automatically `seal` the action when done.  It's not as useful, but at least it will avoid the exception.
Re: [haskell-pipes] Getting to grips with Pipes.Concurrent Tony Day 8/16/14 7:24 PM
Digging into the guts of spawn' and removing the pipes-concurrency shell, I boiled the behaviour change down to this:

    import Control.Concurrent.STM
    import Data.IORef

    main :: IO ()
    main = do
        sealed <- atomically $ newTVar False
        rRecv <- newIORef ()
        mkWeakIORef rRecv (atomically $ writeTVar sealed True)
        atomically $ do
            b <- readTVar sealed
            check b
            return ()

So definitely a weak IO reference/TVar interaction effect (and nothing to do with unsafePerformIO)