(ns parallel-test.core
(:require [clojure.java.io :as jio]
[clojure.core.reducers :refer [fold]])
(:gen-class))
;; Path of the corpus text file that -main hands to parallel-process.
(def corpus-file-url "resources/korean.txt")
;; Atom that -main resets to the value returned by parallel-process; nil until then.
(def OC (atom nil))
;; Upper bound on the length of the substrings counted by cal-line-oc.
(def MPL 12)
;; Partition size passed as the first argument to fold in parallel-process.
(def each-size 10000)
(defn add-pattern-to-hashmap
  "Returns `h-map` with `ptn-oc` added to the count stored under `ptn`.
  When `ptn` has no (non-nil) entry yet, `ptn-oc` itself becomes its count."
  [h-map ^String ptn ^Integer ptn-oc]
  (let [prior (get h-map ptn)]
    (->> (if-not (nil? prior)
           (+ prior ptn-oc)
           ptn-oc)
         (assoc h-map ptn))))
(defn merge-hash-map
  "Combining function for `fold`.

  With no arguments returns an empty map (the identity value).
  With one or more maps of pattern -> occurrence count, returns a single map
  in which the counts of patterns present in several inputs are summed."
  ([] (hash-map))
  ([& hs]
   ;; The previous implementation ran an inner `reduce` with a one-argument
   ;; function and no seed value, so it threw an ArityException as soon as
   ;; the right-hand map had two or more entries and never accumulated into
   ;; the left-hand map. `merge-with +` performs the intended per-key sum.
   (apply merge-with + hs)))
(defn cal-line-oc
  "Reducing function for `fold`.

  With no arguments returns an empty map.
  With a map and a line, returns the map with the count of every substring
  of `line` whose length is between 1 and MPL incremented by one per
  occurrence position."
  ([] (hash-map))
  ([h-map ^String line]
   (let [line-length (count line)]
     (reduce
       (fn [acc start]
         ;; Substrings starting at `start` end no later than the end of the
         ;; line and are at most MPL characters long.
         (let [last-end (min line-length (+ start MPL))]
           (reduce
             (fn [inner end]
               (add-pattern-to-hashmap inner (subs line start end) 1))
             acc
             (range (inc start) (inc last-end)))))
       h-map
       (range line-length)))))
(defn parallel-process
  "Opens `input-file`, reads it line by line and folds the lines with
  `reduce-fn` / `combine-fn`, using `each-size` as the fold partition size.
  The reader is closed before the result is returned."
  [combine-fn reduce-fn input-file]
  ;; NOTE(review): per the clojure.core.reducers documentation `fold` only
  ;; runs in parallel on foldable collections (persistent vectors and maps).
  ;; `line-seq` yields a lazy seq, so this call degrades to a sequential
  ;; reduce seeded with (combine-fn). Feeding it a vector would parallelize
  ;; it, and would also start exercising combine-fn's two-argument arity.
  (with-open [rdr (jio/reader input-file)]
    (->> (line-seq rdr)
         (fold each-size combine-fn reduce-fn))))
(defn -main
  "Entry point: prints \"start\", stores the result of processing the corpus
  file in the OC atom, then prints \"end\". Command-line `args` are ignored."
  [& args]
  (println "start")
  (let [counts (parallel-process merge-hash-map cal-line-oc corpus-file-url)]
    (reset! OC counts))
  (println "end"))
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+unsubscribe@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
(ns parallel-test.core
(:require [clojure.core.async :as async]
[iota :as iota])
(:import (java.util HashMap Map$Entry))
(:gen-class))
;; Path of the corpus text file opened below.
(def corpus-file-url "resources/korean.txt")
;; Atom that -main resets to the final value taken from the `out` channel; nil until then.
(def OC (atom nil))
;; Upper bound on the length of the substrings counted by cal-lines-oc.
(def MPL 12)
;; Divisor used to derive each-size; also the buffer size of the pipeline channel in -main.
;; NOTE(review): presumably the number of CPU cores on the target machine - confirm.
(def cpu-core-num 16)
;; The corpus file opened through iota/vec when this namespace is loaded.
(def corpus-file-vec (iota/vec corpus-file-url))
;; Element count of corpus-file-vec (presumably one element per line - verify against iota docs).
(def corpus-lines-num (count corpus-file-vec))
;; Batch size: corpus-lines-num divided by cpu-core-num, rounded up to an int.
(def each-size (-> (/ corpus-lines-num cpu-core-num)
Math/ceil
int))
(defn add-pattern-to-hashmap
  "Mutates `h-map` in place, adding `ptn-oc` to the count stored under `ptn`
  (or storing `ptn-oc` when there is no non-nil count yet).
  Returns what `HashMap.put` returns: the previous value, or nil."
  [^HashMap h-map ^String ptn ^Integer ptn-oc]
  (let [prior (.get h-map ptn)
        total (if-not (nil? prior)
                (+ prior ptn-oc)
                ptn-oc)]
    (.put h-map ptn total)))
