core.async and the mapcat transducer


Neil Menne

Apr 2, 2016, 12:09:22 AM
to Clojure Dev
I have put together a minimal case that demonstrates the problem I'm seeing:
https://gist.github.com/NeilMenne/05cd24dfb703c63c068e6e8b1d14fe36

Using the current version of core.async, I am developing an application that ports data from one format to another. As part of the transformation, a single input record becomes a larger number of output records, which I produce with a mapcat transducer on the channel. What I noticed was that backpressure did not seem to take effect.

The producer of records far outruns the consumer, and no exceptions are thrown. The Java heap was entirely consumed and progress slowed to a crawl.

Am I doing something wrong here?

Thanks for your time!

-Neil

Alex Miller

Apr 2, 2016, 12:59:54 AM
A transducer on a chan runs eagerly: each put executes one complete transducer step. An expanding transducer like mapcat therefore produces all of the expanded values for a single input at once, and those items may exceed the buffer's specified capacity. Doing otherwise is largely impractical while retaining the other channel semantics; you're seeing the consequences of this behavior.
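To make the overshoot concrete, here is a toy model written in Go. core.async's channels are modeled on Go's, but Go channels have no transducers, so the buffer is simulated by hand. The type `expandingBuffer` and its `put` method are hypothetical and are not core.async's implementation; the sketch only assumes what is described above: a put is admitted while the buffer is not yet full, and the whole expansion of that one input is then added in a single step.

```go
package main

import "fmt"

// expandingBuffer is a hypothetical toy model, not core.async code: a bounded
// buffer whose "transducer" expands each input into several values.
type expandingBuffer struct {
	capacity int
	items    []int
	expand   func(int) []int // stands in for the mapcat function
}

// put admits x only if the buffer is not yet full, then adds every expanded
// value in one step, so the buffer can end up far above its capacity.
func (b *expandingBuffer) put(x int) bool {
	if len(b.items) >= b.capacity {
		return false // full: a real producer would have to wait here
	}
	b.items = append(b.items, b.expand(x)...)
	return true
}

func main() {
	buf := &expandingBuffer{
		capacity: 10,
		expand: func(x int) []int {
			out := make([]int, 1000)
			for i := range out {
				out[i] = x*1000 + i
			}
			return out
		},
	}
	fmt.Println(buf.put(1), len(buf.items)) // prints "true 1000": admitted, then overshoots
	fmt.Println(buf.put(2), len(buf.items)) // prints "false 1000": over capacity, must wait
}
```

One admitted put leaves 1000 items in a buffer sized for 10. Later puts have to wait until it drains, but the full expansion of every admitted input is already resident in memory.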

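An alternative would be to split the channel into two plain (non-transducer) channels and introduce an intervening process that applies the mapcat function and puts each output onto the output channel one at a time. Each put can then block (or park) while the output buffer is full, so backpressure propagates back to the producer.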
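The split-channel approach can be sketched with the same Go analogy. `expander` is a hypothetical name for the intervening process. Neither channel transforms anything, and each expanded value is sent individually, so a full output buffer blocks the expander, which in turn stops taking from the input channel. In core.async the equivalent shape would be a go-loop that takes with `<!` and puts each expanded value with `>!`.

```go
package main

import "fmt"

// expander is the intervening process: it takes one input at a time and sends
// each expanded value separately, so every send blocks while out is full.
func expander(in <-chan int, out chan<- int, expand func(int) []int) {
	defer close(out)
	for x := range in {
		for _, v := range expand(x) {
			out <- v // blocks when out's buffer is full: this is the backpressure
		}
	}
}

func main() {
	in := make(chan int, 10)  // plain input channel, no transformation
	out := make(chan int, 10) // bounded output channel, no transformation
	expand := func(x int) []int {
		vs := make([]int, 1000)
		for i := range vs {
			vs[i] = x*1000 + i
		}
		return vs
	}

	go expander(in, out, expand)

	// Producer: five input records, each of which expands to 1000 outputs.
	go func() {
		defer close(in)
		for x := 0; x < 5; x++ {
			in <- x
		}
	}()

	// Consumer: count the outputs and track the largest buffered backlog seen.
	count, maxLen := 0, 0
	for range out {
		count++
		if l := len(out); l > maxLen {
			maxLen = l
		}
	}
	fmt.Println(count, maxLen <= cap(out)) // prints "5000 true"
}
```

Because a Go buffered channel never holds more than its capacity, `len(out)` stays at or below 10 for the whole run while all 5000 values still arrive.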

Neil Menne

Apr 2, 2016, 10:29:00 AM
That was the solution I went with. I just wanted a sanity check.

Thanks, Alex!
