|Getting to grips with Pipes.Concurrent||Nick Partridge||7/6/14 11:59 PM|
Hi, I've been playing a bit with Pipes.Concurrent lately, and I seem to be stumbling into STM errors more than I'd like. Here's a recent example:
The function takes a number of workers to spawn, a generator of work and the function to run on each worker thread. What I don't understand is why I needed to explicitly seal the output channels in this example. Is it something to do with the fact I have 2 sets of channels in play? The examples in the docs don't illustrate anything like this and that seems to me to be the only real point of difference between this and what works.
Also in general, I find that I'm frequently able to get bits and pieces to compile, look okay, and maybe even run correctly some percentage of the time, and then blow up with STM exceptions occasionally as well. Usually I've missed the performGC, but the experience is disturbing to my Haskell brain.
|Re: [haskell-pipes] Getting to grips with Pipes.Concurrent||Gabriel Gonzalez||7/7/14 8:42 AM|
You shouldn't ever get these errors, even if you forget performGC. I've heard from Michael Xavier that he had this issue when he switched to ghc-7.8. Are you by chance also using ghc-7.8?
Either way this is a bug since it violates the library's contract. If you can throw together a minimal example that reproduces the error as a Github issue then I can dig into this.
|Re: [haskell-pipes] Getting to grips with Pipes.Concurrent||Michael Thompson||7/7/14 2:20 PM|
I'm not at all following this yet, but ghc-7.8 does seem to be involved. It seems that a nearly minimal example is already present in the tutorial -- presumably `work.hs` printed at the end of it used to work fine, but with ghc-7.8 I get:
... etc ..
Worker #2: Processed 2
Worker #1: Processed 1
Worker #3: Processed 3
hoWorker #1: Processed 5
Worker #2: Processed 4
Worker #3: Processed 6
work: thread blocked indefinitely in an STM transaction
(here I typed input, but the result is the same if I do nothing.)
By contrast the simpler `main` in the corresponding bit of the tutorial text itself (i.e. https://github.com/Gabriel439/Haskell-Pipes-Concurrency-Library/blob/master/src/Pipes/Concurrent/Tutorial.hs#L220 ) behaves as is advertised in the demo that follows in the text. I will see if I can extract the salient difference.
|Re: [haskell-pipes] Getting to grips with Pipes.Concurrent||Mark Wotton||7/7/14 3:48 PM|
I can confirm that I'm seeing this behaviour pretty frequently on 7.8.2.
Sent from my iPhone
|Re: [haskell-pipes] Getting to grips with Pipes.Concurrent||Michael Thompson||7/7/14 3:53 PM|
The bit about user input was nonsense; the example wasn't using the `user` it defined. Here is a more minimal main (with the same `worker`):
main = do
(output, input) <- spawn (Bounded 1)
a1 <- async $ runEffect $ fromInput input >-> worker 1
a2 <- async $ runEffect $ yield 1 >-> toOutput output
Worker #1: Processed 1
It works if a higher `Bounded` or if `Unbounded` is chosen; it works if the second `wait` is dropped, etc.
The most striking difference was the use of `Unbounded` in the good version and `Bounded` in the separate module.
|Re: [haskell-pipes] Getting to grips with Pipes.Concurrent||Andrew Cowie||7/7/14 11:20 PM|
On Mon, 2014-07-07 at 08:42 -0700, Gabriel Gonzalez wrote:I'm not about to say that pipes-concurrent is the problem, but we have a
program where we used it and are seeing the
error message whenever multiple concurrent requests come in. We've only
just started debugging it, but we are indeed on GHC 7.8.2
|Re: [haskell-pipes] Getting to grips with Pipes.Concurrent||Gabriel Gonzalez||7/8/14 7:17 AM|
I'm pretty sure `pipes-concurrent` is the problem because it is doing unsafe stuff under the hood (using `unsafeIOToSTM` to touch a weak `IORef`).
Fortunately, there is a safe alternative now, which is the newly added `mkWeakTVar` in the `stm` package.
I will fix this on the weekend but if somebody has time before then the above solution is the first thing I would try.
|Re: [haskell-pipes] Getting to grips with Pipes.Concurrent||Gabriel Gonzalez||7/13/14 12:04 PM|
Alright, so even when I switched to the safe `mkWeakTVar` I still see the exact same issue. I've also reduced the minimal reproducing program to:
main = do
(_, input) <- spawn Unbounded
atomically $ recv input
It seems like something has fundamentally changed about how weak references interact with STM transactions in ghc-7.8. Right now I am trying to see if there is some other way to accomplish the same behavior without weak references.
|Re: [haskell-pipes] Getting to grips with Pipes.Concurrent||Nick Partridge||7/14/14 12:09 AM|
Thanks Gabriel. I'm on Max OS X 10.9, and I just reproduced the minimal case using GHC 7.8.3. I'm not sure I can usefully help you out with any code, but let me know if you want some some testing done.
|Re: [haskell-pipes] Getting to grips with Pipes.Concurrent||Tony Day||8/16/14 2:14 PM|
Any progress on this Gabe?
I was thinking of diving in and exploring weak references if no immediate solution has presented itself.
Knowing absolutely zero about them, to what extent could I apply equational reasoning to the minimum code, given the effects in there?
I took the liberty of adding this as an open issue on github.
|Re: [haskell-pipes] Getting to grips with Pipes.Concurrent||Gabriel Gonzalez||8/16/14 2:21 PM|
The only progress I made was to start this thread on the ghc-users mailing list:
The #1 issue I have is that it's not clear what changed in the transition from ghc-7.6 to ghc-7.8 that caused this problem. I think if we understood that we can better understand the potential solutions.
The worst case scenario (if the problem is can't be resolved) is that I just change the type of `spawn` to:
spawn :: Buffer a -> Managed (Output a, Input a, STM ())
... where that will automatically `seal` the action when done. It's not as useful, but at least it will avoid the exception.
|Re: [haskell-pipes] Getting to grips with Pipes.Concurrent||Tony Day||8/16/14 7:24 PM|
Digging into the guts of spawn' and removing the pipes-concurrency shell, I boiled the behaviour change down to this:
main :: IO ()
main = do
sealed <- atomically $ newTVar False
rRecv <- newIORef ()
mkWeakIORef rRecv (atomically $ writeTVar sealed True)
atomically $ do
b <- readTVar sealed
So definitely a weak IO reference/TVar interaction effect (and nothing to do with unsafePerformIO)