core.async consumer + producer working by chunk?

Rob Nikander

Jan 5, 2018, 1:44:18 PM
to Clojure
Hi,

I’m wondering if there is a core.async design idiom for this situation...

- A buffered channel 
- One producer feeding it 
- A bunch of consumers pulling from it.
- Producer should wake up and fill the channel only when it’s empty. In other words, the producer should work in chunks.

My first idea is to have two channels. The second will be used by consumers to signal the producer that the primary channel is empty. But I'm wondering if there is a better way.

The motive for this is that the producer is doing a DB query that is more efficient in bulk. `select ... limit 50` rather than `select ... limit 1` 50 times.

Rob

Brian J. Rubinton

Jan 5, 2018, 2:03:00 PM
to clo...@googlegroups.com
Hi Rob,

What is the buffered channel’s buffer used for? If that’s set to 1 and the channel’s transducer is `(mapcat identity)` then the producer should be able to continuously put chunks of work onto the channel with the puts only completing when the previous chunk is completely consumed.

That would make the producer block/park until the previous chunk is consumed.

For your use case, I think an interesting result of this approach (maybe a downside?) is that initially 2 db queries would be performed: query 1’s result makes it onto the channel, while query 2’s result will be parked until the previous query’s result is consumed.

Brian

Rob Nikander

Jan 5, 2018, 2:10:15 PM
to Clojure


On Friday, January 5, 2018 at 2:03:00 PM UTC-5, Brian J. Rubinton wrote:

What is the buffered channel’s buffer used for? If that’s set to 1 and the channel’s transducer is `(mapcat identity)` then the producer should be able to continuously put chunks of work onto the channel with the puts only completing when the previous chunk is completely consumed.

The buffer is sort of a work queue, used to break up and distribute one "chunk" to a pool of consumers. I was imagining the buffer would be size 50. So the producer would grab 50 rows (one "chunk") and put them all in the channel. Consumers would pick them out one by one.
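
For illustration, that shape at the REPL (assuming clojure.core.async is required as `async`; the rows are just stand-in values):

    (def work-queue (async/chan 50))     ; buffer sized to hold one chunk

    ;; producer grabs one chunk of 50 rows and puts them all in the channel
    (doseq [row (range 50)]
      (async/>!! work-queue row))

    (async/<!! work-queue)               ;=> 0, consumers take rows one at a time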

I'm not familiar with the transducer concept so I need to go read a bit before I can understand your point about the channel's transducer being `(mapcat identity)`.
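
For reference, the same `(mapcat identity)` can be used as a transducer on a plain collection, outside any channel:

    (into [] (mapcat identity) [[1 2] [3 4] [5 6]])
    ;;=> [1 2 3 4 5 6]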

 

Brian J. Rubinton

Jan 5, 2018, 2:40:14 PM
to clo...@googlegroups.com
The `mapcat` transducer takes a collection as its input and outputs each of its items individually. This example might be helpful:

user> (use '[clojure.core.async])
nil
user> (def work-queue (chan 1 (mapcat identity)))
#'user/work-queue
user> (offer! work-queue (range 50))
true
user> (<!! work-queue)
0
user> (<!! work-queue)
1
user> (offer! work-queue (range 50))
nil
user> (dotimes [_ 48] (<!! work-queue))
nil
user> (offer! work-queue (range 50))
true
user> (<!! work-queue)
0

The work-queue channel has a fixed buffer size of 1. A collection (range 50) is put on the channel. While consumers can take items off the channel — note the individual contents of (range 50) are returned — a producer cannot put another value onto the channel until the entire initial collection is consumed. Once that channel is exhausted -- (0…49) are taken from the channel -- its buffer is empty and ready to accept another value.

I’m not very confident I use core.async idiomatically — I probably played too much The Incredible Machine growing up — but this looks OK :)

Rob Nikander

Jan 5, 2018, 3:51:54 PM
to Clojure
Thanks for the explanation! This is very close to what I want. I see some confusing behavior though. See below.


On Friday, January 5, 2018 at 2:40:14 PM UTC-5, Brian J. Rubinton wrote:

The work-queue channel has a fixed buffer size of 1. A collection (range 50) is put on the channel. While consumers can take items off the channel — note the individual contents of (range 50) are returned — a producer cannot put another value onto the channel until the entire initial collection is consumed. 

I tried your example and it works for me, but then I tried using `>!!` instead of `offer!` and it behaves differently.

    user> (require '[clojure.core.async :as async :refer [chan poll!]])
    => nil
    user> (def c (chan 1 (mapcat identity)))
    => #'user/c
    user> (do 
       (async/thread (async/>!! c [1 2 3]) (println "put #1"))
       (async/thread (async/>!! c [4 5 6]) (println "put #2"))
       (async/thread (async/>!! c [7 8 9]) (println "put #3")))
    put #1

As expected, it prints "put #1" right away, and the other threads are blocked. But...
   
    user> (poll! c)
    => 1
    put #2
    user> (poll! c)
    => 2
    put #3
    
As soon as you read anything from it, the other puts succeed. Shouldn't they be blocked?

Rob 

Moritz Ulrich

Jan 5, 2018, 4:00:25 PM
to Rob Nikander, Clojure
You have a channel with a buffer-size of one. You clear the buffer by
taking one item from it, making room for another one. Therefore the put
succeeds. Try just `(async/chan nil xform)` to create a channel without
a backing buffer (a rendezvous channel) where puts only succeed if
there's a matching consumer.
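
For a quick illustration of that rendezvous behavior (shown here on a
plain unbuffered channel, without a transducer):

    (require '[clojure.core.async :as async])

    (def c (async/chan))     ; no buffer: a rendezvous channel

    (async/offer! c :job)    ;=> nil, no consumer is waiting so the put is refused

    (async/thread (println "took" (async/<!! c)))   ; start a consumer that parks on a take
    (async/>!! c :job)       ;=> true, the put completes once the consumer takes it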

Rob Nikander

Jan 5, 2018, 4:07:16 PM
to Clojure


On Friday, January 5, 2018 at 4:00:25 PM UTC-5, Moritz Ulrich wrote:

You have a channel with a buffer-size of one. You clear the buffer by
taking one item from it, making room for another one. Therefore the put
succeeds. Try just `(async/chan nil xform)` to create a channel without
a backing buffer (a rendezvous channel) where puts only succeed if
there's a matching consumer.

Then why does Brian's code work the way it does? See how he takes 2 things off the channel, and offer! still fails and returns nil. If offer! is returning nil, shouldn't >!! be blocking?



Brian J. Rubinton

Jan 5, 2018, 4:47:16 PM
to clo...@googlegroups.com
I don’t know; I don’t fully understand the implementation differences of >!! and offer!. The behavior of offer! makes me think the buffer is not empty until all the outputs of the transducer are consumed, but the behavior of >!! makes me think otherwise.

Moritz - is the buffer cleared if:
- its size is 1
- you’ve put 1 item onto the channel 
- that 1 item is transformed into 3 by the channel’s transducer
- and only 1 of the 3 items is taken from the channel?

Per the docstrings and the observed behavior, it’s somehow both impossible to put another value onto the channel immediately (so offer! returns nil) and yet buffer space is available after only 1 of the 3 values on the channel is taken (so >!! doesn't block).

This seems like a weird corner case made possible by transducers. Maybe the collection put into the buffer is removed from the buffer and passed to the transducer on the first take, so the buffer is empty and that’s all >!! cares about, yet there are still values to be taken from the channel (from the transducer — not the buffer) and somehow that affects offer!’s behavior.


Gary Verhaegen

Jan 5, 2018, 8:01:51 PM
to clo...@googlegroups.com
What about simply having the producer put items one by one on the channel?

(ns t.core
  (:require [clojure.core.async :as async]))

(defn ap
  "atomic print"
  [m]
  (print (str (pr-str m) "\n")))

(defn produce-next-batch
  [s]
  (let [m (+ s 10)]
    [(range s m) m]))

(defn chunked-producer
  [init-state]
  (let [result-chan (async/chan)]
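    ;; result-chan is unbuffered, so each >! below parks until a consumer takes the value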
    (async/go
      (loop [[batch cursor] (produce-next-batch init-state)]
        (ap {:produced batch})
        (doseq [elem batch]
          (async/>! result-chan elem))
        (recur (produce-next-batch cursor))))
    result-chan))

(defn run-consumer
  [ch id n]
  (async/go
    (dotimes [_ n]
      (ap {:id id :received (async/<! ch)})
      (async/<! (async/timeout 10)))))

(defn run
  []
  (let [c (chunked-producer 0)
        c1 (run-consumer c 1 10)
        c2 (run-consumer c 2 10)
        c3 (run-consumer c 3 10)]
    (->> [c1 c2 c3]
         (mapv async/<!!))
    (flush)))

Here produce-next-batch has been deliberately written to evoke the idea that you have some sort of state or cursor that lets you produce the next batch. Real code would obviously need to account for exceptions, handle channel closing, etc., but hopefully this illustrates the idea.


Brian J. Rubinton

Jan 6, 2018, 8:22:34 AM
to clo...@googlegroups.com
Rob - I’d go with Gary's approach, which essentially moves the splitting up of the chunk of results from the core.async channel’s transducer to the producing function. You can do that using a channel with a fixed buffer of 50 and >!!. As long as the next db query is blocked until all of the results from the previous query are put onto the channel, it’ll work as you want.
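
A rough sketch of that shape, with a hypothetical fetch-batch standing in for the bulk query:

    (require '[clojure.core.async :as async])

    (defn start-producer
      "Runs bulk queries and feeds rows onto ch one at a time.
      fetch-batch is a hypothetical fn returning up to n rows (empty when done)."
      [ch fetch-batch]
      (async/thread
        (loop []
          (let [rows (fetch-batch 50)]
            (if (seq rows)
              (do
                ;; >!! blocks whenever ch's 50-slot buffer is full, so the
                ;; next bulk query waits until consumers have made room
                (doseq [row rows]
                  (async/>!! ch row))
                (recur))
              (async/close! ch))))))

    ;; usage sketch:
    ;; (def work-queue (async/chan 50))
    ;; (start-producer work-queue (fn [n] (db-select-batch n)))   ; db-select-batch is hypothetical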

I think the behavior in our examples differs because the blocking puts will complete whenever there is a take and the buffer is not full, ignoring whether the transducer is still outputting values. This bug may be relevant, though there it arises in a less common scenario (a fixed buffer of size 0, which is now disallowed): https://dev.clojure.org/jira/browse/ASYNC-140

Rob Nikander

Jan 6, 2018, 11:27:20 AM
to Clojure

On Jan 5, 2018, at 8:01 PM, Gary Verhaegen <gary.ve...@gmail.com> wrote:
What about simply having the producer put items one by one on the channel?

I will do that. My current producer is doing too many other things, but if I break it up into separate threads or go blocks for each work queue, then that should work. Thank you.
 

On Saturday, January 6, 2018 at 8:22:34 AM UTC-5, Brian J. Rubinton wrote:
I think the behavior in our examples differs because the blocking puts will complete whenever there is a take and the buffer is not full, ignoring whether the transducer is still outputting values. This bug may be relevant, though there it arises in a less common scenario (a fixed buffer of size 0, which is now disallowed): https://dev.clojure.org/jira/browse/ASYNC-140

So, should I report this as a bug? If you have a channel with a buffer and a (mapcat identity) transducer, the number of items in the channel can grow without bound. I thought channels were supposed to prevent that.

 

Alex Miller

Jan 6, 2018, 11:43:03 AM
to Clojure
This is by design. Because the transducer executes in the channel, it's possible for it to temporarily expand the buffer beyond its normal size. This is certainly something to be aware of when using an expanding transducer (mapcat is probably the most common case).
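
A small REPL illustration of that expansion, reusing the (mapcat identity) setup from earlier in the thread:

    (require '[clojure.core.async :as async])

    (def c (async/chan 1 (mapcat identity)))
    (async/>!! c (range 5))   ;=> true, a single put that the expanding transducer
                              ;   turns into 5 buffered values on a buffer of size 1
    (async/<!! c)             ;=> 0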

Brian J. Rubinton

Jan 6, 2018, 11:56:06 AM
to clo...@googlegroups.com
Alex - it makes sense to me that the buffer temporarily expands beyond its normal size with the content of the expanding transducer. What does not make sense to me is the buffer also accepts puts even though its buffer is full.

Why would the take! process puts when the channel's buffer is full?

I was experimenting with the implementation and I think the patch in this gist leads to more intuitive behavior — though it’s possible it breaks something else. https://gist.github.com/brianru/d30f0319e7a14d875a80762937cccb9c



Brian J. Rubinton

Jan 6, 2018, 11:58:42 AM
to clo...@googlegroups.com
Typo — I meant to say the channel executes puts during a take! even though the buffer is full before executing the puts. This is clearer in code (please see the gist).

Alex Miller

Jan 6, 2018, 12:10:28 PM
to Clojure


On Saturday, January 6, 2018 at 10:56:06 AM UTC-6, Brian J. Rubinton wrote:
Alex - it makes sense to me that the buffer temporarily expands beyond its normal size with the content of the expanding transducer. What does not make sense to me is the buffer also accepts puts even though its buffer is full.

Why would the take! process puts when the channel's buffer is full?

I was experimenting with the implementation and I think the patch in this gist leads to more intuitive behavior — though it’s possible it breaks something else. https://gist.github.com/brianru/d30f0319e7a14d875a80762937cccb9c

Possibly, feel free to file a ticket and I will take a closer look.
 

Brian J. Rubinton

Jan 6, 2018, 12:11:48 PM
to clo...@googlegroups.com

Thanks! I will. Just signed the CA.

