I'm a clojure noob, with a background in scheme (and functional
languages).
I'm looking for the "clojure way" to solve the following problem:
I have an infinite sequence. I'd like to have the sequence be a
source for N parallel worker threads that now and then will show up to
grab a few elements from the sequence, then go off and crunch on the
data. The sequence should be traversed only once per execution of the
software.
I'm not clear on how to implement this without keeping a reference to
the sequence, say in a closure. In scheme I might solve the problem
using a continuation. But in clojure if I do, say:
(let [my-seq (....lazy-seq ...)]
  (defn get-some-data ...))
won't I run out of memory? Is the solution to have my-seq somehow
refer to the next element to be retrieved, i.e. to the head of the
infinite sequence that has yet to be fed to the workers?
Thanks,
jds
> I'm not clear on how to implement this without keeping a reference to
> the sequence, say in a closure. In scheme I might solve the problem
> using a continuation. But in clojure if I do, say:
>
> (let [my-seq (....lazy-seq ...)]
> (defn get-some-data ...))
>
> won't I run out of memory? Is the solution to have my-seq somehow
> refer to the next element to be retrieved, i.e. to the head of the
> infinite sequence that has yet to be fed to the workers?
Your approach won't work for another reason: you can't change my-seq
inside get-some-data, unlike in Scheme. Moreover, you have the problem
of thread safety. A closure is not sufficient to protect the sequence
against uncoordinated requests from multiple threads.
The Clojure solution for your problem is an agent, which makes the
access thread-safe:
user> (def data-source (agent (cycle [1 2 3])))
#'user/data-source
user> (defn get-some-data [] (let [v (first @data-source)] (send data-source rest) v))
#'user/get-some-data
user> (get-some-data)
1
user> (get-some-data)
2
user> (get-some-data)
3
user> (get-some-data)
1
user> (get-some-data)
2
user> (get-some-data)
3
Konrad.
Wouldn't this make it possible for two threads to obtain the same
value of (first @data-source), then send two rest messages to the
agent?
A ref would not have this problem, though:
(def data-source (ref the_sequence))
(defn get-some-data []
  (dosync (let [v (first @data-source)] (alter data-source rest) v)))
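As a quick sanity check (not from the thread; using (range 100) as a stand-in for the sequence and 4 futures as workers), the ref version hands out every element exactly once:

```clojure
;; Sketch: 4 concurrent workers drain a 100-element source through the
;; ref-based get-some-data; each element should be handed out exactly once.
(def data-source (ref (range 100)))

(defn get-some-data []
  (dosync
    (let [v (first @data-source)]
      (alter data-source rest)
      v)))

(def results
  (let [workers (doall (for [_ (range 4)]
                         (future (doall (repeatedly 25 get-some-data)))))]
    (mapcat deref workers)))

;; (set results) is #{0 1 ... 99}: nothing skipped, nothing duplicated,
;; although the interleaving across threads is arbitrary.
```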
Sincerely,
Michal
> Wouldn't this make it possible for two threads to obtain the same
> value of (first @data-source), then send two rest messages to the
> agent?
>
> A ref would not have this problem, though:
You are right. Refs provide synchronous access to the data source.
Konrad.
Right, or you could also use an atom:
(def data-source (atom (cons nil the_sequence)))
(defn get-some-data [] (first (swap! data-source rest)))
But this retains the last read value until the next call to get-some-data.
I recently realized that one can work around this problem:
(def data-source (atom (cons nil the_sequence)))
(defn get-some-data []
(let [s (swap! data-source rest)]
(swap! data-source #(if (identical? s %) (cons nil (rest %)) %))
(first s)))
This makes me think that one could optimize Atom/swap by avoiding the
CAS and the validation (but not the watches) when (identical?
new-value old-value).
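For what it's worth, a quick REPL check (reusing Konrad's (cycle [1 2 3]) source, and reproducing the definitions above) shows the workaround behaving just like the simple variant:

```clojure
;; The workaround version, exercised against a cycling source:
(def data-source (atom (cons nil (cycle [1 2 3]))))

(defn get-some-data []
  (let [s (swap! data-source rest)]
    (swap! data-source #(if (identical? s %) (cons nil (rest %)) %))
    (first s)))

(doall (repeatedly 4 get-some-data))  ;; => (1 2 3 1)
```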
Christophe
On Jan 28, 5:21 am, free_variation <cane.c...@gmail.com> wrote:
> Is the solution to have my-seq somehow
> refer to the next element to be retrieved, i.e. to the head of the
> infinite sequence that has yet to be fed to the workers?
You probably want a Ref here to coordinate the changes on the
sequence.
(defn get-step
  [queue]
  (dosync
    (let [q @queue]
      (alter queue rest)
      q)))

; Or for convenience if you like...
(def get-step-c
  (let [queue (ref your-seq-here)]
    (partial get-step queue)))
Each worker can call get-step on your seq Ref: the current step in the
seq is returned and the Ref is updated to contain the next step. Since
the whole seq is returned, you can do nil-checking with (seq q), take
the element with (first q), and otherwise work with it as you normally
would with seqs.
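To make that concrete, here is a hypothetical worker loop on top of get-step: it pulls until exhaustion, using (seq q) rather than (first q) to detect the end, so a nil element in the stream is not mistaken for exhaustion.

```clojure
(def queue (ref [10 nil 30]))  ; nil is a legitimate element here

(defn get-step [queue]
  (dosync
    (let [q @queue]
      (alter queue rest)
      q)))

(defn drain [queue]
  (loop [acc []]
    (let [q (get-step queue)]
      (if (seq q)                     ; exhaustion check, robust to nils
        (recur (conj acc (first q)))
        acc))))

;; (drain queue) => [10 nil 30]
```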
Sincerely
Meikel
Who's in charge here? :) The other way to look at this problem is an
executor handing out the tasks (as opposed to threads asking for
work). That way there is no need to coordinate between workers.
eg: if you had a file which you read as a lazy sequence you could
create worker tasks like this:
(doseq [d data-seq]
  (.submit clojure.lang.Agent/pooledExecutor (cast Callable #(foo d))))
pooledExecutor is just a standard Java fixed-size thread pool based on
the number of available processors, so it will only create X threads
at a time. However, I believe submitted jobs are queued, so if your
seq processing gets too far ahead you would end up with a very full
queue. I'm sure there must be a way to limit the submission rate, but
I can't think of it right now; maybe someone else will chime in :)
Regards,
Tim.
On 28 January 2010 16:53, Timothy Pratley <timothy...@gmail.com> wrote:
[...]
> eg: if you had a file which you read as a lazy sequence you could
> create worker tasks like this:
> (doseq [d data-seq]
> (.submit clojure.lang.Agent/pooledExecutor (cast Callable #(foo d))))
What's the purpose of the cast here?
#(...) is an fn, which is Callable by definition, isn't it? So
wouldn't that line be equivalent to:
(.submit clojure.lang.Agent/pooledExecutor #(foo d))
--
Michael Wood <esio...@gmail.com>
Yes and no... submit is overloaded on Callable and Runnable and #(foo
d) implements both.
user=> (.submit clojure.lang.Agent/pooledExecutor #(inc 1))
java.lang.IllegalArgumentException: More than one matching method found: submit
The cast was just a quick hack to make the example work.
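Another way around the ambiguity (untested beyond a quick REPL check, so treat it as a sketch) is to type-hint the fn literal, which lets the compiler pick the Callable overload without a runtime cast:

```clojure
;; Hinting the fn literal as Callable selects the Callable overload of .submit:
(def fut
  (.submit clojure.lang.Agent/pooledExecutor
           ^java.util.concurrent.Callable #(inc 1)))

(.get fut)  ;; => 2
```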
It should be relatively easy to set up an executor with that behavior,
which would keep the producer from getting "too far" ahead of
consumption.
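One hypothetical way to build such an executor (a sketch using java.util.concurrent directly, not code from this thread): a fixed-size pool over a bounded queue, with CallerRunsPolicy so a producer that gets too far ahead ends up running tasks itself instead of growing the queue.

```clojure
(import '[java.util.concurrent ThreadPoolExecutor TimeUnit
          LinkedBlockingQueue ThreadPoolExecutor$CallerRunsPolicy])

(def n-threads (.availableProcessors (Runtime/getRuntime)))

(def bounded-executor
  (ThreadPoolExecutor. n-threads n-threads 0 TimeUnit/MILLISECONDS
                       (LinkedBlockingQueue. n-threads)  ; at most n-threads tasks waiting
                       (ThreadPoolExecutor$CallerRunsPolicy.)))

(def processed (atom 0))

(doseq [d (range 100)]
  ;; d is ignored in this toy example; a real worker would crunch on d
  (.execute bounded-executor (fn [] (swap! processed inc))))

(.shutdown bounded-executor)
(.awaitTermination bounded-executor 10 TimeUnit/SECONDS)
;; all 100 tasks ran, but the queue never held more than n-threads of them
```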
I basically went with what Michal and Konrad worked out:
(let [feature-stream (ref nil)]
  (defn init-features [stream]
    (dosync (ref-set feature-stream stream))
    'ready)
  (defn get-feature []
    (dosync
      (let [f (first @feature-stream)]
        (alter feature-stream rest)
        f))))
Works like a charm. Are the ref to nil (as the initial value) and the
constructor-like "init-features" in the closure idiomatic Clojure?
I realize I'm writing Scheme here :)
jds
On Jan 28, 2:39 am, Michał Marczyk <michal.marc...@gmail.com> wrote:
> 2010/1/28 Konrad Hinsen <konrad.hin...@fastmail.net>:
On Jan 29, 5:05 am, free_variation <cane.c...@gmail.com> wrote:
> You people are terrific, thanks so much.
>
> I basically went with what Michal and Konrad worked out:
>
> (let [feature-stream (ref nil)]
> (defn init-features [stream]
> (dosync (ref-set feature-stream stream))
> 'ready)
> (defn get-feature []
> (dosync
> (let [f (first @feature-stream)]
> (alter feature-stream rest)
> f))))
>
> Works like a charm. Are the ref to nil (as initial value) and the
> constructor-like "init-features" in the closure idiomatic to Clojure?
> I realize I'm writing scheme here :)
Please note that you cannot distinguish a nil in the stream from an
exhausted stream. That's the reason I returned the whole seq in my
solution and not only the first element. Maybe this doesn't apply in
your case, but you should keep it in mind for other occasions.
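The ambiguity is easy to see at the REPL (a contrived example with an explicit nil element in the stream):

```clojure
;; With the element-returning version, a nil element and an exhausted
;; stream produce the same answer:
(def feature-stream (ref [nil]))

(defn get-feature []
  (dosync
    (let [f (first @feature-stream)]
      (alter feature-stream rest)
      f)))

(get-feature)  ;; => nil, a genuine element
(get-feature)  ;; => nil, exhaustion; indistinguishable from the above
```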
For the idiomatic question (and personal taste): use :keywords instead
of 'quoted-symbols. I would not hard-wire the ref; I would make it
explicit as an argument. You can then specialise this more general
function for convenience with partial.
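Concretely, a sketch of that suggestion, with the ref as an explicit argument:

```clojure
(defn get-feature [feature-stream]
  (dosync
    (let [f (first @feature-stream)]
      (alter feature-stream rest)
      f)))

;; partial then recovers the convenient zero-argument form:
(def my-stream (ref [:a :b :c]))
(def next-feature (partial get-feature my-stream))

(next-feature)  ;; => :a
(next-feature)  ;; => :b
```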
Sincerely
Meikel
Check out the put() method. That's what I used for a program similar
to the original poster's when I needed control over the number of
threads.
I got rid of the 'symbol per your suggestion, and factored the closure
to take the stream as an input:
(defn init-features [stream]
  (let [feature-stream (ref stream)]
    (dosync (ref-set feature-stream stream))
    (fn []
      (dosync
        (let [f (first @feature-stream)]
          (alter feature-stream rest)
          f)))))
It's amazing to me how concise this is. I'm really loving Clojure! I
kinda miss typing 'lambda' instead of 'fn' or '#(', but I suppose a
macro can fix that. Must have been done already :)
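Indeed, such a macro is nearly a one-liner (a sketch; whether reintroducing lambda is a good idea is a matter of taste):

```clojure
(defmacro lambda
  "Scheme-style lambda: (lambda (x y) body) expands to (fn [x y] body)."
  [args & body]
  `(fn [~@args] ~@body))

((lambda (x y) (+ x y)) 1 2)  ;; => 3
```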
(doseq [d data-seq]
  (submit-future (foo d)))
where data-seq is some feed of CPU bound tasks which you want to
process as quickly as possible. This is diverging from the OP, but
thought it might be of interest:
(let [limit (.availableProcessors (Runtime/getRuntime))
      sem (java.util.concurrent.Semaphore. limit)]
  (defn submit-future-call
    "Takes a function of no args and yields a future object that will
    invoke the function in another thread, and will cache the result and
    return it on all subsequent calls to deref/@. If the computation has
    not yet finished, calls to deref/@ will block.
    If n futures have already been submitted, then submit-future blocks
    until the completion of another future, where n is the number of
    available processors."
    [#^Callable task]
    ; take a slot (or block until a slot is free)
    (.acquire sem)
    (try
      ; create a future that will free a slot on completion
      (future (try (task) (finally (.release sem))))
      (catch java.util.concurrent.RejectedExecutionException e
        ; no task was actually submitted
        (.release sem)
        (throw e)))))

(defmacro submit-future
  "Takes a body of expressions and yields a future object that will
  invoke the body in another thread, and will cache the result and
  return it on all subsequent calls to deref/@. If the computation has
  not yet finished, calls to deref/@ will block.
  If n futures have already been submitted, then submit-future blocks
  until the completion of another future, where n is the number of
  available processors."
  [& body] `(submit-future-call (fn [] ~@body)))
#_(example
user=> (submit-future (reduce + (range 100000000)))
#<core$future_call$reify__5782@6c69d02b: :pending>
user=> (submit-future (reduce + (range 100000000)))
#<core$future_call$reify__5782@38827968: :pending>
user=> (submit-future (reduce + (range 100000000)))
;; blocks at this point for a 2 processor PC until the previous
;; two futures complete
#<core$future_call$reify__5782@214c4ac9: :pending>)
jds
Of course I don't mind. It is intended for your use. :)
Regards,
Tim.
This looks like it could become really useful in the future (no pun
intended), thanks for sharing! Instead of enforcing its own thread
count limit, however, it might be worthwhile to interface with the
Agent thread pool. Otherwise, in the worst case a machine with N cores
could try to run N+2 agent threads and N+2 submit-future threads at
once. That said, I'm not sure about the consequences of the Agent
thread pool being saturated by copious use of submit-future.
The call to ref-set seems redundant here since you already initialize
the ref with stream as its value.
jds
On Feb 1, 3:57 pm, Daniel Werner <daniel.d.wer...@googlemail.com>
wrote: