Don't understand why this core.async code fails

vemv

Jul 8, 2013, 2:05:37 AM
to clo...@googlegroups.com
As you can read here: http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io doing blocking IO in a go block should be avoided. So I was thinking that an alternative to non-blocking IO APIs is using agents and channels.

The following sample program intends to illustrate the technique:

(require '[clojure.core.async :as async :refer :all])
(import '[java.util.concurrent Executors])

(spit "bar" (apply str (shuffle (range 100))))

(def slurper (agent nil))

(def spitter (agent nil))

(def max-concurrency 1)

;; when a go block wants to slurp a file, it communicates that by writing to this channel
(def slurping-request (chan max-concurrency))

;; where requested slurped content gets served
(def slurps (chan max-concurrency))

(def rw-pool (Executors/newFixedThreadPool max-concurrency))

(def serving
  (future
    (while (<!! slurping-request)
      (send-via rw-pool slurper (fn [_] (>!! slurps (seq (slurp "bar"))) _)))))

(go (while true
      (>! slurping-request :req)
      (let [old (<! slurps)
            new (apply str (shuffle old))]
        (send-via rw-pool spitter (fn [_] (spit "bar" new))))))


It is supposed to constantly modify a file, until you (close! slurping-request). However, somehow nil-related issues crept in:

  • With max-concurrency set to 1, at times an empty string gets written to the file.
  • With max-concurrency set to 4, the spitter agent fails.

What is going on? Maybe files need locking? Or am I misunderstanding some aspect of core.async itself?
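(Editor's sketch.) The thread never pins down the cause, but one plausible contributor to the nil errors can be checked in isolation. This assumes the failure happens when a slurp catches the file in the middle of a rewrite: spit truncates the file as soon as it opens it, so such a read can return "", (seq "") is nil rather than an empty sequence, and core.async channels refuse nil values. An exception thrown inside an agent action leaves that agent failed (with the default :fail error mode). The temp file stands in for "bar".

```clojure
(require '[clojure.core.async :refer [chan >!!]])

;; Throwaway stand-in for "bar" (deleted when the JVM exits).
(def f (doto (java.io.File/createTempFile "bar" ".txt") (.deleteOnExit)))

;; Simulate a read that lands after spit has truncated the file
;; but before it has written anything.
(spit f "")
(def content (seq (slurp f)))   ; (seq "") is nil, not an empty seq

;; Putting nil on a channel throws, which is what the slurper's
;; (>!! slurps (seq (slurp "bar"))) would run into in that case.
(def put-failed?
  (try (>!! (chan 1) content)
       false
       (catch Throwable _ true)))
```

Here content is nil and put-failed? is true.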

Timothy Baldridge

Jul 8, 2013, 8:18:38 AM
to clo...@googlegroups.com
A few thoughts on ways to improve this code:

Using agents and go blocks in the same code is a bit odd. If you need a queue plus a process, just do something like this:

(defn faux-agent [c]
  (go (loop []
        (when-let [msg (<! c)]
          ;; ...do something with msg...
          (recur)))))

This go block will automatically terminate once the input channel is closed. Now you have code that is almost like an agent, but works well with channels (including back-pressure). 
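(Editor's sketch.) A concrete, runnable version of the faux-agent, assuming a hypothetical handle argument in place of "...do something...". It shows the two properties described above: messages are processed in order, and the go block exits once the input channel is closed and drained. Because handle runs inside a go block, it should not block.

```clojure
(require '[clojure.core.async :refer [chan go <! >!! <!! close!]])

;; Hypothetical variant: `handle` is the "action" applied to each message.
(defn faux-agent [c handle]
  (go (loop []
        (when-let [msg (<! c)]
          (handle msg)
          (recur)))))

(def inbox (chan 10))
(def seen (atom []))
(def done (faux-agent inbox #(swap! seen conj %)))

(>!! inbox 1)
(>!! inbox 2)
(>!! inbox 3)
(close! inbox)   ; buffered messages are still delivered after close
(<!! done)       ; blocks until the go block has drained inbox and exited
```

After (<!! done) returns, @seen is [1 2 3].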

Also, instead of having a public req channel and a response channel, consider using higher-order channels. Yes, you can send channels via channels. So instead of having a request look like this:

(>! req-chan :req)

do something like this:

(>! req-chan [file-name my-response-channel])
(<! my-response-channel)

This allows many processes to make requests and your code will end up a little easier to reason about. 
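(Editor's sketch.) The request/response idea above as a self-contained round trip. The upper-casing server and the names start-server and request are hypothetical stand-ins for real file work. Each request is a [payload reply-channel] pair, so each reply reaches only the process that asked. The reply channels are given a buffer of 1 so the server never waits on a slow client.

```clojure
(require '[clojure.core.async :refer [chan go <! >! <!!]]
         '[clojure.string :as str])

;; Hypothetical server: reads [payload reply-chan] pairs and answers each one.
(defn start-server [req-chan]
  (go (loop []
        (when-let [[payload reply-chan] (<! req-chan)]
          (>! reply-chan (str/upper-case payload))
          (recur)))))

(def req-chan (chan))
(start-server req-chan)

;; Each client creates its own response channel.
(defn request [payload]
  (go (let [my-response-channel (chan 1)]
        (>! req-chan [payload my-response-channel])
        (<! my-response-channel))))

(def single (<!! (request "foo")))
(def several (mapv <!! (mapv request ["a" "b" "c"])))
```

single is "FOO" and several is ["A" "B" "C"]: several clients are served concurrently without their replies getting mixed up.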

Just some thoughts. 

Timothy Baldridge


--
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+u...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
 
 




vemv

Jul 8, 2013, 12:09:54 PM
to clo...@googlegroups.com
It took me a while to get the idea, but higher-order channels are brilliant: that way one can be sure a given reply was actually meant for one's own request. Thanks for the suggestion!

Faux-agents would have the limitation of not being able to (reasonably) perform blocking I/O, which is the point of my sample program.

An improved version:

(require '[clojure.core.async :as async :refer :all])
(import '[java.util.concurrent Executors]
        '[java.util.concurrent.locks ReentrantReadWriteLock])


(spit "bar" (apply str (shuffle (range 100))))

(def max-concurrency 4)

(def requests (chan max-concurrency))

;; only ensures exclusive access within the app - not across the OS
;; too lazy to use a FileLock :)
(def lock (ReentrantReadWriteLock.))

(def rw-pool (Executors/newFixedThreadPool max-concurrency))

;; actions are sent to multiple anonymous, one-off agents
;; (as opposed to a single named one) in order to increase concurrency

(dotimes [_ max-concurrency]
  (go (loop []
        (when-let [request (<! requests)]
          (send-via rw-pool (agent nil) (fn [_]
                                          ;; acquire before try, so finally only
                                          ;; ever unlocks a lock that is held
                                          (-> lock .readLock .lock)
                                          (try
                                            (>!! request (seq (slurp "bar")))
                                            (finally
                                              (-> lock .readLock .unlock)))))
          (recur)))))

(go (loop []
      (let [request (chan)
            _ (>! requests request)
            response (<! request)]
        (send-via rw-pool (agent nil) (fn [_]
                                        (-> lock .writeLock .lock)
                                        (try
                                          (spit "bar" (apply str (shuffle response)))
                                          (finally
                                            (-> lock .writeLock .unlock)))))
        (recur))))


As commented, locking is only partially useful (that's why opening the file in another program might display an empty string at times). Also, I don't know what the best way would be to signal termination to the second go block...
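(Editor's sketch, not from the thread.) One possible termination signal: close the shared requests channel. The reader loops already stop on it via when-let, and in current core.async releases >! returns false when the channel is closed (this may differ from the 2013 release), so the requesting loop can stop on that. The reader and writer below are hypothetical stand-ins with the file I/O removed.

```clojure
(require '[clojure.core.async :refer [chan go <! >! <!! close!]])

(def requests (chan 4))

;; Stand-in for the reader go blocks: answer each request channel.
;; Exits once `requests` is closed and drained.
(defn start-reader [requests]
  (go (loop []
        (when-let [reply (<! requests)]
          (>! reply :content)
          (recur)))))

;; Stand-in for the second go block: keeps issuing requests and stops
;; when >! reports that `requests` is closed. Returns the round-trip count.
(defn start-writer [requests]
  (go (loop [n 0]
        (let [reply (chan 1)]
          (if (>! requests reply)
            (do (<! reply)
                (recur (inc n)))
            n)))))

(def reader (start-reader requests))
(def writer (start-writer requests))

(Thread/sleep 50)
(close! requests)                   ; one call stops both loops
(def round-trips (<!! writer))      ; number of completed round trips
(def reader-result (<!! reader))    ; nil: the reader loop has exited
```

A request already sitting in the buffer when close! is called is still delivered and answered, so the writer never waits on a reply that will not come.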

Timothy Baldridge

Jul 8, 2013, 12:23:17 PM
to clo...@googlegroups.com
Eh, you're right. There is, however, a blocking version of go, called thread.

So perhaps something like this?

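(thread
  ;; loop/when-let instead of (while true ...), so the thread exits
  ;; once c is closed instead of calling (slurp nil)
  (loop []
    (when-let [[filename response-chan] (<!! c)]
      (>!! response-chan (slurp filename))
      (recur))))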
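(Editor's sketch.) The thread-based server exercised end to end, assuming a hypothetical start-slurp-server wrapper and a temp file in place of a real filename. Because thread runs its body on a dedicated thread, the blocking slurp does not tie up the go-block pool. The channel that thread returns closes once the loop exits.

```clojure
(require '[clojure.core.async :refer [chan thread >!! <!! close!]])

;; Hypothetical wrapper: serves [filename response-chan] requests until c closes.
(defn start-slurp-server [c]
  (thread
    (loop []
      (when-let [[filename response-chan] (<!! c)]
        (>!! response-chan (slurp filename))
        (recur)))))

(def c (chan))
(def server (start-slurp-server c))

;; A throwaway file to read (deleted when the JVM exits).
(def f (doto (java.io.File/createTempFile "bar" ".txt") (.deleteOnExit)))
(spit f "hello")

;; Each request carries its own reply channel.
(def result
  (let [reply (chan 1)]
    (>!! c [(.getPath f) reply])
    (<!! reply)))

(close! c)
(def server-result (<!! server))   ; nil once the loop sees the closed channel
```

Here result is "hello".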

Or perhaps I'm missing something. 

Timothy



Víctor M. Valenzuela

Jul 8, 2013, 3:34:37 PM
to clo...@googlegroups.com
The downside of #'thread (just as with send-off) is that it can use arbitrarily many threads for performing IO ops. And while some IO tasks benefit from being performed from multiple threads (for fairness or performance), others don't (e.g. writing to a log, AFAICT). And in no case does one want to spawn a thousand threads...

That's why I used send-via: arbitrarily many go blocks can request IO, but the resources dedicated to satisfying the demand will be bounded.

Surely #'thread is a good default for channel work, but as I see it, agents add interesting, finer-grained capabilities as well.
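(Editor's sketch.) The same bound can also be had within core.async by starting a fixed number of long-lived thread workers that all read from one request channel. Any number of go blocks may ask, but at most max-concurrency jobs run at once. This is a swapped-in technique, not the send-via approach from the thread. blocking-io is a hypothetical job (Thread/sleep stands in for IO), and the active/peak atoms exist only to observe the bound.

```clojure
(require '[clojure.core.async :refer [chan go thread >! <! >!! <!! close!]])

(def max-concurrency 4)

;; Hypothetical blocking job.
(defn blocking-io [x]
  (Thread/sleep 10)
  (* x x))

(def active (atom 0))   ; jobs currently running
(def peak (atom 0))     ; highest value `active` has reached

;; Exactly n threads do the blocking work, however many requests arrive.
(defn start-workers [n requests]
  (doall
    (for [_ (range n)]
      (thread
        (loop []
          (when-let [[x reply] (<!! requests)]
            (swap! peak max (swap! active inc))
            (let [result (blocking-io x)]
              (swap! active dec)
              (>!! reply result))
            (recur)))))))

(def requests (chan))
(def workers (start-workers max-concurrency requests))

;; 100 go blocks request IO concurrently; each gets its own reply channel.
(def replies
  (doall
    (for [x (range 100)]
      (go (let [reply (chan 1)]
            (>! requests [x reply])
            (<! reply))))))

(def results (mapv <!! replies))
(close! requests)
(run! <!! workers)   ; wait for every worker thread to exit
```

results holds the 100 squares in request order, and @peak never exceeds max-concurrency.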

You received this message because you are subscribed to a topic in the Google Groups "Clojure" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/clojure/C5oE09GJzo8/unsubscribe.
To unsubscribe from this group and all its topics, send an email to clojure+u...@googlegroups.com.