Question about Clojure parallelization

296 views

darren...@gmail.com

12.09.2017, 12:43:58
to Clojure
Hi,

I am a researcher in Natural Language Processing.
My team wants to know how well Clojure parallelizes, and how much time is saved compared to a single-threaded Java version.

The problem we want to solve is this:
there is a big corpus file (currently 500 MB).
Reading it line by line, find all patterns of length 1 through 12 and count their occurrences.

It is a very simple problem, and the order of processing doesn't matter.
We just want to build one big hash-map. (Key is a pattern string, value is an occurrence count.)
Ex) { "father" 10000000 "mother" 10000000 … }

We are comparing performance between Java and Clojure: if the Clojure version is better than the Java one,
then we'll move our code base to Clojure; if not, we'll have to stay with Java.

Anyway, my first prototype is very, very slow. I'm a novice.  :(

Please give me some advice.
Thanks.

(ns parallel-test.core
  (:require [clojure.java.io :as jio]
            [clojure.core.reducers :refer [fold]])
  (:gen-class))

(def corpus-file-url "resources/korean.txt")
(def OC (atom nil))    ; holds the final pattern -> occurrence-count map
(def MPL 12)           ; maximum pattern length
(def each-size 10000)  ; fold partition size

(defn add-pattern-to-hashmap
  [h-map ^String ptn ptn-oc]
  (let [h-ptn-oc (get h-map ptn)
        n-ptn-oc (if (nil? h-ptn-oc)
                   ptn-oc
                   (+ h-ptn-oc ptn-oc))]
    (assoc h-map ptn n-ptn-oc)))

(defn merge-hash-map
  ([] (hash-map))
  ([& hs]
   ;; merge the maps pairwise, adding the counts of patterns they share
   (reduce (fn [l-map r-map]
             (reduce (fn [acc [ptn ptn-oc]]
                       (add-pattern-to-hashmap acc ptn ptn-oc))
                     l-map
                     r-map))
           hs)))

(defn cal-line-oc
  ([] (hash-map))
  ([h-map ^String line]
   ;; add every substring of length 1..MPL of line into h-map
   (let [line-length (count line)]
     (loop [i 0
            i-map h-map]
       (if (>= i line-length)
         i-map
         (recur (inc i)
                (loop [j 1
                       j-map i-map]
                  (let [end-index (+ i j)]
                    (if (or (> j MPL) (> end-index line-length))
                      j-map
                      (recur (inc j)
                             (add-pattern-to-hashmap j-map (subs line i end-index) 1)))))))))))

(defn parallel-process
  [combine-fn reduce-fn input-file]
  (with-open [rdr (jio/reader input-file)]
    (fold each-size
          combine-fn
          reduce-fn
          (line-seq rdr))))

(defn -main [& args]
  (println "start")
  (reset! OC (parallel-process merge-hash-map cal-line-oc corpus-file-url))
  (println "end"))

Didier

12.09.2017, 17:16:20
to Clojure
If performance is the concern, you won't be able to beat Java with Clojure. That said, it should be possible to match Java's performance.

au...@cock.li

12.09.2017, 19:05:54
to clo...@googlegroups.com
https://github.com/thebusby/iota could be useful in your case.


Iota is a Clojure library for handling large text files in memory, and offers the following benefits:
* Tuned for Clojure's reducers, letting you reduce over large files quickly.
* Uses Java NIO's mmap() for rapid IO and handling files larger than available memory.
* Efficiently stores data as it is represented in the file, and only converts to java.lang.String when necessary.
* Offers efficient indexing and caching that emulates Clojure's native vector and seq data structures.
* Adjustable buffer sizes for IO and caching, enabling tuning for specific data sets.
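
For example, a rough sketch of how iota might slot into your problem, assuming a hypothetical line->pattern-counts fn that returns a {pattern count} map for a single line:

  (require '[iota :as iota]
           '[clojure.core.reducers :as r])

  (defn count-corpus [file-url line->pattern-counts]
    (->> (iota/vec file-url)          ; memory-mapped, foldable vector of lines
         (r/remove nil?)              ; iota may yield nil for empty lines
         (r/map line->pattern-counts) ; one {pattern count} map per line
         ;; parallel merge of the per-line maps; the monoid's zero is an empty map
         (r/fold (r/monoid (partial merge-with +) hash-map))))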

James Reeves

12.09.2017, 22:04:04
to clo...@googlegroups.com
As you suspect, your Clojure code isn't very performant.

First, you're doing lots of updates to an immutable map, which isn't going to be efficient. Clojure allows immutable data structures to be changed, temporarily, into mutable ones using the transient function. Alternatively, sometimes it's worth falling back to mutable Java structures like Hashtables.
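
A minimal sketch of the transient approach, assuming the patterns for a line (or batch) have already been extracted into a seq:

  (defn count-with-transient [patterns]
    ;; build the counts in a transient map, then freeze it at the end
    (persistent!
      (reduce (fn [m p] (assoc! m p (inc (get m p 0))))
              (transient {})
              patterns)))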

You can work out the frequencies of the patterns a little more concisely with:

  ;; assumes [clojure.string :as str] in the ns :require
  (defn pattern-frequencies [line]
    (->> (range 12)
         (mapcat #(partition (inc %) 1 line))
         (map str/join)
         (frequencies)))

And then combine them with:

  (defn merge-frequencies [freqs]
    (apply merge-with + freqs))
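
A sketch of how the two might be wired together over the corpus (assuming clojure.java.io is required as io; for a 500 MB file you'd want to reduce incrementally rather than hold every per-line map at once):

  (with-open [rdr (io/reader "resources/korean.txt")]
    (merge-frequencies (map pattern-frequencies (line-seq rdr))))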

There's plenty of room for optimisation there, but making more use of Clojure's core functions is an easy way to make things quicker.



Ghadi Shayban

13.09.2017, 00:20:36
to Clojure
One thing slowing you down is that your function "parallel-process" is calling fold on a line-seq, which is not a foldable source, so you won't get any parallelism.  It devolves to a sequential reduce.

As an alternative, consider partitioning the lines into batches of a few thousand, then pipelining the calculation over each batch, then calling (merge-with +) on all the intermediate results. clojure.core.async/pipeline is a useful function for this.
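
A rough sketch of that shape, assuming a hypothetical count-batch fn that turns a batch of lines into a {pattern count} map:

  (require '[clojure.core.async :as async])

  (defn parallel-pattern-counts [lines count-batch n]
    (let [in  (async/to-chan (partition-all 2000 lines)) ; batches of a few thousand lines
          out (async/chan n)]
      ;; run count-batch over up to n batches at a time
      (async/pipeline n out (map count-batch) in)
      ;; fold all intermediate maps into one result
      (async/<!! (async/reduce (partial merge-with +) {} out))))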

darren...@gmail.com

18.09.2017, 10:03:04
to Clojure
Thanks for all of your advice.

1. I used the iota library for convenience.
2. Even after switching to transient Clojure hash-maps it was still very slow, because the final map holds about 300,000,000 entries. So I had no choice but to use a mutable Java data structure, HashMap.
3. I calculate the occurrence counts in parallel and merge pairs of HashMaps in parallel using core.async.

The results are as follows:
Java version: 9 minutes 30 seconds (single core)
Clojure version: 3 minutes 45 seconds (20 cores)

We see a good opportunity here,
so our team will continue prototyping with Clojure.

This is the sample code we were testing.

(ns parallel-test.core
  (:require [clojure.core.async :as async]
            [iota :as iota])
  (:import (java.util HashMap Map$Entry))
  (:gen-class))

(def corpus-file-url "resources/korean.txt")
(def OC (atom nil))
(def MPL 12)

(def cpu-core-num 16)
(def corpus-file-vec (iota/vec corpus-file-url))
(def corpus-lines-num (count corpus-file-vec))
(def each-size (-> (/ corpus-lines-num cpu-core-num)
                   Math/ceil
                   int))

(defn add-pattern-to-hashmap
  [^HashMap h-map ^String ptn ^Integer ptn-oc]
  (let [h-ptn-oc (.get h-map ptn)
        n-ptn-oc (if (nil? h-ptn-oc)
                   ptn-oc
                   (+ h-ptn-oc ptn-oc))]
    (.put h-map ptn n-ptn-oc)))

(defn cal-lines-oc
  [lines]
  ;; count every substring of length 1..MPL over a batch of lines
  (let [r-map (HashMap.)]
    (doseq [line lines]
      (let [line-length (count line)]
        (doseq [i (range line-length)]
          (doseq [j (range 1 (inc MPL))
                  :let [end-index (+ i j)]
                  :while (<= end-index line-length)]
            (let [pattern (subs line i end-index)]
              (add-pattern-to-hashmap r-map pattern 1))))))
    r-map))

(defn merge-hashmap
  [^HashMap l-map ^HashMap r-map]
  (println "Merged map size: " [(count l-map) (count r-map)])
  (doseq [^Map$Entry entry (.entrySet r-map)]
    (add-pattern-to-hashmap l-map (.getKey entry) (.getValue entry)))
  l-map)

(defn parallel-cal-oc
  [pipeline process-fn input-vec]
  ;; count each batch on its own future and push the result onto the pipeline
  (doall
    (->> (map list (range) (partition-all each-size input-vec))
         (map (fn [[index lines]]
                (println (* (inc index) each-size) " lines processing!")
                (future (async/>!! pipeline (process-fn lines))))))))

(defn parallel-merge-hashmap
  [pipeline batch-num out merge-hashmap]
  ;; repeatedly take two maps off the pipeline and merge them until one remains
  (async/go-loop [m-count 1]
    (if (>= m-count batch-num)
      (do
        (async/>! out (async/<! pipeline))
        (async/close! pipeline))
      (let [l-map (async/<! pipeline)
            r-map (async/<! pipeline)]
        (println "current m-count: " m-count)
        (future (async/>!! pipeline (merge-hashmap l-map r-map)))
        (recur (inc m-count))))))

(defn -main [& args]
  (let [start-time (System/currentTimeMillis)
        pipeline (async/chan cpu-core-num)
        out (async/chan 1)
        batch-num (-> (/ corpus-lines-num each-size)
                      Math/ceil
                      int)]
    (println "start time: " start-time)
    (parallel-cal-oc pipeline cal-lines-oc corpus-file-vec)
    (parallel-merge-hashmap pipeline batch-num out merge-hashmap)
    (reset! OC (async/<!! out))
    (async/close! out)
    (let [end-time (System/currentTimeMillis)
          elapsed-time (double (/ (- end-time start-time) 60000))
          minute (int elapsed-time)
          second (* (rem elapsed-time 1) 60)
          elapsed-time-str (str "Elapsed time " minute ":" second)]
      (println "OC hashmap size: " (count @OC))
      (println "end time: " end-time)
      (println elapsed-time-str))))
