producing a seq in a background thread


Chouser

Jun 30, 2008, 1:39:32 AM
to clo...@googlegroups.com
I had fun making this, so I thought I'd share. It's a little function
that makes it easy to have a slow and/or blocking operation produce a
sequence in another thread, while allowing the results to be treated
like a regular seq in the main thread.

(def *enqueue*)

(defn with-bg-queue-fn [queue-size func]
  (let [q (java.util.concurrent.LinkedBlockingDeque. queue-size)
        a (agent nil)]
    (send-off a
      (fn [_]
        (try
          (binding [*enqueue* #(.put q %)]
            (func))
          (finally (.put q false)))))
    (lazy-cat (for [item (repeatedly #(.take q)) :while item] item)
              @a)))

(defmacro with-bg-queue [& body]
  `(with-bg-queue-fn (Integer/MAX_VALUE) (fn [] ~@body)))

(defmacro with-bg-queue-len [queue-size & body]
  `(with-bg-queue-fn ~queue-size (fn [] ~@body)))


Here's an example of using it to slowly produce the numbers 0 through 9:

(def slow-numbers
  (with-bg-queue
    (doseq j (range 10)
      (Thread/sleep 500)
      ;(when (= j 5) (throw (Exception. "oops")))
      (*enqueue* j))))

What could be simpler, right? Inside the with-bg-queue, there's a
thread-local function named *enqueue* that you use to append things
to the seq as you get them. In a real usage this would presumably be
slow because of network lag, intensive computation, or whatever. The
resulting seq is stored as slow-numbers.
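
To illustrate the thread-local binding described above, here is a minimal sketch in present-day Clojure syntax (this thread predates Clojure 1.0, so the forms differ slightly from the code in the post). The helper name collect-with-enqueue is hypothetical and not from the post; it binds *enqueue* only for the duration of one call and collects into a vector rather than a queue.

```clojure
;; Sketch only: current Clojure requires ^:dynamic on vars used with binding.
(def ^:dynamic *enqueue* nil)

(defn collect-with-enqueue
  "Hypothetical helper (not from the thread): calls f with *enqueue*
  bound to a function that appends to a private vector, then returns
  the collected items."
  [f]
  (let [items (atom [])]
    (binding [*enqueue* #(swap! items conj %)]
      (f))
    @items))

(collect-with-enqueue #(doseq [j (range 3)] (*enqueue* j)))
;=> [0 1 2]
;; Outside the binding form, *enqueue* still has its root value (nil here).
```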

So now we want to use this seq. Well, it's just like any other seq
except that if you call its rest before the producer has gotten that
far, your thread may block. So when you run this part, it will speed
ahead to however far the background thread has progressed, and then
give you the next number every half-second.

(doseq i slow-numbers
  (prn :got i)
  (flush))

It even handles exceptions in the producer by allowing the agent to
hang onto the exception until a consumer catches up with it, at which
point the exception will be propagated to the consumer's thread. You
can try it out by un-commenting the throw line in the example above.
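
The propagation described above depends on dereferencing a failed agent throwing in the consumer's thread, which is how agents behaved when this was written. As far as I know, current Clojure reports agent failures through agent-error instead, so the sketch below swaps in a future, whose deref does rethrow. It is a rough modern equivalent under that assumption, not the code from the post; bg-seq, done, and enqueue! are hypothetical names.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue ExecutionException))

(defn bg-seq
  "Hypothetical sketch (not the code from the post). Runs (produce enqueue!)
  on another thread and returns a lazy seq of the enqueued items.
  Assumes items are never nil, since LinkedBlockingQueue rejects nil."
  [queue-size produce]
  (let [q    (LinkedBlockingQueue. (int queue-size))
        done (Object.)                      ; private end-of-seq sentinel
        fut  (future
               (try
                 (produce #(.put q %))
                 (finally (.put q done))))  ; always signal the end
        step (fn step []
               (lazy-seq
                 (let [x (.take q)]         ; blocks until something arrives
                   (if (identical? x done)
                     (do @fut nil)          ; rethrows any producer failure
                     (cons x (step))))))]
    (step)))

;; The consumer sees the item produced before the failure, then the failure
;; itself, wrapped in an ExecutionException by the future.
(try
  (doseq [i (bg-seq 10 (fn [enqueue!]
                         (enqueue! 1)
                         (throw (Exception. "oops"))))]
    (prn :got i))
  (catch ExecutionException e
    (prn :producer-failed (.getMessage (.getCause e)))))
```

Unlike the later versions in this thread, this sketch makes no attempt to stop the producer if the consumer walks away.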

The with-bg-queue-len macro is there in case you want to throttle the
producer. Just pass in a number indicating how far the producer can
get ahead of the consumer.
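
The throttling comes from the queue's capacity. A small sketch of that in present-day Clojure, using only the java.util.concurrent queue directly (the var names q and results are just for illustration): a non-blocking .offer is refused once the queue is full, whereas the .put used by *enqueue* would wait for the consumer instead.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Room for two items: a producer can be at most two ahead of the consumer.
(def ^LinkedBlockingQueue q (LinkedBlockingQueue. (int 2)))

(def results
  [(.offer q :a)            ; accepted
   (.offer q :b)            ; accepted, queue is now full
   (.offer q :c)            ; refused, no room left
   (.remainingCapacity q)   ; nothing free
   (.take q)                ; consumer removes the oldest item
   (.offer q :c)])          ; accepted now that a slot is free

results
;=> [true true false 0 :a true]
```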

Also note that you can have as many consumers in as many threads as
you want -- each will see every value produced and will block as
needed.
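
This works because a seq caches each element once it has been realized, so later consumers read the cached value rather than going back to the queue. A minimal sketch of that caching in present-day Clojure, with a counter standing in for the slow producer (calls, shared, consumer-a and consumer-b are hypothetical names):

```clojure
(def calls (atom 0))

;; Stand-in for a slow producer: counts how often an element is computed.
(def shared
  (map (fn [x] (swap! calls inc) (* x x)) (range 5)))

;; Two consumers on separate threads walk the same seq.
(def consumer-a (future (vec shared)))
(def consumer-b (future (vec shared)))

[@consumer-a @consumer-b @calls]
;=> [[0 1 4 9 16] [0 1 4 9 16] 5]
```

Both consumers get all five values, yet each element was computed only once.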

--Chouser

Rich Hickey

Jun 30, 2008, 10:20:04 AM
to Clojure
Neat. I found the participation of the client code (having to call
*enqueue*) a bit undesirable, so I tried to recast the problem as one
of queue-izing an existing lazy seq. Here's what I came up with:

(defn seque
  "Creates a queued seq on another (presumably lazy) seq s. The queued
  seq will produce a concrete seq in the background, and can get up to
  n items ahead of the consumer. Note that reading from a seque can
  block if the reader gets ahead of the producer."
  ([s] (seque Integer/MAX_VALUE s))
  ([n s]
   (let [q (java.util.concurrent.LinkedBlockingQueue. n)
         NIL (Object.) ;nil sentinel since LBQ doesn't support nils
         agt (agent (seq s))
         step (fn step []
                (let [x (.take q)]
                  (if (identical? x q) ;q itself is eos sentinel
                    @agt ;will be nil, touch agent just to propagate errors
                    (lazy-cons (if (identical? x NIL) nil x)
                               (step)))))]
     (send-off agt
       (fn [s]
         (try
           (loop [[x & xs :as s] s]
             (when s
               (.put q (if (nil? x) NIL x))
               (recur xs)))
           (finally ;q itself is eos sentinel
             (.put q q)))))
     (step))))

This one should handle arbitrary values including nil and false, as
well as propagating any exceptions. Use it like so:

(doseq x (seque (take 10 (repeatedly #(do (Thread/sleep 500)
                                          (rand)))))
  (println x))

I haven't tested it much, input/bug reports welcome. Seems like a
useful function for the library.
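
For anyone reading this on a current Clojure: a function named seque is part of clojure.core. The snippet below is a usage sketch against that built-in, written in present-day syntax, and not the code from this message; the built-in's details may differ from the version posted here. slow-source and queued are hypothetical names.

```clojure
;; A source that is slow to produce and contains nil and false.
(def slow-source
  (map (fn [x] (Thread/sleep 50) x) [1 nil false 4]))

;; clojure.core/seque; 3 is how far the background producer may read ahead.
(def queued (seque 3 slow-source))

(vec queued)
;=> [1 nil false 4]
```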

Rich

Chouser

Jul 3, 2008, 1:00:27 AM
to clo...@googlegroups.com
On Mon, Jun 30, 2008 at 10:20 AM, Rich Hickey <richh...@gmail.com> wrote:
>
> Neat. I found the participation of the client code (having to call
> *enqueue*) a bit undesirable,

So diplomatic! It probably turned your stomach.

> This one should handle arbitrary values including nil and false, as
> well as propagating any exceptions. Use it like so:
>
> (doseq x (seque (take 10 (repeatedly #(do (Thread/sleep 500)
> (rand)))))
> (println x))

This sparked conversation in IRC, starting here:
http://clojure-log.n01se.net/date/2008-06-30.html#10:44c

It was noted that all versions up to this point could leak a thread,
which led to some variations using WeakReference:
http://paste.lisp.org/display/63042

Today Rich had another idea for how to avoid the thread leak:
http://clojure-log.n01se.net/date/2008-07-02.html#15:00

Here's my humble attempt at an implementation of that idea:

(defn seque
  "Creates a queued seq on another (presumably lazy) seq s. The queued
  seq will produce a concrete seq in the background, and can get up to
  n items ahead of the consumer. Note that reading from a seque can
  block if the reader gets ahead of the producer."
  ([s] (seque Integer/MAX_VALUE s))
  ([n s]
   (let [q (java.util.concurrent.LinkedBlockingQueue. n)
         NIL (Object.) ;nil sentinel since LBQ doesn't support nils
         agt (agent (seq s))
         fill (fn [s]
                (try
                  (loop [[x & xs :as s] s]
                    (if s
                      (if (.offer q (if (nil? x) NIL x) 1
                                  java.util.concurrent.TimeUnit/SECONDS)
                        (recur xs)
                        s)
                      (.put q q))) ; q itself is eos sentinel
                  (catch Exception e
                    (.put q q)
                    (throw e))))]
     (send-off agt fill)
     ((fn drain []
        (let [x (.take q)]
          (if (identical? x q) ;q itself is eos sentinel
            @agt ;will be nil, touch agent just to propagate errors
            (do
              (send-off agt fill)
              (lazy-cons (if (identical? x NIL) nil x) (drain))))))))))

This is used identically to Rich's earlier post, but shouldn't ever
leak the producer thread.
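
The piece doing the work here is the timed .offer: when the queue stays full for a second, fill returns the unconsumed remainder as the agent's new state instead of waiting indefinitely, and each take by the consumer re-sends fill. A small sketch of just the timed offer, in present-day Clojure and with a shorter timeout so it runs quickly (q, gave-up? and accepted? are hypothetical names):

```clojure
(import '(java.util.concurrent LinkedBlockingQueue TimeUnit))

(def ^LinkedBlockingQueue q (LinkedBlockingQueue. (int 1)))

(.put q :first)                 ; fills the single slot

;; With nobody consuming, a timed offer gives up and returns false
;; rather than waiting indefinitely the way .put would.
(def gave-up? (not (.offer q :second 100 TimeUnit/MILLISECONDS)))

(.take q)                       ; a consumer shows up and frees the slot
(def accepted? (.offer q :second 100 TimeUnit/MILLISECONDS))

[gave-up? accepted?]
;=> [true true]
```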

However, after a phone conversation with Aaron Brooks tonight, I think
it may make sense to remove from seque everything related to the
specific queue implementation and policy. Application-specific
policies might take into account network congestion, cpu load, or may
coordinate with other queues when deciding how far ahead to read.
Also, the implementation above has quite a bit of cruft to deal with
LinkedBlockingQueue that tends to obscure what seque is really doing.
So here's one way things could be split up:

(defn LBQ
  "Creates a seque policy using a LinkedBlockingQueue that allows the
  producer to get up to n items ahead of the consumer (defaults to 1)."
  ([] (LBQ 1))
  ([n]
   (let [NIL (Object.) ;nil sentinel since LBQ doesn't support nils
         q (java.util.concurrent.LinkedBlockingQueue. n)]
     {:offer #(.offer q (if (nil? %) NIL %) 1
                      java.util.concurrent.TimeUnit/SECONDS)
      :take-one #(let [x (.take q)] (if (identical? x NIL) nil x))
      :put-eos #(.put q q) ; q itself is eos sentinel
      :eos? #(identical? % q)})))

(defn seque
  "Creates a queued seq on another seq s. The use of s in a separate
  thread will be managed by the given policy. Note that reading
  from a seque can block if the reader gets ahead of the producer."
  ([{:keys [offer put-eos take-one eos?]} s]
   (let [agt (agent (seq s))
         fill (fn [s]
                (try
                  (loop [[x & xs :as s] s]
                    (if s
                      (if (offer x)
                        (recur xs)
                        s)
                      (put-eos)))
                  (catch Exception e
                    (put-eos)
                    (throw e))))]
     (send-off agt fill)
     ((fn drain []
        (let [x (take-one)]
          (if (eos? x)
            @agt ; will be nil, touch agent just to propagate errors
            (do
              (send-off agt fill)
              (lazy-cons x (drain))))))))))

Here are some toy examples of how to use it:

;=== producer-bound ===
(doseq x (seque (LBQ) (take 10 (repeatedly #(do (Thread/sleep 500) (rand)))))
  (prn x))

;=== producer-bound with exception ===
(doseq x (seque (LBQ) (map #(do (Thread/sleep 500)
                                (when (= % 5) (assert false))
                                %)
                           (range 10)))
  (prn x))

;=== producer-bound, but gets ahead ===
(def sq (seque (LBQ 100) (map #(do (Thread/sleep 500)
                                   (when (= % 8) (assert false))
                                   %)
                              (range 10))))
(Thread/sleep 2000)
(doseq x sq (prn x))

;=== consumer-bound ===
(doseq x (seque (LBQ) (range 10))
  (Thread/sleep 500)
  (prn x))
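
Since a policy is just a map of four functions, another one can be written without touching seque. Below is a sketch of what that might look like, in present-day Clojure syntax: abq-policy is a hypothetical name, backed by an ArrayBlockingQueue, and it uses a dedicated EOS object rather than the queue itself as the end marker. It is exercised here on a single thread, without seque, just to show the contract of the four keys.

```clojure
(import '(java.util.concurrent ArrayBlockingQueue TimeUnit))

(defn abq-policy
  "Hypothetical policy map with the same four keys as LBQ above, backed
  by an ArrayBlockingQueue of capacity n."
  [n]
  (let [NIL (Object.)                     ; stands in for nil items
        EOS (Object.)                     ; end-of-seq marker
        q   (ArrayBlockingQueue. (int n))]
    {:offer    #(.offer q (if (nil? %) NIL %) 1 TimeUnit/SECONDS)
     :take-one #(let [x (.take q)] (if (identical? x NIL) nil x))
     :put-eos  #(.put q EOS)
     :eos?     #(identical? % EOS)}))

;; Exercise the policy on one thread, with no seque involved.
(let [{:keys [offer take-one put-eos eos?]} (abq-policy 3)]
  (offer :a)
  (offer nil)
  (put-eos)
  [(take-one) (take-one) (eos? (take-one))])
;=> [:a nil true]
```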

--Chouser