(defn cal-lines-oc
  "Builds and returns a fresh java.util.HashMap that maps every substring of
  length 1..MPL found in `lines` to the number of positions it occurs at."
  [lines]
  (let [counts (HashMap.)]
    (doseq [line  lines
            :let  [len (count line)]
            start (range len)
            width (range 1 (inc MPL))
            :let  [stop (+ start width)]
            ;; Stop widening at this start position once the substring
            ;; would run past the end of the line.
            :while (<= stop len)]
      (add-pattern-to-hashmap counts (subs line start stop) 1))
    counts))
(defn merge-hashmap
  "Prints the sizes of both maps, then adds every count in `r-map` into
  `l-map` (mutating `l-map` in place) and returns `l-map`."
  [^HashMap l-map ^HashMap r-map]
  (println "Merged map size: " [(count l-map) (count r-map)])
  (reduce (fn [acc ^Map$Entry entry]
            (add-pattern-to-hashmap acc (.getKey entry) (.getValue entry))
            ;; The helper returns HashMap.put's result, so hand the
            ;; accumulator back explicitly.
            acc)
          l-map
          (.entrySet r-map)))
(defn parallel-cal-oc
  "Splits `input-vec` into batches of `each-size` elements and, for each
  batch, prints a progress line and starts a future that applies
  `process-fn` to the batch and puts the result on `pipeline`.
  Returns the fully realized sequence of futures."
  [pipeline process-fn input-vec]
  (->> (partition-all each-size input-vec)
       (map-indexed
         (fn [index lines]
           (println (* (inc index) each-size) " lines processing!")
           (future (async/>!! pipeline (process-fn lines)))))
       doall))
(defn parallel-merge-hashmap
  "Starts a go-loop that repeatedly takes two values from `pipeline`, merges
  them with `merge-hashmap` in a future and puts the merged value back on
  `pipeline`. Once the loop counter reaches `batch-num` it takes one more
  value from `pipeline`, puts it on `out` and closes `pipeline`.
  Returns the channel of the go block."
  [pipeline batch-num out merge-hashmap]
  (async/go-loop [m-count 1]
    (if (< m-count batch-num)
      (let [left  (async/<! pipeline)
            right (async/<! pipeline)]
        (println "current m-count: " m-count)
        ;; Merge off the go thread; the result re-enters the pipeline.
        (future (async/>!! pipeline (merge-hashmap left right)))
        (recur (inc m-count)))
      (let [final-map (async/<! pipeline)]
        (async/>! out final-map)
        (async/close! pipeline)))))
(defn -main
  "Entry point: starts the batch-counting futures and the merging go-loop,
  blocks until the merged map arrives on the output channel, stores it in
  the OC atom and prints its size together with timing information.
  Command-line `args` are ignored."
  [& args]
  (let [start-time (System/currentTimeMillis)
        pipeline   (async/chan cpu-core-num)
        out        (async/chan 1)
        batch-num  (int (Math/ceil (/ corpus-lines-num each-size)))]
    (println "start time: " start-time)
    (parallel-cal-oc pipeline cal-lines-oc corpus-file-vec)
    (parallel-merge-hashmap pipeline batch-num out merge-hashmap)
    ;; Blocks until the go-loop has delivered the final map.
    (reset! OC (async/<!! out))
    (async/close! out)
    (let [end-time        (System/currentTimeMillis)
          ;; Elapsed wall-clock time expressed in (fractional) minutes.
          minutes-elapsed (double (/ (- end-time start-time) 60000))
          whole-minutes   (int minutes-elapsed)
          leftover-secs   (* (rem minutes-elapsed 1) 60)]
      (println "OC hashmap size: " (count @OC))
      (println "end time: " end-time)
      (println (str "Elapsed time " whole-minutes ":" leftover-secs)))))