Core async / how to work around "No more than 1024 pending puts are allowed on a single channel"

Hanh Huynh Huu

Mar 26, 2014, 10:00:43 PM
to clo...@googlegroups.com
Hi guys, 

I have an app that sometimes needs to put a lot of messages on a channel. The rate is not directly under my control - the messages come from the outside.

As a quick test, I wrote this snippet:

(def c (chan (async/buffer 10240)))

(go
  (doseq [i (range 1024000)]
    (>!! c i)))

and I hit this exception:
Exception in thread "async-dispatch-14" java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)

If I replace the buffer with a dropping buffer or a sliding buffer, it works but I can't afford to drop messages. However, I'd be fine with a larger buffer than 1024.
I also replaced the >!! with >! and async/put!, but the result is the same.

Is there a way to achieve my goal?

Thanks,
--h

Tassilo Horn

Mar 27, 2014, 5:04:41 AM
to Hanh Huynh Huu, clo...@googlegroups.com
Hanh Huynh Huu <han...@gmail.com> writes:

> If I replace the buffer with a dropping buffer or a sliding buffer, it
> works but I can't afford to drop messages. However, I'd be fine with a
> larger buffer than 1024. I also replaced the >!! with >! and
> async/put!, but the result is the same.
>
> Is there a way to achieve my goal?

The 1024 is defined as

(def ^:const ^int MAX-QUEUE-SIZE 1024)

in namespace clojure.core.async.impl.protocols. So you could do

(in-ns 'clojure.core.async.impl.protocols)
(def ^:const ^int MAX-QUEUE-SIZE 2048)
(in-ns 'your.own.ns)

to override it. However, I don't know if that's a good idea...
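
One reason for caution, sketched here with the hypothetical names `limit` and `at-limit?` (they are not part of core.async): a `^:const` var is inlined at compile time, so code compiled before the redefinition keeps the old value. Assuming core.async's channel implementation is already loaded when the override runs, its size check would presumably still compare against 1024.

```clojure
;; Hypothetical stand-in for MAX-QUEUE-SIZE.
(def ^:const limit 1024)

;; The value of `limit` is inlined into this function when it is compiled.
(defn at-limit? [n]
  (>= n limit))

;; Redefining the var afterwards does not recompile at-limit?.
(def ^:const limit 2048)

(at-limit? 1500)
;; => true, because the comparison still uses the inlined 1024
```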

Bye,
Tassilo

Jozef Wagner

Mar 27, 2014, 5:05:26 AM
to clo...@googlegroups.com
If you cannot drop messages, just increase the size of the channel buffer (the argument to the chan function). Once the channel buffer is full, further puts are queued in the channel's 'put buffer', which is limited to 1024 pending puts.
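
A minimal sketch of this, assuming a single producer and a consumer that drains the channel (the sizes and the name `total` are illustrative only): with `>!` inside a `go` block the producer parks whenever the buffer is full, so it never has more than one put pending and no message is dropped.

```clojure
(require '[clojure.core.async :as async :refer [chan go >! <!! close!]])

;; Assumption: 10240 is an illustrative buffer size, not a recommendation.
(def c (chan 10240))

;; Producer: >! parks this go block while the buffer is full,
;; so at most one put from it is pending at any time.
(go
  (doseq [i (range 100000)]
    (>! c i))
  (close! c))

;; Consumer: async/reduce takes from c until it is closed and returns
;; a channel that delivers the final result.
(def total (<!! (async/reduce + 0 c)))

(println total)
```

If the messages arrive on a thread you do not control, calling `>!!` from that thread blocks it in the same way. It is repeated `put!` calls that do not wait for the previous put to be accepted that fill the 1024-slot queue.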

Jozef