Left-fold enumerators, or how to implement map and reduce six times


Christophe Grand

Nov 12, 2008, 10:48:14 AM
to clo...@googlegroups.com
In my spare time, I've been thinking about (and trying) porting Left-Fold
Enumerators (http://okmij.org/ftp/Streams.html) to Clojure for a week or
so; incidentally
(http://clojure-log.n01se.net/date/2008-11-06.html#20:41) I learnt that
Rich is also thinking about them.

Here is a report of my trials and errors on the way to implementing LFE
in Clojure. You can go straight to STEP 5 for actual working code and a
REPL session.
(Abstract: I learnt clever tricks using continuations or the Y
combinator and turned them into a lousy map.)

STEP 0:
The main idea behind LFE (compared to lazy IO or imperative IO) is to
introduce an inversion of control: resource allocation (file handles,
etc.) and iteration code are removed from user code.
e.g.:
> (defn count-words [filename]
>   (with-open [rdr (-> filename java.io.FileReader. java.io.BufferedReader.)]
>     (reduce (fn [c line] (+ c (-> line (.split " ") count))) 0
>             (line-seq rdr))))

can be rewritten using an enumerator:
> (defn count-words [filename]
>   ((enum-lines filename) (fn [c line] (+ c (-> line (.split " ") count))) 0))

or, introducing an ereduce function:
> (defn ereduce [f seed e]
>   (e f seed))
>
> (defn count-words [filename]
>   (let [io-enum (enum-lines filename)]
>     (ereduce (fn [c line] (+ c (-> line (.split " ") count))) 0 io-enum)))

where enum-lines returns a function which "reduces" the lines of the
specified file using the function and the seed passed as arguments.
Making enumerators implement clojure.lang.IReduce would allow us to
directly use Clojure's reduce with enumerators.
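Here is a minimal STEP 0 style sketch. It assumes Clojure 1.7 or later, for `clojure.java.io/reader` and the `clojure.lang.IReduceInit` interface that three-argument `reduce` checks for. `enum-lines` owns the reader: it opens the file, reduces over it and closes it, so the caller only supplies a reducing function and a seed. `->reducible` is a hypothetical helper, and it targets `IReduceInit` rather than the `IReduce` named above.

```clojure
(require '[clojure.java.io :as io])

;; STEP 0 flavour: an enumerator is a function of [f seed].
(defn enum-lines [filename]
  (fn [f seed]
    (with-open [rdr (io/reader filename)]      ; resource handled here, not by the caller
      (reduce f seed (line-seq rdr)))))

(defn ereduce [f seed e]
  (e f seed))

(defn count-words [filename]
  (ereduce (fn [c line] (+ c (count (.split ^String line " ")))) 0
           (enum-lines filename)))

;; Hypothetical helper: lets clojure.core/reduce (3-arity) consume an
;; enumerator directly through IReduceInit.
(defn ->reducible [e]
  (reify clojure.lang.IReduceInit
    (reduce [_ f init] (e f init))))
```

With it, `(reduce (fn [n _] (inc n)) 0 (->reducible (enum-lines path)))` counts lines while the file handle stays inside the enumerator.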

One can build an enumerator from collections:
> (defn enum-coll [coll]
>   (fn [f seed]
>     (reduce f seed coll)))

So far so good. One can even define basic combinators and predicates on
enumerators: econcat, etake, edrop, efilter, emap (only on one enum).
For example:
> (defn econcat [e1 e2]
>   (fn [f seed]
>     (e2 f (e1 f seed))))
>
> (defn edrop [n e]
>   (fn [f seed]
>     (first (e (fn [[acc i] x] (if (pos? i) [acc (dec i)] [(f acc x) 0]))
>               [seed n]))))
>
> (defn efilter [pred e]
>   (fn [f seed]
>     (e #(if (pred %2) (f %1 %2) %1) seed)))
>
> (defn etake [n e]
>   (fn [f seed]
>     (first (e (fn [[acc i :as a] x] (if (pos? i) [(f acc x) (dec i)] a))
>               [seed n]))))
>
> (def enil ; the empty enumerator
>   (fn [f seed]
>     seed))
>
> (defn enil? [e]
>   (e (constantly false) true))
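The combinators above can be checked together. This sketch restates them with current `defn` syntax and has `etake` return the accumulator through `first`, as `edrop` does. It also adds a hypothetical `v` helper that collects an enumerator, still just a function of `[f seed]`, into a vector.

```clojure
(defn enum-coll [coll]
  (fn [f seed] (reduce f seed coll)))

(defn econcat [e1 e2]
  (fn [f seed] (e2 f (e1 f seed))))

;; Thread [acc items-left-to-skip] through the fold.
(defn edrop [n e]
  (fn [f seed]
    (first (e (fn [[acc i] x] (if (pos? i) [acc (dec i)] [(f acc x) 0]))
              [seed n]))))

(defn efilter [pred e]
  (fn [f seed] (e #(if (pred %2) (f %1 %2) %1) seed)))

;; Thread [acc items-left-to-take]; later items are still visited, only ignored.
(defn etake [n e]
  (fn [f seed]
    (first (e (fn [[acc i :as a] x] (if (pos? i) [(f acc x) (dec i)] a))
              [seed n]))))

(def enil (fn [f seed] seed))   ; the empty enumerator

(defn enil? [e] (e (constantly false) true))

;; Hypothetical helper: collect an enumerator into a vector.
(defn v [e] (e conj []))

(v (econcat (enum-coll [1 2]) (enum-coll [3 4])))   ;=> [1 2 3 4]
(v (etake 3 (efilter odd? (enum-coll (range 20))))) ;=> [1 3 5]
```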

One can also build enumerators based on java.nio which would be reduced
on an IO thread. (Since (reduce f seed coll) is roughly equivalent to
(let [a (agent seed)] (doseq [x coll] (send a f x)) (await a) @a),
integration between multiplexed enumerators and agents is relatively
straightforward.)
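The agent equivalence can be written out as a small function; `agent-reduce` is a hypothetical name. It relies on the documented guarantee that actions sent to one agent from a single thread run one at a time, in send order, so `f` sees the items in collection order.

```clojure
;; Hypothetical helper: reduce coll by queueing (f acc x) actions on an agent.
(defn agent-reduce [f seed coll]
  (let [a (agent seed)]
    (doseq [x coll] (send a f x))   ; each action receives the agent's current state
    (await a)                       ; block until every queued action has run
    @a))
```

Outside a REPL, calling `(shutdown-agents)` when done avoids the agent thread pool delaying JVM exit.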

There are two drawbacks to this simple design:
* the enumerated collection must be entirely processed (no early
termination; see enil? or etake),
* no "parallel loops" (by the way, Clojure doesn't offer much for
traversing several collections in parallel: there's map... and map;
it could be a job for the vector-bound doseq, and then we would need a
dofor...).
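The first drawback can be made concrete. With fold-only enumerators, `etake` can stop *using* items but cannot stop the traversal. This sketch wraps an enumerator in a hypothetical `counting` combinator that bumps an atom for every item it forwards. Taking 2 items out of 1000 still visits all 1000.

```clojure
(defn enum-coll [coll]
  (fn [f seed] (reduce f seed coll)))

(defn etake [n e]
  (fn [f seed]
    (first (e (fn [[acc i :as a] x] (if (pos? i) [(f acc x) (dec i)] a))
              [seed n]))))

;; Hypothetical instrumentation: count every item handed to the reducing fn.
(defn counting [visited e]
  (fn [f seed]
    (e (fn [acc x] (swap! visited inc) (f acc x)) seed)))
```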

Adding early exit means designing a protocol that lets the reducing
function signal to the enumerator that it should stop. It's not a
difficult point, so I won't bother with it until the end of this post.

STEP 1:
Concerning "parallel loops" (that is, an emap that works on several
enumerators at once, like map does with seqs), I tried to come up with a
solution using one of Oleg Kiselyov's tricks to turn an enumerator
inside out.
The trick is to use a non-recursive version of the enumerator: this
non-recursive enumerator takes the (recursive) enumerator as its first
argument. (By the way, thanks to the recur special form, some clever
macrology can help build a non-recursive enumerator from the source of
a recursive one.)

With a non-recursive enumerator, ereduce becomes:
> (defn ereduce [f seed non-rec-enum]
>   (let [good-old-enum (fn self [f seed] (non-rec-enum self f seed))]
>     (good-old-enum f seed)))

But that smells like a blown stack. Let's trampoline!

> (defn ereduce [f seed non-rec-enum] ; with trampolining
>   (let [ccall (fn self [& args] (cons self args)) ; ccall = capture call
>         ccall? #(and (seq? %) (= (first %) ccall))]
>     (loop [r (non-rec-enum ccall f seed)]
>       (if (ccall? r)
>         (recur (apply non-rec-enum r))
>         r))))
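Here is a runnable sketch of both versions of ereduce. It assumes a hypothetical `nre-iter` that keeps its position in a mutable `java.util.Iterator`, which is exactly the implicit state discussed in STEP 2. `ereduce-rec` ties the knot directly and grows the stack with each item. `ereduce` is the trampolined version, where `self` merely captures the call as a list and the loop replays it.

```clojure
;; Hypothetical non-recursive enumerator over any java.lang.Iterable.
;; Its position lives in a mutable Iterator captured by the closure.
(defn nre-iter [^Iterable coll]
  (let [it (.iterator coll)]
    (fn [self f acc]
      (if (.hasNext it)
        (self f (f acc (.next it)))   ; hand control back instead of looping
        acc))))

;; Tying the knot directly: every item costs extra stack frames.
(defn ereduce-rec [f seed non-rec-enum]
  (let [good-old-enum (fn self [f seed] (non-rec-enum self f seed))]
    (good-old-enum f seed)))

;; Trampolined: self only captures the call, the loop replays it.
(defn ereduce [f seed non-rec-enum]
  (let [ccall (fn self [& args] (cons self args))
        ccall? #(and (seq? %) (= (first %) ccall))]
    (loop [r (non-rec-enum ccall f seed)]
      (if (ccall? r)
        (recur (apply non-rec-enum r))   ; r is (ccall f acc), so ccall becomes self again
        r))))
```

Both return 45 for `(nre-iter (range 10))`. On long inputs, such as a million items, only the trampolined version completes; the recursive one throws `StackOverflowError` with a default-sized stack. Since the iterator is consumed, each `nre-iter` value can be reduced only once.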

Back to emap:
> (defn emap [f & nres] ; nres = non-recursive enumerators
>   (fn [g seed]
>     (let [ccall (fn self [& args] (cons self args))
>           ccall? #(and (seq? %) (= (first %) ccall))]
>       (loop [rs (map #(%1 ccall (fn [_ x] x) nil) nres) acc seed]
>         (if (every? ccall? rs)
>           (recur
>             (map #(%1 ccall (fn [_ x] x) nil) nres) ; déjà vu! Does not sound good, see below
>             (g acc (apply f (map last rs))))
>           acc))))) ; here I should consume all the enums till the end to ensure
>                    ; correct resource freeing... or trigger early exit or something
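The STEP 1 emap can be exercised as long as the non-recursive enumerators carry implicit state. Re-invoking an iterator-backed one (the hypothetical `nre-iter` below) yields its next item. A stateless one would hand back its first item on every round, which is the déjà vu flagged in the comment. This sketch uses `mapv` so every enumerator is advanced eagerly on each round.

```clojure
;; Hypothetical iterator-backed non-recursive enumerator (implicit state).
(defn nre-iter [^Iterable coll]
  (let [it (.iterator coll)]
    (fn [self f acc]
      (if (.hasNext it) (self f (f acc (.next it))) acc))))

(defn emap [f & nres]
  (fn [g seed]
    (let [ccall (fn self [& args] (cons self args))
          ccall? #(and (seq? %) (= (first %) ccall))
          ;; call every enumerator once; each returns (ccall f x) or, when done, nil
          pull (fn [] (mapv #(% ccall (fn [_ x] x) nil) nres))]
      (loop [rs (pull) acc seed]
        (if (every? ccall? rs)
          (let [acc (g acc (apply f (map last rs)))]
            (recur (pull) acc))
          acc)))))
```

Mapping `vector` over enumerators of `[1 2 3]` and `[:a :b]` and reducing with `conj` gives `[[1 :a] [2 :b]]`. The third item of the first iterator is pulled and then dropped, which is the termination problem noted in the comment.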

STEP 2:
Well that's not that easy, there's an elephant in the room since I
introduced the non recursive enumerator: implicit state! (and yes, I
know, emap does not return a non recursive enumerator but an enumerator)
So the non recursive enumerator must take and pass another arg (its
state) and I need a way to initialize the state. I chose to use
different arities to tell the first call from the others.
> (defn a-non-rec-enum
>   ([self f seed] ; first call
>     ......)
>   ([self state f acc] ; subsequent calls
>     ......))

The trampolining ereduce does not need to be modified to pass the
enumerator's state around.
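Here is a minimal stateful non-recursive enumerator in the two-arity style, assuming a seqable input; `nre-seq` is a hypothetical name. The state is simply the remaining seq, so the same enumerator value can be reduced any number of times. The STEP 1 trampolined `ereduce` drives it unchanged.

```clojure
;; Trampolined ereduce, as in STEP 1.
(defn ereduce [f seed non-rec-enum]
  (let [ccall (fn self [& args] (cons self args))
        ccall? #(and (seq? %) (= (first %) ccall))]
    (loop [r (non-rec-enum ccall f seed)]
      (if (ccall? r)
        (recur (apply non-rec-enum r))   ; (ccall state f acc) replays the 4-arity
        r))))

;; Hypothetical stateful non-recursive enumerator over a seqable.
(defn nre-seq [coll]
  (fn
    ([self f seed]                 ; first call: build the state
     (if-let [s (seq coll)]
       (self (next s) f (f seed (first s)))
       seed))
    ([self state f acc]            ; subsequent calls: state passed explicitly
     (if state
       (self (next state) f (f acc (first state)))
       acc))))
```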
Here is the new emap:
> (defn emap [f & nres] ; now with explicit state handling, and returns a non-rec enum
>   (let [ccall (fn self [& args] (cons self args))
>         ccall? #(and (seq? %) (= (first %) ccall))
>         common (fn [rs self g acc]
>                  (if (every? ccall? rs)
>                    (self
>                      (map second rs) ; compound state
>                      g
>                      (g acc (apply f (map last rs))))
>                    acc))] ; still TODO: add proper termination
>     (fn
>       ([self g seed]
>         (let [rs (map #(%1 ccall (fn [_ x] x) nil) nres)]
>           (common rs self g seed)))
>       ([self state g acc]
>         (let [rs (map #(%1 ccall %2 (fn [_ x] x) nil) nres state)] ; no more elephant
>           (common rs self g acc))))))
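Here is a self-contained check of the STEP 2 emap. It restates the trampolined `ereduce` and a two-arity `nre-seq` (a hypothetical helper whose state is the remaining seq) so the block runs on its own. Because no enumerator hides state, the emapped enumerator can be reduced twice with the same result.

```clojure
(defn ereduce [f seed non-rec-enum]
  (let [ccall (fn self [& args] (cons self args))
        ccall? #(and (seq? %) (= (first %) ccall))]
    (loop [r (non-rec-enum ccall f seed)]
      (if (ccall? r) (recur (apply non-rec-enum r)) r))))

;; Hypothetical two-arity non-recursive enumerator over a seqable.
(defn nre-seq [coll]
  (fn
    ([self f seed] (if-let [s (seq coll)] (self (next s) f (f seed (first s))) seed))
    ([self state f acc] (if state (self (next state) f (f acc (first state))) acc))))

(defn emap [f & nres]
  (let [ccall (fn self [& args] (cons self args))
        ccall? #(and (seq? %) (= (first %) ccall))
        pick (fn [_ x] x)                        ; "reduce" to the latest item
        common (fn [rs self g acc]
                 (if (every? ccall? rs)
                   (self (map second rs)         ; compound state
                         g
                         (g acc (apply f (map last rs))))
                   acc))]
    (fn
      ([self g seed]
       (common (mapv #(% ccall pick nil) nres) self g seed))
      ([self state g acc]
       (common (mapv #(%1 ccall %2 pick nil) nres state) self g acc)))))
```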


STEP 3: where I'm doing it wrong (please skip to STEP 5)

I'm not happy with ignoring an argument (in (fn [_ x] x)) and passing this
g function around. I think it's time to redefine what constitutes the job
of a non-recursive enumerator: enumerating, not reducing over enumerated
values.
> (defn a-non-rec-enum-that-does-no-reducing ; returns [state x] where x is the last enumerated item, or eoe when at the end
>   ([eoe] ; first call
>     ......)
>   ([state eoe] ; subsequent calls
>     ......))

Reducing is ereduce's job:
> (defn ereduce [f seed non-rec-enum]
>   (let [eoe (Object.)]
>     (loop [r (non-rec-enum eoe)
>            acc seed]
>       (if (identical? eoe r)
>         acc
>         (recur
>           (non-rec-enum (first r) eoe)
>           (f acc (second r)))))))

and then emap is simplified to:
> (defn emap [f & nres]
>   (let [common (fn [eoe rs]
>                  (if (some #(identical? eoe %) rs)
>                    eoe ; still TODO: add proper termination
>                    [(map first rs) ; compound state
>                     (apply f (map second rs))]))]
>     (fn
>       ([eoe]
>         (let [rs (map #(%1 eoe) nres)]
>           (common eoe rs)))
>       ([state eoe]
>         (let [rs (map #(%1 %2 eoe) nres state)]
>           (common eoe rs))))))
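The STEP 3 protocol (an enumerator returns `[state x]`, or `eoe` once exhausted) can be tried with a hypothetical `nre-seq` whose state is the remaining seq. `ereduce` and `emap` below follow the definitions above.

```clojure
(defn ereduce [f seed non-rec-enum]
  (let [eoe (Object.)]                  ; private end-of-enumeration marker
    (loop [r (non-rec-enum eoe)
           acc seed]
      (if (identical? eoe r)
        acc
        (recur (non-rec-enum (first r) eoe)
               (f acc (second r)))))))

;; Hypothetical enumerator that only enumerates: [next-state item] or eoe.
(defn nre-seq [coll]
  (fn
    ([eoe] (if-let [s (seq coll)] [(next s) (first s)] eoe))
    ([state eoe] (if state [(next state) (first state)] eoe))))

(defn emap [f & nres]
  (let [common (fn [eoe rs]
                 (if (some #(identical? eoe %) rs)
                   eoe
                   [(map first rs) (apply f (map second rs))]))]
    (fn
      ([eoe] (common eoe (map #(% eoe) nres)))
      ([state eoe] (common eoe (map #(%1 %2 eoe) nres state))))))
```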

STEP 4: Still in error

I think I'm drifting too far away from the original LFEs, and there are
still two unattended problems:
* early termination,
* proper termination of enumerators in emap.

Fortunately, they are related: they both call for a way to clean up the
state and resources of an enumerator. This time it won't fit using arity
overloading. Let's turn the NRE (non-recursive enumerator) into a map:
> {:enumerators/init (fn [eoe] ; returns [state x] or [state eoe]
>                      .......)
>  :enumerators/step (fn [state eoe] ; returns [state x] or [state eoe]
>                      .......)
>  :enumerators/clean (fn [state] ; the new one
>                       .......)}

(As I'm writing, I realize that I could get rid of all these eoes by
returning only [state].
For now I'll keep returning [state eoe] since it feels more regular.)

ereduce and emap, once again:
> (defn ereduce [f seed non-rec-enum]
>   (let [eoe (Object.)]
>     (loop [[state x] ((non-rec-enum :enumerators/init) eoe)
>            acc seed]
>       (if (identical? eoe x)
>         (do ; argh :-)
>           ((non-rec-enum :enumerators/clean) state)
>           acc)
>         (recur
>           ((non-rec-enum :enumerators/step) state eoe)
>           (f acc x))))))
>
> (defn emap [f & nres]
>   (let [inits (map :enumerators/init nres)
>         steps (map :enumerators/step nres)
>         cleans (map :enumerators/clean nres)
>         common (fn [eoe rs]
>                  (let [state (map first rs)
>                        vals (map second rs)]
>                    [state
>                     (if (some #(identical? eoe %) vals)
>                       eoe
>                       (apply f vals))]))]
>     {:enumerators/init (fn [eoe]
>                          (let [rs (map #(%1 eoe) inits)]
>                            (common eoe rs)))
>      :enumerators/step (fn [state eoe]
>                          (let [rs (map #(%1 %2 eoe) steps state)]
>                            (common eoe rs)))
>      :enumerators/clean (fn [state]
>                           (dorun (map #(%1 %2) cleans state)))}))
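Here is a sketch of the STEP 4 map-based protocol in action. The seq enumerator is hypothetical and takes a `cleaned` atom, so the test can observe that `ereduce` calls `:enumerators/clean` exactly once per underlying enumerator, including through `emap`.

```clojure
(defn ereduce [f seed non-rec-enum]
  (let [eoe (Object.)]
    (loop [[state x] ((non-rec-enum :enumerators/init) eoe)
           acc seed]
      (if (identical? eoe x)
        (do ((non-rec-enum :enumerators/clean) state)   ; release resources
            acc)
        (recur ((non-rec-enum :enumerators/step) state eoe)
               (f acc x))))))

;; Hypothetical STEP 4 enumerator over a seqable; the state is the remaining
;; seq and `cleaned` is an atom counting calls to :enumerators/clean.
(defn enum-seq [coll cleaned]
  (letfn [(emit [s eoe] (if s [(next s) (first s)] [nil eoe]))]
    {:enumerators/init  (fn [eoe] (emit (seq coll) eoe))
     :enumerators/step  (fn [state eoe] (emit state eoe))
     :enumerators/clean (fn [_] (swap! cleaned inc))}))

(defn emap [f & nres]
  (let [inits  (map :enumerators/init nres)
        steps  (map :enumerators/step nres)
        cleans (map :enumerators/clean nres)
        common (fn [eoe rs]
                 [(mapv first rs)                        ; compound state
                  (let [vals (map second rs)]
                    (if (some #(identical? eoe %) vals) eoe (apply f vals)))])]
    {:enumerators/init  (fn [eoe] (common eoe (mapv #(% eoe) inits)))
     :enumerators/step  (fn [state eoe] (common eoe (mapv #(%1 %2 eoe) steps state)))
     :enumerators/clean (fn [state] (dorun (map #(%1 %2) cleans state)))}))
```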


It's interesting to notice that, at this point, etake does not need
early termination:
> (defn etake [n e]
>   {:enumerators/init (fn [eoe]
>                        (let [[state x] ((e :enumerators/init) eoe)]
>                          (if (pos? n)
>                            [[state (dec n)] x]
>                            [[state 0] eoe])))
>    :enumerators/step (fn [[state n] eoe]
>                        (if (pos? n)
>                          (let [[state x] ((e :enumerators/step) state eoe)]
>                            [[state (dec n)] x])
>                          [[state 0] eoe]))
>    :enumerators/clean (fn [[state _]]
>                         ((e :enumerators/clean) state))})
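Because a STEP 4 enumerator is pulled one step at a time, `etake` stops pulling once it has produced `n` items. The sketch below takes three items from an infinite `(range)`, which a fold-only enumerator from STEP 0 could not do. `enum-seq` is again a hypothetical helper.

```clojure
(defn ereduce [f seed non-rec-enum]
  (let [eoe (Object.)]
    (loop [[state x] ((non-rec-enum :enumerators/init) eoe)
           acc seed]
      (if (identical? eoe x)
        (do ((non-rec-enum :enumerators/clean) state) acc)
        (recur ((non-rec-enum :enumerators/step) state eoe) (f acc x))))))

;; Hypothetical STEP 4 enumerator over a (possibly infinite) seqable.
(defn enum-seq [coll]
  (letfn [(emit [s eoe] (if s [(next s) (first s)] [nil eoe]))]
    {:enumerators/init  (fn [eoe] (emit (seq coll) eoe))
     :enumerators/step  (fn [state eoe] (emit state eoe))
     :enumerators/clean (fn [_] nil)}))

;; State is [inner-state items-left]; once items-left hits 0 we report eoe.
(defn etake [n e]
  {:enumerators/init (fn [eoe]
                       (let [[state x] ((e :enumerators/init) eoe)]
                         (if (pos? n) [[state (dec n)] x] [[state 0] eoe])))
   :enumerators/step (fn [[state n] eoe]
                       (if (pos? n)
                         (let [[state x] ((e :enumerators/step) state eoe)]
                           [[state (dec n)] x])
                         [[state 0] eoe]))
   :enumerators/clean (fn [[state _]] ((e :enumerators/clean) state))})
```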

and that multiplexing has been lost while trying to bring a "parallel
map". In fact, I lost good multiplexing potential at STEP 3: since then,
ereduce applies the reducing function at each step, which implies that
each step must now return a new value, whereas before STEP 3 the only
purpose of a "step" was to advance the computation (think about filtered
enumerators: the computation can advance without calling the reducing
function).
I'm afraid that I reversed the inversion of control :-( and this is why
the last etake function was interesting: it does not involve the
reducing function anymore (see STEP 0).

STEP 5: back on track

Now I define an enumerator as three functions:
> (def a-non-rec-enum
>   {:create-state (fn [] ; returns a state value
>                    ......)
>    :step (fn [self state f acc] ; returns a ccall or [result final-state]; it's the regular part of an enumerator as of STEP 2
>            ......)
>    :destroy-state (fn [state] ; destroys the state (releases resources)
>                     ......)})

The updated ereduce:
> (defn ereduce [f seed non-rec-enum]
>   (let [ccall (fn self [& args] (cons self args))
>         ccall? #(and (seq? %) (= (first %) ccall))
>         state ((non-rec-enum :create-state))
>         nre-step (non-rec-enum :step)]
>     (loop [args [ccall state f seed]]
>       (let [r (apply nre-step args)]
>         (if (ccall? r)
>           (recur r)
>           (let [[result state] r]
>             ((non-rec-enum :destroy-state) state)
>             result))))))

emap: (I'm pretty sure there's a clever/more functional way to write emap)
> (defn emap [f & nres]
>   (let [ccall (fn self [& args] (cons self args))
>         ccall? #(and (seq? %) (= (first %) ccall))
>         nre-steps (map :step nres)
>         marker (Object.)]
>     {:create-state (fn [] (map
>                             (fn [nre] (ccall
>                                         ((nre :create-state))
>                                         (fn [_ x] x)
>                                         marker))
>                             nres))
>      :step (fn [self state g acc]
>              (if (every? ccall? state)
>                (if (some #(identical? marker (last %)) state)
>                  ; advance enumerators that haven't "produced" a value yet
>                  (self (map #(if (identical? marker (last %2)) (apply %1 %2) %2)
>                             nre-steps state)
>                        g acc)
>                  ; all enumerators are ready: reduce and advance them all!
>                  (self (map (fn [step [self state f _]] (step self state f marker))
>                             nre-steps state)
>                        g (g acc (apply f (map last state)))))
>                [acc state]))
>      :destroy-state (fn [state] ; a "parallel" doseq would be handy :-)
>                       (dorun (map #((%1 :destroy-state) (second %2)) nres state)))}))

helper function to turn a seqable into an enumerator:
> (defn enum-seq [c]
>   {:create-state (fn [] (seq c))
>    :step (fn [self state f acc]
>            (if-let [[fst & rst] state]
>              (self rst f (f acc fst))
>              [acc nil]))
>    :destroy-state (fn [state])})

and another to turn an enumerator into a vector:
> (defn v [e] (ereduce conj [] e))

efilter:
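> (defn efilter [pred {:keys [create-state step destroy-state]}]
>   {:create-state create-state
>    :step (fn [self state f acc]
>            (let [r (step self state #(if (pred %2) (f %1 %2) %1) acc)]
>              (if (and (seq? r) (= (first r) self))
>                (let [[_ s _ a] r] (self s f a)) ; re-capture with the caller's f so wrappers don't nest
>                r)))
>    :destroy-state destroy-state})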
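One detail is worth checking with a filtering step under this protocol. If the call handed back to ereduce carries the filtering wrapper, every replay wraps it once more, so the nesting grows with the input. Restoring the caller's `f` when re-capturing the call keeps the nesting flat. The self-contained check below does that and filters a 100,000-item range; `ereduce` and `enum-seq` are restated in current syntax (`if-let` with a binding vector).

```clojure
(defn ereduce [f seed non-rec-enum]
  (let [ccall (fn self [& args] (cons self args))
        ccall? #(and (seq? %) (= (first %) ccall))
        state ((non-rec-enum :create-state))
        nre-step (non-rec-enum :step)]
    (loop [args [ccall state f seed]]
      (let [r (apply nre-step args)]
        (if (ccall? r)
          (recur r)
          (let [[result state] r]
            ((non-rec-enum :destroy-state) state)
            result))))))

(defn enum-seq [c]
  {:create-state (fn [] (seq c))
   :step (fn [self state f acc]
           (if-let [[fst & rst] state]
             (self rst f (f acc fst))
             [acc nil]))
   :destroy-state (fn [_] nil)})

(defn efilter [pred {:keys [create-state step destroy-state]}]
  {:create-state create-state
   :step (fn [self state f acc]
           (let [r (step self state #(if (pred %2) (f %1 %2) %1) acc)]
             (if (and (seq? r) (= (first r) self))
               (let [[_ s _ a] r] (self s f a))   ; hand back the caller's f, not the wrapper
               r)))
   :destroy-state destroy-state})
```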

a simple io-based enumerator constructor:
> (defn enum-lines [filename]
>   {:create-state (fn [] (-> filename java.io.FileReader. java.io.BufferedReader.))
>    :step (fn [self ^java.io.BufferedReader rdr f acc]
>            (if-let [line (.readLine rdr)]
>              (self rdr f (f acc line))
>              [acc rdr]))
>    :destroy-state (fn [^java.io.BufferedReader rdr]
>                     (.close rdr))})

Ok, let's test them:
> user=> (def e1 (enum-seq (range 30)))
> #'user/e1
> user=> (def e2 (enum-seq "ABCDEFGHIJ"))
> #'user/e2
> user=> (v (emap vector e1 e2))
> [[0 \A] [1 \B] [2 \C] [3 \D] [4 \E] [5 \F] [6 \G] [7 \H] [8 \I] [9 \J]]
> user=> (v (efilter #(zero? (rem % 2)) e1))
> [0 2 4 6 8 10 12 14 16 18 20 22 24 26 28]
> user=> (v (emap vector (efilter #(zero? (rem % 2)) e1) e2))
> [[0 \A] [2 \B] [4 \C] [6 \D] [8 \E] [10 \F] [12 \G] [14 \H] [16 \I] [18 \J]]
> user=> (v (enum-lines "/tmp/test-enums"))
> ["This is line one," "this is line two," "this is line three," "this is line four," "this is line five."]
> user=> (v (efilter #(= 18 (count %)) (enum-lines "/tmp/test-enums")))
> ["this is line four," "this is line five."]

That's all for this post. I haven't addressed early termination yet, and
I haven't even made up my mind between the reducing function returning an
out-of-band value or throwing a canned Throwable. (I don't consider the
option of having the reducing function return [mustStop value], because
I'd like to be able to pass "normal" functions.)

The idea would be something along these lines:
> (defn enil? [e]
>   (ereduce (fn [x _] (if x false (stop-enumeration))) true e))
and not:
> (defn enil? [e]
>   (ereduce (fn [_ _] (stop-and-return false)) true e))

I think one can write a function to transform a basic enumerator
(such as those returned by enum-seq or enum-lines) into an
early-termination-aware one.
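Here is a minimal sketch of such a transformation for STEP 5 enumerators. It uses Clojure's `reduced`/`reduced?` (added in Clojure 1.5, well after this post) as the out-of-band value. `early-exit` and this `enil?` are hypothetical. After each inner step, the wrapper inspects the captured accumulator; if it is reduced, it returns `[result final-state]` at once, so `ereduce` still destroys the state.

```clojure
(defn ereduce [f seed non-rec-enum]
  (let [ccall (fn self [& args] (cons self args))
        ccall? #(and (seq? %) (= (first %) ccall))
        state ((non-rec-enum :create-state))
        nre-step (non-rec-enum :step)]
    (loop [args [ccall state f seed]]
      (let [r (apply nre-step args)]
        (if (ccall? r)
          (recur r)
          (let [[result state] r]
            ((non-rec-enum :destroy-state) state)
            result))))))

(defn enum-seq [c]
  {:create-state (fn [] (seq c))
   :step (fn [self state f acc]
           (if-let [[fst & rst] state]
             (self rst f (f acc fst))
             [acc nil]))
   :destroy-state (fn [_] nil)})

;; Hypothetical wrapper: stop as soon as the reducing fn returns (reduced x).
(defn early-exit [{:keys [create-state step destroy-state]}]
  {:create-state create-state
   :step (fn [self state f acc]
           (let [r (step self state f acc)]   ; r is (self state' f acc') or [result state]
             (if (and (seq? r) (= (first r) self) (reduced? (last r)))
               [@(last r) (second r)]         ; [result final-state]: ereduce stops here
               r)))
   :destroy-state destroy-state})

(defn enil? [e]
  (ereduce (fn [_ _] (reduced false)) true (early-exit e)))
```

With this wrapper, `(enil? (enum-seq (range)))` returns `false` even though the underlying seq is infinite.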
As for multiplexing, there are some details to sort out, but I think that
enumerators returned by emap can be made multiplexable if the
mapped-over enumerators are also multiplexable.

Christophe
--
http://cgrand.net/
Clojure and me http://clj-me.blogspot.com/

Christophe Grand

Nov 12, 2008, 10:54:12 AM
to clo...@googlegroups.com
Wow! All indentation is gone :-(
I'll repost it.

Christophe Grand wrote:
