Agents for managing threads

Garth Sheldon-Coulson

Oct 8, 2009, 12:32:17 PM
to clo...@googlegroups.com
Hi All,

This is a question about whether I can use agents or some other Clojure concurrency feature to manage a multithreaded process elegantly and extensibly. The following is a thought I had for how to do it, but I would appreciate additional suggestions.

I have an application that needs to poll an outside service at regular intervals (every 50 ms or so). For various reasons the polling takes place in a separate thread. I need to be able to send messages to the separate thread (call it the "polling thread") from the other threads in the application. For instance, I need to be able to tell it to go to sleep (stop polling) and wake up (restart polling), and I need to be able to give it new instructions on how to process items it reads from the outside service.

Currently, I send messages to the thread by conjing them to a ref that the thread knows about, specifically a ref that the Thread's run function is closed over. I then send the thread an interrupt, which lets it know to look in the ref to see if there's anything new and interesting to do.

In practice, this means sending messages to the thread goes something like:

(dosync (commute message-queue conj message-item))
(.interrupt polling-thread)

There's nothing wrong with this, but I'd like to know if I could use agents instead. I want to be able to abstract away from the interrupt, even if the abstraction uses an interrupt internally.

Could I create an agent containing the thread object?

(def poll-agent (agent (Thread. f)))

And send it messages like this:

(swap! message-queue add-item item)
(send poll-agent check-queue)

where check-queue is:

(defn check-queue [t] (io! (.interrupt t)))

Are side-effects such as an interrupt permissible in functions sent to agents? That the io! doesn't throw an error indicates yes, but please correct me if I'm wrong.

Is wrapping a thread object in an agent a reasonable thing to do?

Are there better ways to handle this situation? Is there any way the queue could be added to, and the thread interrupted, in a single call? I don't think I can include both the queue and the thread in a single agent and send the queue conj and the interrupt in a single send call, because I need to be guaranteed that the conj takes place before the interrupt...

But ideally I'd like something like:

(send poll-agent add-item item)

that would take care of both interrupting the thread (potentially waking it up) and putting the item somewhere it could find it.

Thanks.

Laurent PETIT

Oct 8, 2009, 1:06:56 PM
to clo...@googlegroups.com
I'm not an expert on asynchronicity with Clojure, but I'll benefit from the fact that I'll be the first to answer :-)
But please take this with a grain of salt.

As I see agents, in their generality, they are "a generic way to handle a state asynchronously" (commands come from multiple unorganized sources, and they don't need to be organized).

Maybe you could indeed consider your agent responsible for the "state" of the polling thread: active or sleeping, and, when active, the polling strategy.
One thing to keep in mind is that no "order" sent to the agent will be lost; they will be applied in order, one at a time.

So I don't think you need this message-queue at all (or maybe I haven't understood what it is, or I'm misled by its name). Send your order directly to the agent, as the change you want to make to its state.

(If I'm right, it may well be even better than you imagined :-) )

HTH,

--
Laurent

2009/10/8 Garth Sheldon-Coulson <ga...@mit.edu>

John Harrop

Oct 8, 2009, 7:49:53 PM
to clo...@googlegroups.com
On Thu, Oct 8, 2009 at 1:06 PM, Laurent PETIT <lauren...@gmail.com> wrote:
> So I don't think you need this message-queue at all (or maybe I haven't understood what it is or I'm misled by its name), send directly your order to the agent as what you want to change in its state.

You might be able to do better than that, and dispense entirely with the separate polling thread.

Break the state involved in the polling into a variable and a constant part. Constant might be a pocket on a socket on a port, variable a polling/non-polling flag and some other stuff. Wrap the latter in a structure such as a list or a map and stuff it in an agent. Create some functions that close over the former, some to change the agent state and one to do the polling.

The polling one should check the are-we-polling flag of the state, return the state if we're not polling, and otherwise poll, sleep however-long, and (send-off *agent* poll), then return the state. The one to turn on polling also needs to end with (send-off *agent* poll) and then return the modified version of the state (with the we're-polling flag assoc'd true). The effect of all this should be:

If the agent is told to start polling, the start function enqueues a poll send-off on it, and as soon as the start function returns and sets the new agent state, the poll function is executed. This then repeats once per sleep interval.

If a state change is sent to a polling agent, it probably arrives during the poll function's sleep. This state change message is thus queued by the agent ahead of the poll message it sends itself at the end of the sleep. So the state change is done next, then the poll, and for that poll the state change should be in effect.

If the state change included stop-polling, then since the poll function checks the we're-polling flag and aborts if we're not polling, that next poll is a no-op; in particular, the agent does not send itself another poll message, and will be completely quiescent until sent a start-polling message.

John Harrop

Oct 8, 2009, 8:12:47 PM
to clo...@googlegroups.com
On Thu, Oct 8, 2009 at 7:49 PM, John Harrop <jharr...@gmail.com> wrote:
> You might be able to do better than that, and dispense entirely with the separate polling thread.

Confirmed:

(def x (agent {:polling false :message "foo"}))

(defn poll [m]
  (when (:polling m)
    (prn (:message m))
    (Thread/sleep 1000)
    (send-off *agent* poll))
  m)
 
(defn set-message [m mess]
  (assoc m :message mess))

(defn start [m]
  (send-off *agent* poll)
  (assoc m :polling true))

(defn stop [m]
  (assoc m :polling false))

(send-off x start)

The message "foo" should start repeating in the standard-output monitor window of your IDE.

(send-off x set-message "bar")

The message "bar" should start repeating in the standard-output monitor window of your IDE, in place of "foo".

(send-off x stop)

The output should stop, and look like this:

"foo"
"foo"
"foo"
"foo"
"bar"
"bar"
"bar"

with possibly different numbers of repetitions. This seems to indicate that the strategy works beautifully for a periodically repeating behavior that accepts state-change messages, including start and stop signals.

John Harrop

Oct 10, 2009, 3:54:52 AM
to clo...@googlegroups.com
Here is a quickie "library" for abstracting this:

(defn make-actor [f period-in-ms & initial-state]
  (agent (into [f period-in-ms false] initial-state)))

(defmacro actor
  "Creates and returns a new, initially-sleeping actor with the specified period, initial parameter values, and code to execute."
  [period-in-ms initial-bindings & body]
  `(let [p# ~period-in-ms]
     (make-actor
       (fn [~@(take-nth 2 initial-bindings)] ~@body)
       p#
       ~@(take-nth 2 (rest initial-bindings)))))

(defn- actor-act [state]
  (when (nth state 2)
    (apply (first state) (drop 3 state))
    (Thread/sleep (second state))
    (send-off *agent* actor-act))
  state)

(defn- actor-start [state]
  (send-off *agent* actor-act)
  (into [(first state) (second state) true] (drop 3 state)))

(defn- actor-stop [state]
  (into [(first state) (second state) false] (drop 3 state)))

(defn- actor-change-state [state new-state]
  (into [(first state) (second state) (nth state 2)] new-state))

(defn start-actor
  "Wakes up an actor -- starts it periodically executing its body."
  [actor]
  (send-off actor actor-start))

(defn stop-actor
  "Puts an actor to sleep again."
  [actor]
  (send-off actor actor-stop))

(defn change-actor-state
  "Changes an actor's parameter list."
  [actor & new-state]
  (send-off actor actor-change-state new-state))


Test at the REPL with these:

=> (def y (actor 1000 [x 1 y 1] (println (+ (* x 10) y))))

=> (start-actor y)

11 should start repeating to stdout.

=> (change-actor-state y 2 2)

It should stop repeating 11 and start repeating 22.

=> (stop-actor y)

Output should halt.

=> (start-actor y)

Output should resume.

=> (stop-actor y)

Output should halt.

=> (change-actor-state y 4 7)

No immediate effect.

=> (start-actor y)

Output should resume, but printing 47 instead of 22.

=> (stop-actor y)

Output should stop.

Note that the (actor ...) macro resembles a fn form in use (though destructuring won't work). In the body, the parameters can be referred to by name. When the body is invoked, it receives an ordered list of values to bind to those parameters. The (change-actor-state ...) function takes the actor followed by such a parameter list, so in the example above, since x was the first parameter, (change-actor-state y 4 7) binds 4 to x and 7 to y on subsequent executions of the actor body.

Under the hood, it's exactly as described: the actor body is wrapped in a fn with those parameter names and the actor is invoked periodically with an argument list, which is replaced by change-actor-state. The actor itself is an agent wrapping a vector with the function, period, awake flag, and current parameters.

Trivial additions: creating a (defactor name ...) macro that functions like (def name (actor ...)); adding a change-actor-period function.
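
A minimal sketch of those two additions might look like the following. It repeats make-actor and actor from the post above only so that it runs on its own; the helper name actor-change-period is an assumption, and the sketch relies on the period being the second element of the state vector.

```clojure
;; Copies of make-actor and actor from the post above, so the sketch is self-contained.
(defn make-actor [f period-in-ms & initial-state]
  (agent (into [f period-in-ms false] initial-state)))

(defmacro actor [period-in-ms initial-bindings & body]
  `(let [p# ~period-in-ms]
     (make-actor
       (fn [~@(take-nth 2 initial-bindings)] ~@body)
       p#
       ~@(take-nth 2 (rest initial-bindings)))))

;; Hypothetical helper: the period lives at index 1 of the state vector.
(defn- actor-change-period [state new-period]
  (assoc state 1 new-period))

(defn change-actor-period
  "Changes an actor's period; the new value is used for the next sleep."
  [a new-period]
  (send-off a actor-change-period new-period))

(defmacro defactor
  "Functions like (def name (actor period-in-ms initial-bindings body...))."
  [name period-in-ms initial-bindings & body]
  `(def ~name (actor ~period-in-ms ~initial-bindings ~@body)))

;; Usage: define a sleeping actor, then shorten its period.
(defactor ticker 1000 [x 1 y 1] (println (+ (* x 10) y)))
(change-actor-period ticker 250)
(await ticker)
(second @ticker) ;=> 250
```

Because the period change is just another queued action, it follows the same ordering as change-actor-state: it is applied between two executions of the body, never in the middle of one.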

More interesting, but fairly easy: have the function take one extra parameter, the previous invocation's return value, and specify an initial value for it in (actor ...). This enables the function to maintain a mutable cell of sorts.
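
One way this could be sketched, under the assumption that the state is kept in a map rather than the vector used above (all names here are hypothetical, not part of the posted library):

```clojure
;; State map: the function, the period, the awake flag, and the previous return value.
(defn make-stateful-actor [f period-in-ms initial-value]
  (agent {:f f :period period-in-ms :awake false :prev initial-value}))

(defn- stateful-act [{:keys [f period awake prev] :as state}]
  (if awake
    (let [result (f prev)]              ; feed the previous return value back in
      (Thread/sleep period)
      (send-off *agent* stateful-act)   ; reschedule, as in actor-act
      (assoc state :prev result))       ; remember the result for the next run
    state))

(defn start-stateful-actor [a]
  (send-off a (fn [state]
                (send-off *agent* stateful-act)
                (assoc state :awake true))))

(defn stop-stateful-actor [a]
  (send-off a assoc :awake false))

;; Usage: a counter that increments once per period.
(def counter (make-stateful-actor inc 10 0))
(start-stateful-actor counter)
(Thread/sleep 200)
(stop-stateful-actor counter)
(await counter)
(:prev @counter) ; a positive count; the exact value depends on timing
```

The body here is a plain function of one argument; combining this with the named parameters of (actor ...) would mean passing the previous value as an extra first argument.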

Nontrivial: add destructuring.

Raoul Duke

Oct 10, 2009, 5:52:26 PM
to clo...@googlegroups.com
> The actor itself is
> an agent wrapping a vector with the function, period, awake flag, and
> current parameters.

will actors actually do the queued function w/in a reasonable
timeframe? i don't think there are any guarantees of it so if one is
hoping to get really nicely periodic behaviour... just curious because
i'd thought of using agents for periodic stuff, too.

sincerely.

John Harrop

Oct 11, 2009, 12:39:11 AM
to clo...@googlegroups.com
In practice, they seem to. 

Raoul Duke

Oct 11, 2009, 5:28:03 PM
to clo...@googlegroups.com
>> will actors actually do the queued function w/in a reasonable
>> timeframe? i don't think there are any guarantees of it so if one is
>> hoping to get really nicely periodic behaviour... just curious because
>> i'd thought of using agents for periodic stuff, too.
>
> In practice, they seem to.

it seems like a common thing people try to do in Clojure e.g. random
link http://github.com/fffej/ClojureProjects/blob/33ef9f0d1c67821658122273321e0a403486374b/misc/uk/co/fatvat/flock.clj

so, can anybody "official" say if this use case is / will be
explicitly kept in mind for agent implementation?

thanks :)

John Harrop

Oct 11, 2009, 10:29:25 PM
to clo...@googlegroups.com
Agents are for asynchronous, sequential state changes. Which means there is a guarantee that a change to an actor will be processed before its next action (but after its current one). There's no guarantee of promptness of getting to any particular message, but the evidence is that it's on a best-effort basis. The system's semantics won't change, then, but it may get slow if the system is heavily loaded (CPU, I/O contention, RAM/pagefile, whatever is relevant) or the actor tasks are long-running.
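
The ordering guarantee can be observed with a small experiment. This is only an illustrative sketch (the names events, slow-action and change are made up for it): a change sent while an action is running is applied after that action and before the next one.

```clojure
(def events (atom []))

(defn slow-action [state]
  (swap! events conj [:action-start state])
  (Thread/sleep 200)
  (swap! events conj [:action-end state])
  state)

(defn change [state]
  (swap! events conj [:change])
  (inc state))

(def a (agent 0))
(send-off a slow-action)   ; starts running right away
(Thread/sleep 50)
(send-off a change)        ; arrives mid-action and waits its turn
(send-off a slow-action)   ; runs after the change, so it sees the new state
(await a)
@events
;=> [[:action-start 0] [:action-end 0] [:change] [:action-start 1] [:action-end 1]]
```

This shows ordering only. It says nothing about how promptly each action starts, which is the part that has no guarantee.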

If you want one with the ability to interrupt/abort the current task, you'll need to roll your own using threads and interrupt. I wouldn't recommend interrupting an agent thread, even if the agent task catches InterruptedException and returns: if the interrupt hits at the wrong moment, between one job's catch and the next's try, you'll get at best agent errors (an InterruptedException in the error queue) and at worst a crashed agent thread. Finding the thread an agent is using is possible but nontrivial; the same agent may be using a different thread on successive sends, so it's really the agent task and not the agent that has a thread. A long-running task could start with a reset! of an atom to (Thread/currentThread) to give the code that launched it a reference to the Thread object via which to interrupt it. But interrupting it after the task is complete may interrupt some OTHER agent's task, never mind some other task given to the same agent.
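
A rough sketch of the roll-your-own variant, using a plain Thread rather than an agent so that the interrupt cannot land on an unrelated task. The names task-thread, outcome and long-task are hypothetical.

```clojure
(def task-thread (atom nil))   ; holds the Thread running the task, once it has started
(def outcome (atom nil))

(defn long-task []
  ;; Publish our own Thread object so the launching code can interrupt us.
  (reset! task-thread (Thread/currentThread))
  (try
    (Thread/sleep 60000)       ; stand-in for long-running work
    (reset! outcome :finished)
    (catch InterruptedException _
      (reset! outcome :interrupted))))

(def worker (doto (Thread. ^Runnable long-task) (.start)))

;; Wait until the task has published its thread, then interrupt it.
(while (nil? @task-thread) (Thread/sleep 10))
(.interrupt ^Thread @task-thread)
(.join ^Thread worker 5000)
@outcome ;=> :interrupted
```

Since the thread is dedicated to this one task, interrupting it late is harmless, which is not the case for a pooled agent thread.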

Garth Sheldon-Coulson

Oct 12, 2009, 2:05:45 AM
to clo...@googlegroups.com
Hi All,

Thanks for the great replies. John, the self-send-off idea is terrific and hadn't occurred to me. I'll be using a variant of what you proposed.

Garth

John Harrop

Oct 12, 2009, 5:53:28 PM
to clo...@googlegroups.com
Thanks.

Robert Stehwien

Oct 13, 2009, 7:24:26 AM
to clo...@googlegroups.com
John,

Excellent "library". I'm pulling this into my utility functions, with proper attribution of course, as long as you don't mind. It is just too useful to me not to keep around.

--Robert

John Harrop

Oct 13, 2009, 10:50:55 AM
to clo...@googlegroups.com
On Tue, Oct 13, 2009 at 7:24 AM, Robert Stehwien <rste...@gmail.com> wrote:
> John,
>
> Excellent "library". I'm pulling this into my utility functions, with proper attribution of course, as long as you don't mind. It is just too useful to me not to keep around.

Thanks. Consider any code I post here to be public domain. Too many licensing and copyright headaches with programming already; why should I contribute to them? :)

Note that I posted an immutable priority queue implementation recently as well, along with a variation on the theme of actors, this one somewhat different in semantics but capable of parallelization with a priority-supersede -- something like having a form of agent send that priority queues jobs instead of simply FIFO queueing them (but which doesn't, though it could be added, have an analogue of agents' states).