perform action after events stop for some period

Brian Craft

Dec 1, 2014, 4:37:56 PM
to clo...@googlegroups.com
I need to perform an action when a series of events has been quiet for some period. That is, if one event arrives, an action is queued to execute after some timeout. If a second event arrives, the timeout is reset, and so forth.

The following code seems to work, however I'm wondering if calling 'future' from 'swap!' is a bad idea (side effecting), and if there's a better way.

(defn queue-with-delay [period func]
  (let [f (atom nil)]
    (fn []
      (when @f
        (future-cancel @f))
      (swap! f (fn [_] (future (Thread/sleep period) (func)))))))


Use like

(def event (queue-with-delay 2000 #(println "running")))
(event)
(event)
(event)  ; pause 2 sec
"running"



Brian Craft

Dec 1, 2014, 6:50:32 PM
to clo...@googlegroups.com
That version has the unfortunate behavior that (func) can be interrupted if (event) is called while it is running. Here's another version using an agent:

(defn queue-with-delay2 [period func]
  (let [q (agent nil)]
    (fn []
      (send-off q (fn [t]
                    (when t
                      (future-cancel t))
                    (future (Thread/sleep period) (send-off q (fn [_] (func) nil))))))))

Running with a sleep to see that (func) is not canceled by subsequent (event) calls:

(def event (queue-with-delay2 2000 #(do (println "running") (Thread/sleep 2000) (println "ending"))))

Oddly, if calling (event) between "running" and "ending" messages, the repl will stack-overflow on the return value. No idea what that's about. But, running like this is fine:

(do (event) nil)
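
One possible explanation for the overflow, offered as an assumption rather than something confirmed in this thread: send-off returns the agent, the agent's state at that moment is the already-finished future, and that future's value is the agent again (the return value of the inner send-off). The REPL printer follows both derefs, so printing never terminates. A minimal sketch that builds the same cycle by hand, using the hypothetical names `q` and `fut`:

```clojure
;; Hypothetical reconstruction of the cycle; `q` and `fut` are illustrative names.
(def q (agent nil))

;; A future whose value is the agent itself, like the value returned by
;; the inner (send-off q ...) call in queue-with-delay2.
(def fut (future q))
@fut                                  ; wait until the future is realized

;; Store the realized future as the agent's state. Wrapped in `do` so that
;; the agent itself is never returned to (and printed by) the REPL.
(do (send q (constantly fut))
    (await q))

;; agent -> future -> agent: dereferencing twice leads back to the agent.
(identical? q @@q)                    ; true
```

If this is what happens, discarding the return value with (do (event) nil) works because the agent is never handed to the printer.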

Erik Price

Dec 1, 2014, 8:33:10 PM
to clo...@googlegroups.com

Coincidentally, we recently wrote code to do something very similar. The following function will invoke f after period milliseconds, unless a value is sent on events-ch, in which case the timeout is reset (and starts counting down again):

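(require '[clojure.core.async :as async])

(defn invoke-after-uninterrupted-delay
  ;; A single variadic arity also covers the no-args case: `args` is nil
  ;; and (apply f nil) calls (f). Delegating with a literal [] would
  ;; instead call (f []).
  [period events-ch f & args]
  (async/go-loop []
    ;; alts! returns [value port]; `p` is whichever channel completed first.
    (let [[_ p] (async/alts! [(async/timeout period) events-ch])]
      (if (= p events-ch)
        (recur)            ; an event arrived: restart the countdown
        (apply f args))))) ; the timeout won: run the action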

e


--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+u...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Gary Verhaegen

Dec 2, 2014, 6:47:58 AM
to clo...@googlegroups.com
In the general case, side effects within the swap! function are a bad idea because of the optimistic locking: swap! may call the function more than once if another thread changes the atom in the meantime. In your first code snippet, if there is any contention on the atom (and maybe in your app you know there is none because it's only ever accessed by the same single thread), you run the risk of having orphaned futures.
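
To make the retry behaviour concrete, here is a small sketch (not taken from the thread; the names `calls` and `counter` are made up for illustration) that counts how often the function passed to swap! actually runs when several threads contend for the same atom:

```clojure
(def calls   (atom 0))  ; how many times the update function actually ran
(def counter (atom 0))  ; the contended atom

(let [workers (doall
                (for [_ (range 8)]
                  (future
                    (dotimes [_ 1000]
                      (swap! counter
                             (fn [n]
                               (swap! calls inc) ; side effect inside swap!
                               (inc n)))))))]
  ;; Wait for all workers to finish.
  (doseq [w workers] @w))

@counter  ; 8000: every increment is applied exactly once
@calls    ; at least 8000; any retry repeats the side effect
```

Any amount by which `calls` exceeds `counter` corresponds to retried calls. With `future` as the side effect, each of those retries would start a future that nothing holds a reference to.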

As far as I know there should be no such problem with the agent version. I'm not really sure about the nesting of send-off calls though; that might be the source of your stack overflow. I seem to remember that this was not supported up until 1.4 or 1.5; not sure what the current semantics is.

Depending on how many different event types you're watching for (and how many different actions you need to take), it might be worth having a single thread managing the queue. Something along the lines of having a single atom containing a priority queue (or a sorted map?) with, for each event type, the last time the event was observed. At some interval, that event thread could check the queue and run the required handlers based on the current time. When an event arrives, it resets the time associated with its type in the queue.
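
A rough sketch of that idea, under a few assumptions: a plain map in an atom stands in for the priority queue, all function names are hypothetical, and timestamps are passed in explicitly so the logic can be tried without waiting. The handlers run outside of any swap!, so they are not subject to retries.

```clojure
;; Hypothetical names throughout; `last-seen` is an atom holding
;; {event-type -> timestamp in milliseconds}.

(defn record-event!
  "Remember that an event of type `k` was seen at time `now`."
  [last-seen k now]
  (swap! last-seen assoc k now))

(defn take-expired!
  "Atomically remove and return the event types that have been quiet for at
  least `quiet-ms` as of `now`."
  [last-seen now quiet-ms]
  (loop []
    (let [m       @last-seen
          expired (vec (for [[k t] m :when (>= (- now t) quiet-ms)] k))]
      ;; compare-and-set! fails if an event arrived after we read `m`;
      ;; in that case re-read and decide again, so no reset is lost.
      (if (compare-and-set! last-seen m (apply dissoc m expired))
        expired
        (recur)))))

(defn poll-once!
  "Run the handler of every event type that has gone quiet.
  `handlers` maps event type -> zero-argument function."
  [last-seen handlers now quiet-ms]
  (doseq [k (take-expired! last-seen now quiet-ms)]
    ((handlers k))))
```

The managing thread would then call something like (poll-once! last-seen handlers (System/currentTimeMillis) 2000) at a fixed interval.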

Whether this is better will depend on your usage pattern. I would just like to point out that creating a future has some nontrivial overhead, as it also creates a thread (at least, the last time I checked, futures were not created out of a limited thread pool).

Brian Craft

Dec 2, 2014, 12:22:16 PM
to clo...@googlegroups.com
The nested send-off call doesn't happen on the same thread (it's in a future). Seems like that would be the same as if an unrelated thread called send-off while the outer send-off was running.

It does seem like a single-thread solution would be better, not creating so many futures. Polling seems pretty crude, but I don't see another way of doing it with clojure abstractions. Maybe a pure java solution.
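
For reference, one shape a "pure Java" variant could take from Clojure is java.util.concurrent.ScheduledExecutorService through interop. This is only a sketch under assumptions: the name `debounce-with-scheduler` is made up, and the caller is assumed to own the scheduler and shut it down. A single scheduler thread runs every action, and nothing polls.

```clojure
(import '(java.util.concurrent Executors ScheduledExecutorService
                               ScheduledFuture TimeUnit))

(defn debounce-with-scheduler
  "Returns a zero-argument function. Each call cancels the pending run of
  `func` (if any) and schedules a new one `period-ms` milliseconds later."
  [^ScheduledExecutorService scheduler period-ms func]
  (let [pending (atom nil)
        lock    (Object.)]
    (fn []
      ;; The lock makes cancel + reschedule one step, so concurrent callers
      ;; cannot leave an orphaned task behind.
      (locking lock
        (when-let [^ScheduledFuture task @pending]
          ;; false: do not interrupt `func` if it has already started.
          (.cancel task false))
        (reset! pending
                (.schedule scheduler ^Runnable func
                           (long period-ms) TimeUnit/MILLISECONDS)))
      ;; Return nil so the REPL has nothing unusual to print.
      nil)))

;; Usage sketch:
;; (def scheduler (Executors/newSingleThreadScheduledExecutor))
;; (def event (debounce-with-scheduler scheduler 2000 #(println "running")))
;; (event) (event) (event)   ; "running" is printed once, 2 s after the last call
;; (.shutdown scheduler)     ; when done
```

Passing false to .cancel addresses the earlier concern about (func) being interrupted while it is running. Creating the executor with Executors/newSingleThreadScheduledExecutor keeps everything on one thread.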


Erik Price

Dec 2, 2014, 1:03:54 PM
to clo...@googlegroups.com
On Tue, Dec 2, 2014 at 12:22 PM, Brian Craft <craft...@gmail.com> wrote:

It does seem like a single-thread solution would be better, not creating so many futures. Polling seems pretty crude, but I don't see another way of doing it with clojure abstractions. Maybe a pure java solution.

FWIW, the core.async-based solution satisfies both the criteria of being single-threaded and non-polling.

e

Brian Craft

Dec 2, 2014, 3:05:01 PM
to clo...@googlegroups.com

Cool. I haven't used core.async before, and am a bit reluctant to pull in another dependency just for this. But maybe it's the right solution.

Dylan Butman

Dec 3, 2014, 2:49:36 PM
to clo...@googlegroups.com

Erik, that's pretty! But be careful about go-loops and closed channels. This will recur infinitely if events-ch is closed, because a take from a closed channel returns nil immediately, every time.

(defn invoke-after-uninterrupted-delay
  ;; One variadic arity is enough: with no extra arguments `args` is nil.
  [period events-ch f & args]
  (async/go-loop []
    (let [[v p] (async/alts! [(async/timeout period) events-ch])]
      (when v
        (if (= p events-ch)
          (recur)
          (apply f args))))))

will allow the go-loop to return when the channel is closed.

Erik Price

Dec 3, 2014, 3:47:35 PM
to clo...@googlegroups.com
Thank you for calling my attention to this possibility!

e

Dylan Butman

Dec 4, 2014, 12:23:48 PM
to clo...@googlegroups.com
Whoops, the body should actually be:

(go-loop []
      (let [[v c] (alts! [(timeout period) events-ch])]
        (if (= c events-ch)
          (when v
            (recur))
          (apply f args))))

The timeout channel also returns nil when it fires, so the nil check must apply only to values taken from events-ch.

juan.facorro

Dec 6, 2014, 4:16:56 PM
to clo...@googlegroups.com
Hi Brian,

I had the same requirement while building an application. It included a GUI, and I wanted to perform some actions only after the user was done editing some text. I also first implemented a solution using an atom, Thread/sleep and a future. It didn't seem right, though, since I was creating a bunch of threads all the time, so I tried an approach with the tools core.async provides. The result was the following:

(defn timeout-channel
  "Creates a channel and a go block that takes from it. The go block keeps
  an internal status with two possible values, `:wait` and `:receive`.
  In `:wait` status, execution is blocked until there's a value available in the
  channel, it then enters the `:receive` status, until the timeout wins.
  Returns the channel where events need to be pushed."
  [timeout-ms f]
  (let [c (async/chan)]
    (async/go-loop [status :wait
                    args   nil]
      (condp = status
        :wait
        (recur :receive (async/<! c))
        :receive
        (let [[_ ch] (async/alts! [c (async/timeout timeout-ms)])]
          (if (= ch c)
            (recur :receive args)
            (do
              (async/thread (if (sequential? args) (apply f args) (f args)))
              (recur :wait nil))))))
    c))

Signalling the go block to terminate when the event source is no longer available is missing, but that is simple enough to implement. Maybe this is something you can use as a starting point.

Hope it helps,

Juan

Brian Craft

Dec 6, 2014, 4:31:33 PM
to clo...@googlegroups.com
I'm confused on the thread issue. Don't futures run from the agent thread pool? So they don't really create a thread?

Also found climatecorp's claypoole library, which has a 'future' call that works with a thread pool, further confusing me. Not sure how that's different from the agent thread pool.

Juan Martín

Dec 6, 2014, 4:55:47 PM
to clo...@googlegroups.com
Hey Brian,

Yes, you are absolutely right, my bad: futures do run on a thread pool, since they use clojure.lang.Agent/soloExecutor. Sorry for the confusion; I had this problem a while ago and mistakenly posted the message off the top of my head.
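
A quick way to see this for yourself is to compare thread names. The sketch below uses a made-up helper, `current-thread-name`, and relies on Clojure naming the threads of that executor clojure-agent-send-off-pool-N. As far as I know the executor is a cached thread pool: it reuses idle threads but still creates a new one whenever all of them are busy, so many futures sleeping at once do mean many threads.

```clojure
(defn current-thread-name
  "Name of the thread this function is called on."
  []
  (.getName (Thread/currentThread)))

;; Name of the thread that runs a future body.
(def future-thread @(future (current-thread-name)))

;; Name of the thread that runs a send-off action.
(def send-off-thread
  (let [p (promise)]
    (send-off (agent nil) (fn [_] (deliver p (current-thread-name)) nil))
    @p))

;; Both names carry the send-off pool prefix.
[future-thread send-off-thread]
```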

In the end, I guess I ended up using core.async because to me the solution to the problem seemed a lot more natural when expressed with channels and go blocks.

Cheers,

Juan    

--
Juan Facorro