Uncaught Error: Assert failed: No more than 1024 pending puts are allowed on a single channel.


Andrew Stoeckley

Jul 17, 2014, 10:19:31 AM
to clojur...@googlegroups.com
I am using core.async with Om and a websocket. I use channels to communicate changes to a parent go block in IWillMount for om/update! and om/transact! and I also use a channel to handle incoming responses from a websocket.

I get quite a bit of data pushed onto these channels, sometimes a thousand puts per second on each. I can run my app fine for many minutes with considerable updates and there is no problem: the channels and Om respond as advertised.

Eventually, however, everything stops with the console message:

Uncaught Error: Assert failed: No more than 1024 pending puts are allowed on a single channel.

(I also found a similar unanswered SO question here: http://stackoverflow.com/questions/24753884/no-more-than-1024-pending-put-on-a-single-channel)
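
For reference, the assertion can be reproduced in isolation, without Om or a websocket. This is a minimal sketch, assuming an unbuffered channel that nothing takes from; the namespace and the channel name `c` are hypothetical and not part of the app described here. It also assumes assertions are not elided in the build.

```clojure
(ns example.pending-puts
  (:require [cljs.core.async :refer [chan put!]]))

;; Hypothetical channel: unbuffered, and no go block ever takes from it.
(def c (chan))

;; With no waiting taker, each put! is queued as a pending put.
;; The queue is limited to 1024 entries, so the 1025th put! should
;; trip the assertion quoted above.
(try
  (dotimes [i 1025]
    (put! c i))
  (catch :default e
    (.log js/console "put! failed:" (.-message e))))
```

The same thing happens more slowly whenever puts arrive faster than the consumer takes them: the backlog grows until it reaches the limit.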

I am aware of what this message means but not sure what would cause it unless the infinite go loop in IWillMount stops doing its thing. Now, I came across this:

https://twitter.com/swannodette/status/456131778781396992

Notable info from that exchange: "run into issues if there are pending puts when initiate the go block...[no] problem if start with channel reads [first]."

I can expect I might have puts onto a channel before Om first starts reading them, so should I consider doing something more than a mere go loop in IWillMount? I have something like this now:

(defn watch-root
  "The invisible parent root that monitors all state changes from core.async channels and then builds the actual interface in main-root."
  [app owner]
  (reify
    om/IWillMount
    (will-mount [this]
      (go (while true
            (let [t (<! state/chan-om-transact)]
              #_(println t)
              (apply om/transact! app t))))
      (go (while true
            (let [u (<! state/chan-om-update)]
              #_(println u)
              (apply om/update! app u)))))
    om/IRender
    (render [this]
      (om/build main-root app))))

Thomas Heller

Jul 17, 2014, 12:58:25 PM
to clojur...@googlegroups.com
Not an Om user here, but it sounds like you are receiving messages faster than you can handle them.

A "slow consumer" is a common problem in a messaging situation and core.async has 2 built-in ways to deal with that. You can use either (async/chan (async/sliding-buffer n)) or (async/chan (async/dropping-buffer n)). Both start dropping messages once the buffer is full: a sliding buffer drops the oldest buffered message, a dropping buffer drops the incoming one. Contrary to popular belief, React is actually quite slow when doing a lot of state updates; it just doesn't appear that way since all updates are batched together in a "frame".
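
As an illustration of the two buffer types, here is a minimal sketch. The namespace, the channel names, the handler `on-message`, and the buffer size of 100 are all hypothetical and chosen only for the example; the right size depends on the application.

```clojure
(ns example.buffers
  (:require [cljs.core.async :refer [chan put! <! sliding-buffer dropping-buffer]])
  (:require-macros [cljs.core.async.macros :refer [go-loop]]))

;; sliding-buffer: when full, the oldest buffered value is discarded
;; to make room for the new one.
(def latest-updates (chan (sliding-buffer 100)))

;; dropping-buffer: when full, the incoming value is discarded.
(def first-updates (chan (dropping-buffer 100)))

;; Hypothetical producer. Neither buffer type ever reports itself as
;; full, so put! should not queue a pending put on these channels.
(defn on-message [msg]
  (put! latest-updates msg))

;; Consumer: takes values until the channel is closed (nil).
(go-loop []
  (when-some [msg (<! latest-updates)]
    (.log js/console msg)
    (recur)))
```

Either buffer trades the assertion for silent message loss, so this only fits cases where losing some updates is acceptable.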

If you can't afford to drop messages you could try using a WebWorker to do processing in another thread and push state updates separately.

But, as I said, not an Om user, just an educated guess.

Cheers,
/thomas

William Sommers

Jul 17, 2014, 4:29:07 PM
to clojur...@googlegroups.com
Could you post the repo or the code where the channels are being used?

It looks like you might be putting things on a channel within the IRender lifecycle hook which will happen once each time there is a state change.

Will

Andrew Stoeckley

Jul 17, 2014, 8:45:58 PM
to clojurescript
Hi William, the takes off the channel are happening in IWillMount
(posted in my original message), however, the puts onto the channel
are not happening in the React lifecycle. They are happening
separately as a result of websocket messages incoming. My puts look
like this:

(defn websocket-handler-example [id]
  (put! state/chan-om-transact [[:cursor :nested-cursor] #(conj % id)]))

And then they are read from in the "watch-root" function I posted earlier.

Perhaps the problem is that my channels are not both written to and
read from within the React lifecycle?

Everything seems to go fine for a while. Within a few minutes of
this fast activity, I eventually get the error about pending puts on
the channel.
> --
> Note that posts from new members are moderated - please be patient with your first post.
> ---
> You received this message because you are subscribed to a topic in the Google Groups "ClojureScript" group.
> To unsubscribe from this topic, visit https://groups.google.com/d/topic/clojurescript/iwTKqy_6YJo/unsubscribe.
> To unsubscribe from this group and all its topics, send an email to clojurescrip...@googlegroups.com.
> To post to this group, send email to clojur...@googlegroups.com.
> Visit this group at http://groups.google.com/group/clojurescript.

Faris Nasution

Jul 17, 2014, 11:32:59 PM
to clojur...@googlegroups.com
Not Om related, but I did a simple experiment:

(def counter (atom 0))

(def my-chan (chan))

(.setInterval js/window
              (fn []
                (put! my-chan @counter)
                (swap! counter inc))
              1000)

(go (while true
        (let [value (<! my-chan)]
          (.log js/console value))))

I expected it to print 0 1 2 3 4 and so on, but it prints 0 3 7 10. So I guess there are pending (untaken) values, which is what makes the channel complain.


Andrew Stoeckley

Jul 17, 2014, 11:37:27 PM
to clojurescript
Faris -- I just tried the code you mentioned and it prints 0 1 2 3 4
... as anyone would expect. I can't imagine why you are seeing 0 3 7
10 as there is surely more going on there on your side, because that
is not what should be happening.

Faris Nasution

Jul 18, 2014, 12:18:22 AM
to clojur...@googlegroups.com
Yep, that was a mistake on my part; the actual code isn't exactly like that. I wrap the (chan) inside a function and then invoke that function in the go-loop.