partition-all in clojure.core.asyncn channel does not work

99 views
Skip to first unread message

Jacek Grzebyta

unread,
Dec 6, 2017, 6:24:02 AM12/6/17
to clo...@googlegroups.com
Hi,

I have to populate a triple store with a big number of data (~38k records x 12) and there is a deadly narrow bottleneck - IO operations speed. To fix it I did:
    1. To avoid threads overflow I put all compute results into channel.
    2. Loading data in chunks is better than single transaction for single record

I tried to do by creating channel with poputale-all traversal but it seems doesn't work properly. In the following mock example it works when the chunk size is  equal the data vector (i.e. 6): "value:  [of made is fruit soup Berry]" - for now I do not care the order.

(let [q (a/chan 500 (partition-all 6))
      in ["Berry" "soup" "is" "made" "of" "fruit"]]
  (a/go-loop [j (a/<! q)]
    (when j
      (println "value: " j)
      (recur (a/<! q))))
  (doseq [itm in]
    (a/go (a/>! q itm))))


I cannot see any problem. How can I solve it? In the following example chunk size should be max 6? I expected partition-all will work the same way as itself:

(partition-all 5 ["Berry" "soup" "is" "made" "of" "fruit"]) ==>
(("Berry" "soup" "is" "made" "of") ("fruit"))

Thanks a lot,
Jacek

Ray Miller

unread,
Dec 6, 2017, 6:51:01 AM12/6/17
to clo...@googlegroups.com
I think you just need to close the channel when you've finished populating it:

(let [q (a/chan 500 (partition-all 5))
      in ["Berry" "soup" "is" "made" "of" "fruit"]]
  (a/go-loop [j (a/<! q)]
    (when j
      (println "value: " j)
      (recur (a/<! q))))
  (a/go
    (doseq [itm in]
      (a/>! q itm))
    (a/close! q)))
 

Jacek Grzebyta

unread,
Dec 6, 2017, 7:22:19 AM12/6/17
to clo...@googlegroups.com
Thanks a lot.

I found you changed the finish slightly - you close q within go body. Where should I put close! in case if I load a channel in many places within several methods? I afraid the go body is done

E.g.:

(defn main-method [db-api]
  (let [main-queue (chan 20000) ]
    (sink-method main-queue db-api)  ;; population db within go-loop body
    (loading-method-1 main-queue & rest)
    (loading-method-n main-queue & rest)))

And methods pattern is like:

 (def loading-method-.. [ q ....]
   (doseq
         (loading-sub-method q)
         (map
                #(doseq

                           (go
                                  >! q (processing i j k l m))
                        ) .. ) ... ))


--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+unsubscribe@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Jacek Grzebyta

unread,
Dec 6, 2017, 7:31:29 AM12/6/17
to clo...@googlegroups.com
Sorry. I put close at he and of the main method but I afraid the main thread will reach close method before all the data will be put to the queue.
Is any way to check a state of the queue? If I handle channels returned by all *-sub-methods with go inside I could check state if was finished?

Regards,
J


Jacek Grzebyta

unread,
Dec 6, 2017, 8:30:55 AM12/6/17
to clo...@googlegroups.com
So I found how to manage all sub-*-methods (    <!! waits for theirs finish) but still I do not know where should I put close!. If I do at the end of the main method than the main channel is closed before having all data and db stays empty.

J.

Jacek Grzebyta

unread,
Dec 6, 2017, 8:50:39 AM12/6/17
to clo...@googlegroups.com
Finally I will add atomically changed flag `isFinished`. And separate throw to close if the flag is true.
Reply all
Reply to author
Forward
0 new messages