I am new to concurrency in Haskell and I am having trouble
implementing the notion of interrupting a thread.
In a new thread, call it waitNotify, I am trying to do the following:
pop a number from a stack, wait some number of seconds based on the
number popped from the stack, perform some notification, and repeat
until there are no more numbers in the stack at which point we wait
for a new number.
These numbers will be supplied interactively by the user from main.
When the user supplies a new number, I want to interrupt whatever
waiting is happening in waitNotify, insert the number in the proper
position in the current stack, and resume waitNotify using the updated
stack. Note, here "stack" is just a generalization; it will likely
just be a list.
What is the most idiomatic way to capture this sort of behavior in
Haskell? My two challenges are the notion of interrupting a thread,
and sharing and updating this stack between threads (main and
waitNotify).
Thank you
_______________________________________________
Beginners mailing list
Begi...@haskell.org
http://www.haskell.org/mailman/listinfo/beginners
1. Hmm. IMHO - maybe you should look on CHP. As far as I remeber
writing/reading to channel there is blocking.
2. You mean stack (FILO) or queue (FIFO)?
3. Why don't just read lazily from input?
Regards
I have *not* tried compiling this code.
import Control.Concurrent
import Control.Concurrent.MVar
import Data.Int
import System.Time
type Microseconds = Int64
getSystemTime :: IO Microseconds
getSystemTime = do
(TOD sec pico) <- getClockTime
return $!
(fromIntegral sec::Int64) * 1000000 +
(fromIntegral pico::Int64) `div` 1000000
type Stack a = [a] -- or whatever type you want
isEmpty :: Stack a -> Bool
isEmpty [] = True
isEmpty _ = False
pop :: Stack a -> (a, Stack a)
data ScheduleInput = ModifyStack (Stack -> Stack) | WaitFor Microseconds
| Timeout
never = maxBound :: Microseconds
schedule :: MVar ScheduleInput -> MVar a -> Stack a -> IO ()
schedule inpVar wnVar stack = schedule_ never stack
where
schedule_ :: Microseconds -> Stack -> IO ()
schedule_ timeout stack = do
now <- getSystemTime
let tillTimeout = 0 `max` (timeout - now)
if tillTimeout == 0 && not (isEmpty stack) then do
let (val, stack') = pop stack
putMVar wnVar (PopValue val)
schedule never stack'
else do
inp <- takeMVarWithTimeout (fromIntegral tillTimeout) inpVar
case inp of
ModifyStack f -> schedule_ timeout (f stack)
WaitFor t -> do
now <- getSystemTime
schedule (t+now) stack
Timeout -> schedule timeout stack
readMVarWithTimeout :: Int -> MVar ScheduleInput -> IO ScheduleInput
readMVar timeoutUS inpVar = do
tid <- forkIO $ do
threadDelay timeoutUS
putMVar inpVar Timeout
inp <- takeMVar inpVar
killThread tid
return inp
waitNotify :: MVar ScheduleInput -> MVar Int -> IO ()
waitNotify schInp wnInp = do
val <- takeMVar wnInp
...notify...
let t = ....
putMVar schInp $ WaitFor t -- block input for the specified period
main = do
schVar <- newEmptyMVar
wnVar <- newEmptyMVar
forkIO $ schedule schVar wnVar []
forkIO $ waitNotify wnVar schVar
...
-- Modify stack according to user input inside your main IO loop
putMVar schVar $ ModifyStack $ \stack -> ...
I'm sure this is not exactly what you want, but at least it illustrates
how you can achieve anything you like by using MVars + extra worker
threads + killing threads (useful for implementing timeouts).
Steve
Thanks Stephen. That worked and things are starting to make more
sense. The crux of it all was the readMVarWithTimeout function which
does exactly what I want.
Now I want to extend this a bit further to use StateT. I've modified
the schedule function quite a bit, and now it has type schedule ::
StateT MyState IO. I keep the MVar actVar in MyState as well the list
(stack) of wait times. However, the type of forkIO is forkIO :: IO ()
-> IO ThreadId. How do I fork a new thread for schedule even though
it is wrapped in the State monad?
Thanks again.
You'll want something like this:
myFork :: StateT MyState IO () -> MyState -> StateT MyState IO ThreadId
myFork action initialState = liftIO (forkIO (evalStateT action initialState))
Dean
That's not quite what I'm looking for. I don't want to create a new
thread in which to run the monad but rather to be able to create
threads from inside the monad (as a result of running the monad).
And that's what this code does. Do you want it to automatically use
the state of the monad ?
> myFork :: StateT MyState IO () -> MyState -> StateT MyState IO ThreadId
> myFork action = liftIO . forkIO . evalStateT action =<< get
keep in mind that nothing the thread will do will affect the state of
the main thread.
--
Jedaï