When should I use non-blocking >! / goroutines and blocking >!! / threads with clojure core.async


Sean Pietz

Jan 29, 2014, 9:08:58 PM
to clo...@googlegroups.com
I'm writing an ETL process to read event-level data from a product database, transform / aggregate it and write it to an analytics data warehouse. I'm using Clojure's core.async library to separate these processes into concurrently executing components. Here's what the main part of my code looks like right now:

(ns data-staging.main
  (:require [clojure.core.async :as async])
  (:use [clojure.core.match :only (match)]
        [data-staging.map-vecs]
        [data-staging.tables])
  (:gen-class))

(def submissions (make-table "Submission" "Valid"))
(def photos (make-table "Photo"))
(def videos (make-table "Video"))
(def votes (make-table "Votes"))

;; define channels used for sequential data processing
(def chan-in (async/chan 100))
(def chan-out (async/chan 100))

;; docstrings go before the argument vector; after it they are ignored
(defn write-thread
  "Loops forever: reads the next 10000 rows from table and puts
  a vector of the rows (maps) onto 'chan-in'."
  [table]
  (while true
    (let [next-rows (get-rows table)]
      (async/>!! chan-in next-rows)
      (set-max table (:max-id (last next-rows))))))

(defn aggregator
  "Takes output from 'chan-in' and aggregates it by coupon_id, date,
  then adds / drops any fields that are needed / not needed and puts
  the result onto 'chan-out'."
  []
  (while true
    (->> (async/<!! chan-in)
         aggregate
         (async/>!! chan-out))))

(defn read-thread
  "Reads data from 'chan-out' and inserts it into the Analytics DB."
  []
  (while true
    (upsert (async/<!! chan-out))))

(defn -main []
  (async/thread (write-thread submissions))
  (async/thread (write-thread photos))
  (async/thread (write-thread videos))
  (async/thread-call aggregator)
  ;; async/thread runs on daemon threads, so block here;
  ;; otherwise the JVM can exit as soon as -main returns
  (async/<!! (async/thread-call read-thread)))

As you can see, I'm putting each of the components onto its own thread and using the blocking >!! call on the channels. It feels like using the non-blocking >! calls along with go routines might be better for this use case, especially for the database reads, which spend most of their time performing I/O and waiting for new rows in the product DB. Is this the case, and if so, what would be the best way to implement it? I'm a little unclear on all the tradeoffs between the two methods and on exactly how to use go routines effectively. Also, any other suggestions on how to improve the overall architecture would be much appreciated!

Caspar Hasenclever

Jan 30, 2014, 1:22:00 PM
to clo...@googlegroups.com
I think your case is exactly where not to use go blocks. Code in go
blocks is executed on a limited-size thread pool, so with enough blocking
I/O in there you could, in theory, slow down all async processing.

The win in using <! and >! in go blocks is that they don't block async
threads _while they are waiting for input/output channels_. Once you
have a value from <!, anything you do with it runs on one of the async
threads and can therefore block it. If you do this in enough places,
thus blocking enough threads from the async thread pool, you could slow
down the whole thing.
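A minimal sketch of that distinction, assuming core.async is on the classpath; `slow-lookup` and `lookup-all` are hypothetical names, and `slow-lookup` only sleeps to stand in for blocking I/O. The go block never makes the blocking call itself: it hands the call to `async/thread` and parks on the channel that `async/thread` returns, so only the cheap post-processing runs on the go pool.

```clojure
(require '[clojure.core.async :as async])

(defn slow-lookup
  "Hypothetical stand-in for blocking I/O (e.g. a JDBC query)."
  [id]
  (Thread/sleep 50)
  {:id id :rows (* id 10)})

(defn lookup-all
  "Takes ids from `in`. For each id the blocking lookup runs on an
  async/thread, and the go block parks (holding no thread) until the
  result arrives. Closes `out` once `in` is closed and drained."
  [in out]
  (async/go-loop []
    (if-some [id (async/<! in)]
      (let [result (async/<! (async/thread (slow-lookup id)))] ; parks here
        ;; cheap, CPU-only work is fine on the go pool
        (async/>! out (assoc result :seen true))
        (recur))
      (async/close! out))))
```

The go block here is only coordination; all waiting on "I/O" happens on the separate, growable `async/thread` pool.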

If you already know that you are performing blocking I/O, I would stick
to running those functions with async/thread or just a simple future
(since you are not using the output channel returned by async/thread). I
have used just futures in a context similar to yours (piping data in and
out of DBs) and used the channels as just that: pipes (and used go blocks
for CPU-bound functions or for simply routing values through channels).
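A rough sketch of that split, with hypothetical names (`fetch-batch`, `start-producer`, `route`) and a fake `fetch-batch` that sleeps instead of querying a database. The blocking producer runs on `async/thread` (a `future` would work too), while the go block does nothing but route values: non-empty batches to one channel, empty ones to another.

```clojure
(require '[clojure.core.async :as async])

(defn fetch-batch
  "Hypothetical blocking read: returns batch n, or nil when there is no more data."
  [n]
  (Thread/sleep 10)                 ; pretend to wait on the database
  (when (< n 3)
    (vec (range (* n 2)))))         ; [] then [0 1] then [0 1 2 3]

(defn start-producer
  "Runs the blocking read loop on its own thread, using blocking puts,
  and closes `out` when the data runs out."
  [out]
  (async/thread
    (loop [n 0]
      (if-some [batch (fetch-batch n)]
        (do (async/>!! out batch)
            (recur (inc n)))
        (async/close! out)))))

(defn route
  "Go block that only moves values: non-empty batches go to `rows`,
  empty ones to `empties`. Closes both outputs once `in` is closed."
  [in rows empties]
  (async/go-loop []
    (if-some [batch (async/<! in)]
      (do (async/>! (if (seq batch) rows empties) batch)
          (recur))
      (do (async/close! rows)
          (async/close! empties)))))
```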

Of course if you only have a handful of threads as in your example, the
difference won't be that great :)

HTH,

Caspar

P.S. A side note on your code: you might want to terminate your loops
when the channel is closed, rather than using (while true ...). Once a
channel is closed and drained, every take returns nil immediately, so you
will get endless nils, which is probably not what you want to keep processing.
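For example, a variant of `read-thread` that stops once its channel is closed and drained might look like this (a sketch; the channel and a hypothetical `upsert!` function are passed in as arguments). `if-some` ends the loop only on `nil`, so a legitimate `false` value is still processed, which `if-let` would not do.

```clojure
(require '[clojure.core.async :as async])

(defn read-loop
  "Takes from `ch` until it is closed and drained, calling `upsert!`
  on each value. Returns the number of values processed."
  [ch upsert!]
  (loop [n 0]
    (if-some [v (async/<!! ch)]   ; nil only once ch is closed and empty
      (do (upsert! v)
          (recur (inc n)))
      n)))
```

It could be started with `(async/thread (read-loop chan-out upsert))`; the channel returned by `async/thread` then delivers the count when the loop ends.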

Jozef Wagner

Jan 30, 2014, 1:48:36 PM
to clo...@googlegroups.com
go blocks, together with >!, <!, alt!, etc., do not create any new threads and are not run in a separate thread. There is no thread pool for go blocks. Code inside a go block is transformed into a state machine, and the state machine 'object' is parked in the corresponding channel. When the complementary operation on the channel is invoked (e.g. a put is parked and a take is later invoked), the state machine resumes and produces/consumes the value. It can all happen in the one thread.

If your code results in a handful of long-running threads, I would go with threads and >!! <!!. If you need an arbitrary number of 'threads', or if they are short-lived, go blocks are the way to go.
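To make the "arbitrary number" case concrete, here is a port of the well-known Go "daisy chain" demo (a sketch; the function name and the count are arbitrary). It starts 10,000 go blocks, each parked on its own channel, which would be heavy with 10,000 real threads. Each block takes a number from its right-hand channel and passes it, incremented, to the left. Each go block gets its own channel on purpose: core.async refuses more than 1024 pending puts or takes on a single channel.

```clojure
(require '[clojure.core.async :as async])

(defn daisy-chain
  "Links n go blocks with n+1 channels. Each go block parks until it can
  take from its right channel, then puts (inc v) on its left channel.
  Returns the value that reaches the leftmost channel, i.e. n + 1."
  [n]
  (let [leftmost  (async/chan)
        rightmost (reduce (fn [left _]
                            (let [right (async/chan)]
                              (async/go (async/>! left (inc (async/<! right))))
                              right))
                          leftmost
                          (range n))]
    (async/go (async/>! rightmost 1))   ; feed the chain from the right end
    (async/<!! leftmost)))
```

`(daisy-chain 10000)` should return 10001 almost instantly; the parked go blocks are just objects waiting on channels, not threads.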

JW


--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+u...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Jozef Wagner

Jan 30, 2014, 2:40:30 PM
to clo...@googlegroups.com
A couple of points:

- If this piece of code is a performance bottleneck, benchmark whether go 'threads' or real threads better suit your needs.
- Handle edge cases, e.g. how your code behaves when the channels close, especially if you have a (while true ...) combo.
- async/thread uses a growing thread pool, so having 5 long-running threads won't hurt it.
- async/map> and async/pipe may help you produce cleaner code.
- A function is easier to test and reason about if it does not use global vars. E.g. make chan-in and chan-out arguments of the aggregator fn rather than using the global vars directly.
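A sketch of the last two points combined, assuming a core.async release with transducer support on channels (added after this thread; `map>` is deprecated in current releases). `aggregator-stage` is a hypothetical name: it receives its input channel and aggregation function as arguments, and `async/pipe` copies values from `in` into an output channel whose `map` transducer applies the aggregation. The transducer runs as part of channel operations rather than on a dedicated thread, so `aggregate-fn` should be CPU-bound, not doing I/O.

```clojure
(require '[clojure.core.async :as async])

(defn aggregator-stage
  "Hypothetical refactor of `aggregator`: the input channel and the
  aggregation function are arguments instead of global vars. Returns an
  output channel carrying (aggregate-fn batch) for each batch from `in`;
  async/pipe closes it when `in` closes."
  [aggregate-fn in]
  (let [out (async/chan 100 (map aggregate-fn))]
    (async/pipe in out)))   ; pipe returns its `to` channel
```

In the original program this would become `(def chan-out (aggregator-stage aggregate chan-in))`, and a test can feed it a throwaway channel instead of touching globals.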

JW

Mauricio Aldazosa

Jan 30, 2014, 10:41:15 PM
to clo...@googlegroups.com
On Thu, Jan 30, 2014 at 12:48 PM, Jozef Wagner <jozef....@gmail.com> wrote:

> go blocks, together with >!, <!, alt!, etc... do not create any new threads and are not run in separate thread. There is no thread pool for go blocks.

I thought that go blocks do run in a thread pool of size 42 + (2 * number of processors). In the definition of the go macro there is a call to dispatch/run, whose docstring says it uses a thread pool.

Am I misunderstanding something?
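One way to check this empirically is to look at thread names (a sketch; `thread-name` and `where-does-it-run` are hypothetical helpers). On typical setups the go block reports a name like `async-dispatch-N` and the `async/thread` body a name like `async-thread-macro-N`, but exact names are an implementation detail and may differ between core.async versions.

```clojure
(require '[clojure.core.async :as async])

(defn thread-name
  "Name of the thread this function is called on."
  []
  (.getName (Thread/currentThread)))

(defn where-does-it-run
  "Returns the names of the caller's thread, the thread a go block body
  runs on, and the thread an async/thread body runs on."
  []
  {:caller   (thread-name)
   :go-block (async/<!! (async/go (thread-name)))
   :thread   (async/<!! (async/thread (thread-name)))})
```

Calling `(println (where-does-it-run))` from the REPL shows that neither body ran on the caller's thread.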

Timothy Baldridge

Jan 30, 2014, 10:51:11 PM
to clo...@googlegroups.com
To quote Jozef, "it can happen all in one thread". This is somewhat true: there are some situations where this can happen, but they are fairly rare.

Many times putting a value into a channel will mean that the callback on the other end of the channel needs to be dispatched. In that case Mauricio is correct: these callbacks are executed on a fixed-size thread pool.
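Both behaviours can be seen with `async/take!` and its `on-caller?` argument (a sketch; `callback-thread` is a hypothetical helper). When a value is already buffered and `on-caller?` is true (the default), the callback runs right away on the calling thread, the "all in one thread" case; with `on-caller?` false it is dispatched to the thread pool, which is also what happens when a parked operation is completed later by another put or take.

```clojure
(require '[clojure.core.async :as async])

(defn callback-thread
  "Buffers one value, then takes it with take!. Returns true if the
  callback ran on the calling thread, false if it was dispatched."
  [on-caller?]
  (let [ch (async/chan 1)
        p  (promise)]
    (async/>!! ch :v)   ; the value is now immediately available
    (async/take! ch (fn [_] (deliver p (Thread/currentThread))) on-caller?)
    (identical? (Thread/currentThread) (deref p 1000 nil))))
```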

Timothy



Jozef Wagner

Jan 31, 2014, 4:39:03 AM
to clo...@googlegroups.com
Thank you for the clarification. If I understand it correctly, in the case of go 'threads' these callbacks resume the parked state machine, so if there is blocking I/O inside a go block, it ends up waiting in one of these callbacks, thus tying up a thread from the fixed-size pool until the I/O operation unblocks it.
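Later core.async releases (after this thread) added `async/pipeline-blocking`, which packages the usual remedy: the blocking function runs on dedicated threads instead of the go pool. A sketch for the upsert stage, with a hypothetical `slow-upsert` that sleeps in place of a real DB write:

```clojure
(require '[clojure.core.async :as async])

(defn slow-upsert
  "Hypothetical stand-in for a blocking DB write; returns the row count."
  [batch]
  (Thread/sleep 20)
  (count batch))

(defn upsert-stage
  "Runs slow-upsert on up to n dedicated threads (not the go pool).
  Results are emitted in input order; the returned channel closes
  when `in` closes."
  [n in]
  (let [out (async/chan 10)]
    (async/pipeline-blocking n out (map slow-upsert) in)
    out))
```

Results come out in input order even though up to `n` upserts may run concurrently, so the blocking I/O never occupies a go-pool thread.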

JW