core.async count in a channel


Paul Viola

Jan 16, 2014, 6:29:16 PM
to clo...@googlegroups.com

Still finding my way around Clojure... sorry if this is trivial.

I am using core.async to implement a queue for a collection of workers that pull jobs and complete them (there may be multiple producers as well).

All looks great so far, except when it comes time to close down the pipe (and drain it if possible).

I don't see a good way to get the number of items in a channel. I could construct the fixed buffer explicitly and then keep hold of it, but this feels non-idiomatic (the buffer is mutated at a distance by operations on the channel).
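A minimal sketch of the workaround Paul describes, keeping a reference to the buffer. It assumes that core.async's fixed buffer implements `clojure.lang.Counted`, which is an implementation detail rather than part of the channel API and could change between versions; `work-buf` and `work-ch` are hypothetical names.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical names. Keep a reference to the buffer so it can be inspected later.
(def work-buf (async/buffer 10))      ; fixed-size buffer with room for 10 items
(def work-ch  (async/chan work-buf))  ; channel shared by producers and workers

;; Blocking puts complete immediately while the buffer has room.
(async/>!! work-ch :job-1)
(async/>!! work-ch :job-2)
(async/>!! work-ch :job-3)

;; Assumption: core.async's fixed buffer implements clojure.lang.Counted,
;; so `count` reports how many items are buffered right now.
(println "depth:" (count work-buf))                  ; prints "depth: 3"

(async/<!! work-ch)                                  ; a worker takes one job
(println "depth after one take:" (count work-buf))   ; prints "depth after one take: 2"
```

The count is only a snapshot: producers and workers may change it before you act on it.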

Why not add a count operation to the channel??  It also seems useful for getting status information about the overall process (what is the average depth??).

Happy to be educated.

Thanks,
Paul

t x

Jan 16, 2014, 6:40:20 PM
to clo...@googlegroups.com
I second the request to add a "count" field to the channel protocol.

I also agree that having to keep track of the buffer just to get the count is hacky-ish.


--
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+u...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Meikel Brandmeyer (kotarak)

Jan 17, 2014, 3:44:42 AM
to clo...@googlegroups.com
Hi,

the typical channel has no queue: processes wait until the peer process arrives. So count doesn't make sense for the typical channel. If you want to implement a queue, you could create a process which accepts values on one end and distributes them on the other. Then you can also keep track of any stats you like, independent of the underlying channel implementation.

(require '[clojure.core.async :as async])

(defn queue-process
  [input output stats]
  (async/go
    ; Wait for the first input.
    (loop [v (async/<! input)
           q clojure.lang.PersistentQueue/EMPTY]
      ; Either read more input or hand v to output. For the put,
      ; alts! yields true/false rather than a value, so test the port.
      (let [[val-to-q ch] (async/alts! [input [output v]])]
        ; update-stats is a placeholder for your own stats function.
        (swap! stats update-stats q)
        (cond
          ; Read a value from input.
          (and (identical? ch input) (some? val-to-q)) (recur v (conj q val-to-q))

          ; Input channel is closed. => Empty queue.
          (identical? ch input) (do
                                  (doseq [v (cons v q)]
                                    (async/>! output v))
                                  (async/close! output))

          ; Write happened, and there is more in the queue.
          (pos? (count q)) (recur (peek q) (pop q))

          ; Write happened, and queue is empty. Start over.
          :else (recur (async/<! input) q))))))


With something like that, you can implement any queueing strategy (bounded, priority, etc.) you like with any stats you want to collect without polluting the general channel interface.
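The queue process above leaves `update-stats` undefined. One possible shape for it, sketched under the assumption that the stats atom holds a map of running totals (all names here are hypothetical), records the number of observations, the summed depth and the maximum depth, so that an average depth can be derived.

```clojure
;; Hypothetical stats function for (swap! stats update-stats q):
;; receives the current stats map and the queue, returns new stats.
(defn update-stats
  [stats q]
  (let [depth (count q)]
    (-> stats
        (update :samples inc)              ; number of observations
        (update :total-depth + depth)      ; running sum, for the mean
        (update :max-depth max depth))))   ; deepest queue seen so far

(defn average-depth
  "Mean queue depth over all observations, or 0.0 if none were recorded."
  [{:keys [samples total-depth]}]
  (if (pos? samples)
    (/ total-depth (double samples))
    0.0))

;; Usage: start from zeroed totals; the queue process swap!s into this atom.
(def stats (atom {:samples 0 :total-depth 0 :max-depth 0}))

(swap! stats update-stats (conj clojure.lang.PersistentQueue/EMPTY :a :b))
(swap! stats update-stats (conj clojure.lang.PersistentQueue/EMPTY :a))
(println (average-depth @stats))   ; prints 1.5
```

This averages over queue events rather than wall-clock time; a time-weighted average would also need timestamps.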

Disclaimer: I haven't used core.async much, yet. So there might be glitches in the above, but you should get the idea. Also, it might be a bad idea altogether.

Meikel

Meikel Brandmeyer (kotarak)

Jan 17, 2014, 3:49:34 AM
to clo...@googlegroups.com
Bleh, when you see the bug just as you hit "Send"...

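(defn queue-process
  [input output stats]
  (async/go
    ; Wait for the first input.
    (loop [v (async/<! input)
           q clojure.lang.PersistentQueue/EMPTY]
      (if v
        ; v is pending delivery: either read more input or hand v to output.
        (let [[val-to-q ch] (async/alts! [input [output v]])]
          ; update-stats-as-you-see-fit is a placeholder for your own stats function.
          (swap! stats update-stats-as-you-see-fit q)

          (cond
            ; Read a value from input. For the put, alts! yields true/false,
            ; so test which port fired rather than the value.
            (and (identical? ch input) (some? val-to-q)) (recur v (conj q val-to-q))

            ; Input channel is closed. => Empty queue.
            (identical? ch input) (recur nil (cons v q))

            ; Write happened, and there is more in the queue.
            (pos? (count q)) (recur (peek q) (pop q))

            ; Write happened, and queue is empty. Start over.
            :else (recur (async/<! input) q)))
        ; Input is closed: deliver what is left, then close output.
        (do
          (doseq [v q] (async/>! output v))
          (async/close! output))))))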


Meikel Brandmeyer (kotarak)

Jan 17, 2014, 5:10:31 AM
to clo...@googlegroups.com
Hi again,

and some more golfing by Christophe:

(defn queue-process-uncontrolled
  [input output stats]
  (async/go
    (loop [q clojure.lang.PersistentQueue/EMPTY]
      ; Offer the head of the queue to output only when there is one.
      (let [[val-to-q ch] (async/alts!
                            (if-let [v (peek q)]
                              [input [output v]]
                              [input]))]
        ; update-stats-as-you-see-fit is a placeholder for your own stats function.
        (swap! stats update-stats-as-you-see-fit q)
        (cond
          ; Input channel is closed. => drain queue.
          (and (identical? ch input) (nil? val-to-q)) (doseq [v q] (async/>! output v))

          ; Read a value from input.
          (identical? ch input) (recur (conj q val-to-q))

          ; Write happened (alts! yields true here, not a value).
          :else (recur (pop q)))))))

(defn queue-process-controlled
  [input stats]
  (let [output (async/chan)
        process (queue-process-uncontrolled input output stats)]
    (async/go
      (async/<! process)
      (async/close! output))
    output))

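Plus an improvement for closing the output channel: queue-process-uncontrolled is not necessarily the owner of the output channel, so it no longer closes it. queue-process-controlled creates the output channel itself and closes it once the process has finished.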

Meikel

t x

Jan 17, 2014, 5:54:47 AM
to clo...@googlegroups.com
@Meikel: I am now convinced that you are right. It's clear to me that I completely underestimated the power and flexibility of the channel abstraction.

@World: I now retract my second-ing of "adding count to Protocol of Channel"


Paul Viola

Jan 18, 2014, 10:32:22 PM
to clo...@googlegroups.com
@Meikel, I like your solution. Its key advantage is that it enables arbitrary functionality in the "monitoring" of a channel. Since I am new to Clojure, and I bet others are too, I was subconsciously thinking about creating a new kind of channel (a monitored channel). This solution does something similar, by defining a new output channel that is a copy of the previous channel while enabling monitoring. You've done this without a new type or class.

I particularly like this better than simply adding "count" because it solves the problem of *when* to call count for monitoring. Your code calls count iff count changes.

Regardless of implementation, a monitored channel is a good thing. Not all channels have a zero-length queue.

Thanks,
Paul

Aaron France

Jan 21, 2014, 3:43:09 AM
to clo...@googlegroups.com
Hi,

Whilst I am pretty new to Clojure, I am not new to Go. Counting the items in a channel is usually regarded as an error-prone idea that causes race conditions.

Since channels yield nil when they are devoid of items, surely this is enough to know when the channel is empty?

Aaron

Moritz Ulrich

Jan 21, 2014, 8:09:13 AM
to clojure
On Tue, Jan 21, 2014 at 9:43 AM, Aaron France <aaron.l...@gmail.com> wrote:
> Since channels yield nil when they are devoid of items, surely this is enough to know when the channel is empty?

That's not correct. Take operations block on empty channels; they
yield nil only when the channel is closed. You could add a timeout to the take
operation to see whether no item arrived within a specific time.
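A minimal sketch of the timeout approach, using `alts!!` with `async/timeout`: the port that fires tells you whether the channel produced a value, was closed (the channel itself yields `nil`), or simply stayed empty for the whole window (the timeout channel fired). The helper name `take-with-timeout` and its keyword results are hypothetical.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: wait at most ms milliseconds for a value on ch.
(defn take-with-timeout
  [ch ms]
  (let [t        (async/timeout ms)
        [v port] (async/alts!! [ch t])]
    (cond
      (identical? port t) :timed-out   ; nothing arrived in time; ch may still be open
      (nil? v)            :closed      ; ch itself yielded nil, so it is closed
      :else               [:value v])))

(def open-ch   (async/chan 1))
(def closed-ch (doto (async/chan) async/close!))

(take-with-timeout open-ch 50)    ; => :timed-out (empty, but still open)
(async/>!! open-ch 42)
(take-with-timeout open-ch 50)    ; => [:value 42]
(take-with-timeout closed-ch 50)  ; => :closed
```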

Aaron France

Jan 21, 2014, 8:11:29 AM
to clo...@googlegroups.com
Much appreciated for the clarification. It's the same in Go.

I can imagine this pattern (a take on a possibly closed channel) being
useful, but I'm not convinced the count of a channel is a safe thing
to know or care about.

My $0.02, perhaps Clojure does this differently.

Paul Viola

Jan 21, 2014, 11:56:56 AM
to clo...@googlegroups.com
I think this is all well and good for a particular use of channel.

So perhaps I am misusing channels??  

To repeat: in one case I have workers pulling from a channel of real work. For various reasons this channel might get filled rather deeply. In this case I would want to add additional workers or get a bigger machine. I was wondering if monitoring the channel for things like average depth (or the 99th percentile) would give me the information I needed.
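One way to get the depth statistics Paul mentions is to sample the buffer's count periodically and compute percentiles over the samples. This is a sketch under the same assumption as keeping hold of the buffer (that core.async's fixed buffer implements `clojure.lang.Counted`); `sample-depth!` and `percentile` are hypothetical helpers, and because the sampler reads the buffer without going through the channel, each sample is only an approximate snapshot.

```clojure
(require '[clojure.core.async :as async])

(defn percentile
  "Nearest-rank percentile p (0 < p <= 100) of a non-empty collection of numbers."
  [p xs]
  (let [sorted (vec (sort xs))
        rank   (long (Math/ceil (/ (* p (count sorted)) 100.0)))]
    (nth sorted (max 0 (dec rank)))))

;; Hypothetical sampler: every interval-ms, append (count buf) to the samples
;; atom until stop-ch is closed. Assumes buf is a fixed buffer you kept a
;; reference to and that it implements clojure.lang.Counted.
(defn sample-depth!
  [buf samples interval-ms stop-ch]
  (async/go-loop []
    (let [[_ port] (async/alts! [stop-ch (async/timeout interval-ms)])]
      (when-not (identical? port stop-ch)
        (swap! samples conj (count buf))
        (recur)))))

;; Usage: sample a work channel's buffer for a short while.
(def buf     (async/buffer 100))
(def work-ch (async/chan buf))
(def samples (atom []))
(def stop    (async/chan))

(sample-depth! buf samples 5 stop)
(async/>!! work-ch :a)
(async/>!! work-ch :b)
(Thread/sleep 50)
(async/close! stop)                  ; closing stop ends the sampler

(println "p99 depth:" (percentile 99 @samples))
```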


Of course, "just skip the channel business and use a Java queue" is a fine proposal.

But since the producers of this work are truly asynchronous (attached to the real world) I thought it best to keep the channel methodology.







Jarrod Swart

Jan 21, 2014, 12:51:51 PM
to clo...@googlegroups.com
I have not used core.async much but I did use Go a fair bit and I concur with what Aaron said.

The missing piece for you, Paul, is that go blocks and channels are not just a way to do concurrent work, but also a control flow mechanism.

What Aaron is saying is that by using channels to control various aspects of your program's control flow, you should not need a "count" to do the same.
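Applied to Paul's original shutdown-and-drain problem, that idea might look like the following minimal sketch (an illustration, not code from the thread): producers close the work channel when they are done, each worker exits when its take returns nil, and the caller waits for the workers' go blocks to finish. No count is needed to know the queue has drained. `start-workers`, `done-jobs` and the job handler are hypothetical names.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical: start n workers that pull from work-ch and call handle-job.
;; Returns a channel that closes once every worker has exited.
(defn start-workers
  [n work-ch handle-job]
  (let [workers (doall
                  (for [_ (range n)]
                    (async/go-loop []
                      (when-some [job (async/<! work-ch)]  ; nil => closed and drained
                        (handle-job job)
                        (recur)))))]
    ;; merge closes its output after all worker channels have closed.
    (async/merge workers)))

;; Usage: enqueue jobs, close the channel, then block until the queue has drained.
(def done-jobs (atom []))
(def work-ch   (async/chan 100))
(def all-done  (start-workers 4 work-ch #(swap! done-jobs conj %)))

(doseq [i (range 10)] (async/>!! work-ch i))
(async/close! work-ch)   ; no more jobs; already-buffered jobs are still delivered
(async/<!! all-done)     ; returns nil once every worker has finished

(println (sort @done-jobs))
```

Closing a channel still lets takers receive whatever is already buffered, which is what makes this drain cleanly. Jobs that block or run for a long time would be better served by `async/thread` than by go blocks.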

Much like writing Lisp and learning Clojure requires me to learn new idioms, so does the concept of channels.

I found this to be a great resource: http://swtch.com/~rsc/thread/  It is written by Russ Cox, a Googler and contributor to Golang. The scholarly papers and presentation slides mentioned at the bottom are also very good.

Hope I could help!



Jan Herich

Jan 21, 2014, 1:39:09 PM
to clo...@googlegroups.com
Hello Paul,

Why not just adjust the number of workers based on the actual workload instead of monitoring the source channel for average depth?

Have a look at this code from a Clojure/conj presentation about core.async. It's just a simple thread-pool service where you can
specify the source channel, the maximum number of threads (workers), the function to be performed by each worker, and a timeout. The timeout
is how long each thread waits for data on its input before it shuts down.

The service starts with the minimal number of threads able to handle the workload and then progressively scales up to the specified maximum
number of threads if there are no "free" threads able to take items from the source channel.
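The code Jan refers to is not reproduced in the thread, so the following is only a minimal sketch of the behaviour he describes, not the code from the presentation. A dispatcher hands each item to an idle worker if one is waiting, spawns a new worker thread when none is free (up to a maximum), and workers exit after sitting idle for `idle-ms`. `elastic-pool` and its parameters are hypothetical names; exceptions thrown by `f` are not handled.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical elastic pool (not the code from the talk).
;; source      - channel of work items; close it to shut the pool down
;; max-workers - upper bound on worker threads (must be at least 1)
;; f           - function applied to each item
;; idle-ms     - how long an idle worker waits for work before exiting
;; Returns a channel that closes once source is drained and all workers exit.
(defn elastic-pool
  [source max-workers f idle-ms]
  (let [jobs    (async/chan)   ; unbuffered: a put completes only when a worker takes it
        active  (atom 0)       ; number of live worker threads
        workers (atom [])      ; channels returned by async/thread, joined at the end
        spawn!  (fn []
                  (swap! active inc)
                  (swap! workers conj
                         (async/thread
                           (loop []
                             (let [[job port] (async/alts!! [jobs (async/timeout idle-ms)])]
                               (if (and (identical? port jobs) (some? job))
                                 (do (f job) (recur))
                                 ;; Idle for too long, or jobs was closed: exit.
                                 (swap! active dec)))))))]
    (async/thread
      (loop []
        (when-some [item (async/<!! source)]
          (loop []
            ;; First try to hand the item to an already idle worker.
            (let [[_ port] (async/alts!! [[jobs item]] :default :none)]
              (when (= port :default)
                ;; Nobody is free: grow the pool if allowed, then wait a bounded time.
                (when (< @active max-workers) (spawn!))
                (let [[_ port] (async/alts!! [[jobs item] (async/timeout idle-ms)])]
                  (when-not (identical? port jobs) (recur))))))
          (recur)))
      ;; Source is closed and drained: stop the workers and wait for them.
      (async/close! jobs)
      (doseq [w @workers] (async/<!! w)))))
```

Only the dispatcher thread spawns workers, so the `active` check cannot overshoot `max-workers`; if workers time out while the dispatcher is waiting, the bounded wait lets it re-check and spawn again.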

Maybe I misunderstood your description of the problem, but to me it seems like a perfect fit.

Jan


Bob Hutchison

Jan 21, 2014, 2:04:38 PM
to clo...@googlegroups.com
On Jan 21, 2014, at 11:56 AM, Paul Viola <vi...@highspot.com> wrote:

> I think this is all well and good for a particular use of channel.
>
> So perhaps I am misusing channels??
>
> To repeat: in one case I have workers pulling from a channel of real work. For various reasons this channel might get filled rather deeply. In this case I would want to add additional workers or get a bigger machine. I was wondering if monitoring the channel for things like average depth (or 99 percentile) would give me the information I needed.



Often queue length isn’t a particularly good measure of what’s happening, nor a particularly good indicator of how to solve a problem, or even if there is a problem. You might consider measuring service time more directly by tracking how long requests take to be served. Then you might measure the utilisation of the workers, maybe by measuring how long they wait for a new job and how long it takes to complete a task. These measurements are pretty straightforward to implement in core.async (though it’d be more ‘efficient’ to build them right into channels). What to do in response to these numbers isn’t necessarily obvious (more workers? *fewer* workers? split the queue? faster CPU? More CPUs?)
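A minimal sketch of the worker-side measurements Bob suggests: each worker records how long it waited for a job and how long the job took, from which utilisation follows. `instrumented-worker`, `utilisation` and the metrics keys are hypothetical; end-to-end service time would additionally need a timestamp attached to each job when it is enqueued, which this sketch does not do.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical instrumented worker: records how long it waits for each job
;; and how long each job takes, so utilisation can be estimated.
(defn instrumented-worker
  [jobs f metrics]
  (async/thread
    (loop []
      (let [t0  (System/nanoTime)
            job (async/<!! jobs)             ; time blocked here counts as waiting
            t1  (System/nanoTime)]
        (when (some? job)                    ; nil means jobs is closed and drained
          (f job)
          (let [t2 (System/nanoTime)]
            (swap! metrics #(-> %
                                (update :jobs inc)
                                (update :wait-ns + (- t1 t0))
                                (update :busy-ns + (- t2 t1)))))
          (recur))))))

(defn utilisation
  "Fraction of the recorded time spent working rather than waiting."
  [{:keys [wait-ns busy-ns]}]
  (let [total (+ wait-ns busy-ns)]
    (if (pos? total) (/ busy-ns (double total)) 0.0)))

;; Usage: one worker, five jobs that each take about 5 ms.
(def metrics (atom {:jobs 0 :wait-ns 0 :busy-ns 0}))
(def jobs    (async/chan 10))
(def worker  (instrumented-worker jobs (fn [_] (Thread/sleep 5)) metrics))

(doseq [i (range 5)] (async/>!! jobs i))
(async/close! jobs)
(async/<!! worker)                           ; wait for the worker to exit
(println (:jobs @metrics) (utilisation @metrics))
```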

core.async introduces its own problem by banning unbounded queues. The strategy of blocking clients as a consequence of maxing out a queue can be bad. So there’s a new problem that you have to deal with: making sure the channel size is big enough to handle any normal situation (which requires you to know what normal actually is, and sometimes you don’t know beforehand). To deal with this problem a channel count would be very helpful.
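For comparison, core.async does offer buffers whose puts never block: `sliding-buffer` drops the oldest item when full and `dropping-buffer` drops the new one. Later core.async releases (newer than this thread) also added `offer!` and `poll!`, which put or take only if that can happen immediately, so a producer can count rejections instead of blocking. A minimal sketch; `drain!`, `bounded` and `rejected` are hypothetical names.

```clojure
(require '[clojure.core.async :as async])

;; Puts never block on these buffers; they trade blocking for dropped items.
(def sliding  (async/chan (async/sliding-buffer 3)))   ; when full, drops the oldest item
(def dropping (async/chan (async/dropping-buffer 3)))  ; when full, drops the new item

(doseq [i (range 5)]
  (async/>!! sliding i)
  (async/>!! dropping i))

(defn drain!
  "Takes everything currently available on ch without blocking."
  [ch]
  (loop [acc []]
    (if-some [v (async/poll! ch)]
      (recur (conj acc v))
      acc)))

(def sliding-contents  (drain! sliding))   ; holds [2 3 4]
(def dropping-contents (drain! dropping))  ; holds [0 1 2]

;; With an ordinary fixed buffer, offer! puts only if it can do so immediately
;; (returning a falsey value otherwise), so the producer can count rejections.
(def bounded  (async/chan 2))
(def rejected (atom 0))
(doseq [i (range 5)]
  (when-not (async/offer! bounded i)
    (swap! rejected inc)))
@rejected                                  ; => 3
```

A rejection counter like this measures the pressure Bob describes directly, without reading the buffer's depth.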


> I could of course "just skip the channel business, and use a java queue" is a fine proposal.

Well…

