idiom question: infinite sequence as data source for many threads?

free_variation

Jan 27, 2010, 11:21:49 PM
to Clojure
Hi,

I'm a Clojure noob with a background in Scheme (and functional
languages).

I'm looking for the "clojure way" to solve the following problem:

I have an infinite sequence. I'd like to have the sequence be a
source for N parallel worker threads that now and then will show up to
grab a few elements from the sequence, then go off and crunch on the
data. The sequence should be traversed only once per execution of the
software.

I'm not clear on how to implement this without keeping a reference to
the sequence, say in a closure. In Scheme I might solve the problem
using a continuation. But in Clojure, if I do, say:

(let [my-seq (....lazy-seq ...)]
  (defn get-some-data ...))

won't I run out of memory? Is the solution to have my-seq somehow
refer to the next element to be retrieved, i.e. to the head of the
infinite sequence that has yet to be fed to the workers?

Thanks,

jds

Konrad Hinsen

Jan 28, 2010, 2:35:38 AM
to clo...@googlegroups.com
On 28 Jan 2010, at 05:21, free_variation wrote:

> I'm not clear on how to implement this without keeping a reference to
> the sequence, say in a closure. In scheme I might solve the problem
> using a continuation. But in clojure if I do, say:
>
> (let [my-seq (....lazy-seq ...)]
>   (defn get-some-data ...))
>
> won't I run out of memory? Is the solution to have my-seq somehow
> refer to the next element to be retrieved, i.e. to the head of the
> infinite sequence that has yet to be fed to the workers?

Your approach won't work for another reason: unlike in Scheme, you
can't mutate my-seq inside get-some-data. Moreover, you have the problem
of thread safety: a closure is not sufficient to protect the sequence
against uncoordinated requests from multiple threads.

The Clojure solution for your problem is an agent, which makes the
access thread-safe:

user> (def data-source (agent (cycle [1 2 3])))
#'user/data-source
user> (defn get-some-data [] (let [v (first @data-source)] (send data-source rest) v))
#'user/get-some-data
user> (get-some-data)
1
user> (get-some-data)
2
user> (get-some-data)
3
user> (get-some-data)
1
user> (get-some-data)
2
user> (get-some-data)
3

Konrad.

Michał Marczyk

Jan 28, 2010, 2:39:26 AM
to clo...@googlegroups.com
2010/1/28 Konrad Hinsen <konrad...@fastmail.net>:

> The Clojure solution for your problem is an agent, which makes the access
> thread-safe:
>
> user> (def data-source (agent (cycle [1 2 3])))
> #'user/data-source
> user> (defn get-some-data [] (let [v (first @data-source)] (send data-source rest) v))

Wouldn't this make it possible for two threads to obtain the same
value of (first @data-source), then send two rest messages to the
agent?

A ref would not have this problem, though:

(def data-source (ref the_sequence))
(defn get-some-data []
  (dosync
    (let [v (first @data-source)]
      (alter data-source rest)
      v)))

Sincerely,
Michal

Konrad Hinsen

Jan 28, 2010, 2:58:06 AM
to clo...@googlegroups.com
On 28 Jan 2010, at 08:39, Michał Marczyk wrote:

> Wouldn't this make it possible for two threads to obtain the same
> value of (first @data-source), then send two rest messages to the
> agent?
>
> A ref would not have this problem, though:

You are right. Refs provide synchronous access to the data source.

Konrad.

Christophe Grand

Jan 28, 2010, 5:38:10 AM
to clo...@googlegroups.com
Hi!

Right, or you could also use an atom:
(def data-source (atom (cons nil the_sequence)))
(defn get-some-data [] (first (swap! data-source rest)))

But this retains the last read value until the next call to get-some-data.
I recently realized that one can work around this problem:
(def data-source (atom (cons nil the_sequence)))
(defn get-some-data []
  (let [s (swap! data-source rest)]
    (swap! data-source #(if (identical? s %) (cons nil (rest %)) %))
    (first s)))

This makes me think that one could optimize Atom/swap by avoiding the
CAS and the validation (but not the watches) when (identical?
new-value old-value).

Christophe

Meikel Brandmeyer

Jan 28, 2010, 1:42:32 AM
to Clojure
Hi,

On Jan 28, 5:21 am, free_variation <cane.c...@gmail.com> wrote:

> Is the solution to have my-seq somehow
> refer to the next element to be retrieved, i.e. to the head of the
> infinite sequence that has yet to be fed to the workers?

You probably want a Ref here to coordinate changes to the
sequence.

(defn get-step
  [queue]
  (dosync
    (let [q @queue]
      (alter queue rest)
      q)))

; Or for convenience if you like...
(def get-step-c
  (let [queue (ref your-seq-here)]
    (partial get-step queue)))

Each worker can call get-step on your seq Ref: the current seq is
returned and the Ref is updated to hold its rest. Because the whole seq
is returned rather than just the first element, you can nil-check with
(seq q), take (first q), or do whatever else you normally do with seqs.
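
For example, with an illustrative infinite seq of naturals, usage might
look like this (a rough sketch; my-queue is just a made-up name):

(def my-queue (ref (iterate inc 0)))   ; infinite seq: 0, 1, 2, ...

(first (get-step my-queue))   ;=> 0
(first (get-step my-queue))   ;=> 1
(first (get-step my-queue))   ;=> 2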

Sincerely
Meikel

Timothy Pratley

Jan 28, 2010, 9:53:52 AM
to clo...@googlegroups.com
2010/1/28 free_variation <cane...@gmail.com>:

> I have an infinite sequence.  I'd like to have the sequence be a
> source for N parallel worker threads that now and then will show up to
> grab a few elements from the sequence, then go off and crunch on the
> data.  The sequence should be traversed only once per execution of the
> software.

Who's in charge here? :) The other way to look at this problem is an
executor handing out the tasks (as opposed to threads asking for
work). That way there is no need to coordinate between workers.

e.g. if you had a file which you read as a lazy sequence, you could
create worker tasks like this:
(doseq [d data-seq]
  (.submit clojure.lang.Agent/pooledExecutor (cast Callable #(foo d))))

pooledExecutor is just a standard Java fixed-size thread pool sized
according to the number of available processors, so it will only run
that many threads at a time. However, I believe submitted jobs are
queued, so if your seq processing gets too far ahead you would end up
with a very full queue. I'm sure there must be a way to limit the
submission rate, but I can't think of it right now; maybe someone else
will chime in :)


Regards,
Tim.

Michael Wood

Jan 28, 2010, 10:15:07 AM
to clo...@googlegroups.com
Hi

On 28 January 2010 16:53, Timothy Pratley <timothy...@gmail.com> wrote:
[...]


> eg: if you had a file which you read as a lazy sequence you could
> create worker tasks like this:
> (doseq [d data-seq]
>  (.submit clojure.lang.Agent/pooledExecutor (cast Callable #(foo d))))

What's the purpose of the cast here?

#(...) is an fn, which is Callable by definition, isn't it? So
wouldn't that line be equivalent to:
(.submit clojure.lang.Agent/pooledExecutor #(foo d)))

--
Michael Wood <esio...@gmail.com>

Timothy Pratley

Jan 28, 2010, 4:30:01 PM
to clo...@googlegroups.com
2010/1/29 Michael Wood <esio...@gmail.com>:

>> (doseq [d data-seq]
>>  (.submit clojure.lang.Agent/pooledExecutor (cast Callable #(foo d))))
>
> What's the purpose of the cast here?
>
> #(...) is an fn, which is Callable by definition, isn't it?  So
> wouldn't that line be equivalent to:
> (.submit clojure.lang.Agent/pooledExecutor #(foo d)))

Yes and no... submit is overloaded on Callable and Runnable, and #(foo
d) implements both.
user=> (.submit clojure.lang.Agent/pooledExecutor #(inc 1))
java.lang.IllegalArgumentException: More than one matching method found: submit

The cast was just a quick hack to make the example work.
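
A type hint on the argument should also pick the Callable overload
without the runtime cast; a rough sketch (untested here, and older
Clojure versions spell the hint #^Callable):

(.submit clojure.lang.Agent/pooledExecutor ^Callable #(inc 1))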

Paul Mooser

Jan 28, 2010, 7:15:37 PM
to Clojure
This is something I run into with executors in Java periodically: I
have never understood why there isn't a default implementation
provided whose task queue blocks on submission when it is full. If I'm
not mistaken, the default implementations either throw
RejectedExecutionException when a bounded queue fills up, or just let
an unbounded queue grow without limit.

It should be relatively easy to set up an executor with that behavior,
which would make it fairly easy to avoid producing things "too far"
ahead of consumption.
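
For what it's worth, a minimal sketch of that idea in Clojure via Java
interop (names and sizes here are purely illustrative, not from this
thread): a bounded queue plus a rejection handler that blocks the
submitter by putting the task back onto the queue.

(import '(java.util.concurrent ThreadPoolExecutor TimeUnit
                               ArrayBlockingQueue RejectedExecutionHandler))

;; 2 worker threads, at most 10 queued tasks; when the queue is full the
;; handler makes the submitting thread block on .put until a slot frees up.
(def blocking-executor
  (ThreadPoolExecutor.
    2 2 0 TimeUnit/MILLISECONDS
    (ArrayBlockingQueue. 10)
    (proxy [RejectedExecutionHandler] []
      (rejectedExecution [task executor]
        (.put (.getQueue executor) task)))))

;; The producing loop now slows down automatically once the queue is full.
(doseq [d (range 100)]
  (.execute blocking-executor #(println "crunching" d)))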

free_variation

Jan 28, 2010, 11:05:51 PM
to Clojure
You people are terrific, thanks so much.

I basically went with what Michal and Konrad worked out:

(let [feature-stream (ref nil)]
  (defn init-features [stream]
    (dosync (ref-set feature-stream stream))
    'ready)
  (defn get-feature []
    (dosync
      (let [f (first @feature-stream)]
        (alter feature-stream rest)
        f))))

Works like a charm. Are the ref to nil (as an initial value) and the
constructor-like "init-features" in the closure idiomatic Clojure?
I realize I'm writing Scheme here :)

jds

On Jan 28, 2:39 am, Michał Marczyk <michal.marc...@gmail.com> wrote:
> 2010/1/28 Konrad Hinsen <konrad.hin...@fastmail.net>:

Meikel Brandmeyer

Jan 29, 2010, 1:42:18 AM
to Clojure
Hi,

On Jan 29, 5:05 am, free_variation <cane.c...@gmail.com> wrote:

> You people are terrific, thanks so much.
>
> I basically went with what Michal and Konrad worked out:
>
> (let [feature-stream (ref nil)]
>         (defn init-features [stream]
>                 (dosync (ref-set feature-stream stream))
>                 'ready)
>         (defn get-feature []
>                 (dosync
>                         (let [f (first @feature-stream)]
>                                 (alter feature-stream rest)
>                                 f))))
>
> Works like a charm.  Are the ref to nil (as initial value) and the
> constructor-like "init-features" in the closure idiomatic to Clojure?
> I realize I'm writing scheme here :)

Please note that you cannot distinguish a nil in the stream from an
exhausted stream. That's the reason I returned the whole seq in my
solution, not just the first element. Maybe this doesn't apply in
your case, but you should keep it in mind for other occasions.

For the idiomatic question (and personal taste): use :keywords instead
of 'quoted-symbols. I would not hard-wire the ref; I would make it an
explicit argument. For convenience you can then specialize this more
general function with partial.

Sincerely
Meikel

Michael Wood

Jan 29, 2010, 2:45:14 AM
to clo...@googlegroups.com

Ah, OK. Thanks for the explanation.

--
Michael Wood <esio...@gmail.com>

cburroughs

Jan 29, 2010, 8:50:45 AM
to Clojure
java.util.concurrent.LinkedBlockingQueue<E>

Check out the put() method. That's what I used for a program similar to
the original poster's, when I needed control over the number of threads.
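
A rough sketch of that shape (names and sizes purely illustrative): one
producer walks the infinite seq exactly once and puts elements onto a
bounded queue, and each worker blocks on take.

(import 'java.util.concurrent.LinkedBlockingQueue)

(def work-queue (LinkedBlockingQueue. 10))   ; bounded, so put blocks when full

;; Producer: traverses the (possibly infinite) seq exactly once.
(future
  (doseq [x (iterate inc 0)]
    (.put work-queue x)))

;; Workers: each blocks on take until an element is available.
(dotimes [_ 4]
  (future
    (while true
      (let [x (.take work-queue)]
        (println "crunching" x)))))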

free_variation

Jan 30, 2010, 1:07:09 AM
to Clojure
Thank you, Meikel. I was aware of the nil issue, but it's good that you
made it explicit.

I got rid of the 'symbol per your suggestion, and factored the closure
to take the stream as an input:

(defn init-features [stream]
  (let [feature-stream (ref stream)]
    (dosync (ref-set feature-stream stream))
    (fn []
      (dosync
        (let [f (first @feature-stream)]
          (alter feature-stream rest)
          f)))))

It's amazing to me how concise this is. I'm really loving Clojure! I
kinda miss typing 'lambda' instead of 'fn' or '#(', but I suppose a
macro can fix that. Must have been done already :)

Timothy Pratley

Jan 30, 2010, 2:09:08 AM
to clo...@googlegroups.com
Below I present 'submit-future', which is similar to the existing
'future' call in that it spawns a thread to execute a task, but
differs in that it will block if n submitted futures are already
running, where n is the number of available processors. I think this
could be quite handy for the producer-consumer model which lazy-seq
lends itself to, allowing one to write:

(doseq [d data-seq]
  (submit-future (foo d)))

where data-seq is some feed of CPU-bound tasks which you want to
process as quickly as possible. This is diverging from the OP a bit,
but I thought it might be of interest:


(let [limit (.availableProcessors (Runtime/getRuntime))
      sem (java.util.concurrent.Semaphore. limit)]
  (defn submit-future-call
    "Takes a function of no args and yields a future object that will
    invoke the function in another thread, and will cache the result and
    return it on all subsequent calls to deref/@. If the computation has
    not yet finished, calls to deref/@ will block.
    If n futures have already been submitted, then submit-future blocks
    until the completion of another future, where n is the number of
    available processors."
    [#^Callable task]
    ; take a slot (or block until a slot is free)
    (.acquire sem)
    (try
      ; create a future that will free a slot on completion
      (future (try (task) (finally (.release sem))))
      (catch java.util.concurrent.RejectedExecutionException e
        ; no task was actually submitted
        (.release sem)
        (throw e)))))

(defmacro submit-future
  "Takes a body of expressions and yields a future object that will
  invoke the body in another thread, and will cache the result and
  return it on all subsequent calls to deref/@. If the computation has
  not yet finished, calls to deref/@ will block.
  If n futures have already been submitted, then submit-future blocks
  until the completion of another future, where n is the number of
  available processors."
  [& body] `(submit-future-call (fn [] ~@body)))

#_(example
user=> (submit-future (reduce + (range 100000000)))
#<core$future_call$reify__5782@6c69d02b: :pending>
user=> (submit-future (reduce + (range 100000000)))
#<core$future_call$reify__5782@38827968: :pending>
user=> (submit-future (reduce + (range 100000000)))
;; blocks at this point for a 2 processor PC until the previous
;; two futures complete
#<core$future_call$reify__5782@214c4ac9: :pending>)

free_variation

Jan 31, 2010, 11:21:23 PM
to Clojure
Excellent, very nice. Mind if I use it?

jds

Timothy Pratley

Jan 31, 2010, 11:54:52 PM
to clo...@googlegroups.com
On 1 February 2010 15:21, free_variation <cane...@gmail.com> wrote:
> Mind if I use it?

Of course I don't mind. It is intended for your use. :)

Regards,
Tim.

Daniel Werner

Feb 1, 2010, 3:56:30 PM
to Clojure
On Jan 30, 8:09 am, Timothy Pratley <timothyprat...@gmail.com> wrote:
> Below I present 'submit-future' which is similar to the existing
> 'future' call in that it spawns a thread to execute a task, but
> differs in that it will block if n submitted futures are already
> running, where n is the number of available processors. I think this
> could be quite handy for the producer-consumer model which lazy-seq
> lends itself to, allowing one to write:

This looks like it could become really useful in the future (no pun
intended), thanks for sharing! Instead of enforcing its own
thread-count limit, however, it might be worthwhile to interface with
the Agent thread pool. Otherwise, in the worst case, a machine with N
cores could try to run N+2 agent threads and N+2 submit-future threads
at once. That said, I'm not sure about the consequences of the Agent
thread pool being saturated by copious use of submit-future.

Daniel Werner

Feb 1, 2010, 3:57:51 PM
to Clojure
On Jan 30, 7:07 am, free_variation <cane.c...@gmail.com> wrote:
> (defn init-features [stream]
> (let [feature-stream (ref stream)]
> (dosync (ref-set feature-stream stream))

The call to ref-set seems redundant here since you already initialize
the ref with stream as its value.

free_variation

Feb 1, 2010, 8:56:16 PM
to Clojure
Urp, yes, thanks! It's a leftover from a previous version of the function.

jds

On Feb 1, 3:57 pm, Daniel Werner <daniel.d.wer...@googlemail.com>
wrote:
