How to create a core.async channel that will drop messages if no consumer is currently reading from the channel?

272 views
Skip to first unread message

Yehonathan Sharvit

unread,
Dec 29, 2014, 6:38:03 AM12/29/14
to clo...@googlegroups.com
Hello,


I would like to create a channel that will drop messages if no consumer is currently reading from the channel.

Something like a channel with a dropping-buffer of size 0.
The problem is that the API doesn't allow to create a dropping-buffer of size 0.


With the following code snippet, I would like the reader to read nothing. 

(def c (create-unbuffered-chan)) 

(put! c :ok)

(go (println (<! c))); should not print anything



Your help is appreciated.
Thanks,
Yehonathan.

Laurent PETIT

unread,
Dec 29, 2014, 7:41:40 AM12/29/14
to clo...@googlegroups.com
Hello, 

I'm still a newbie with core.async myself, but I'll try to answer anyway, learning on the way ;-)

My idea is that you could leverage alts! which can return a default value immediately if none of the operations is "ready". I think "ready" means "would not wait/park" (there's either a value to be read in an input channel, or room in a channel buffer of an output channel).

So it's not really a channel that will have the feature, but rather a process. I guess it is easy to encapsulate the desired behaviour by hiding the process behind a function taking the desired output channel, and returning a channel exhibiting the encaspulated behavior:

(defn put-or-drop-channel [target-channel]
  (let [input-channel (chan)]
    (go-loop
      (when-let [v (<! input-channel)]
        (alt! [target-channel v] nil :default nil))))

?



--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+u...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Laurent Petit

Laurent PETIT

unread,
Jan 1, 2015, 12:19:32 AM1/1/15
to clo...@googlegroups.com
Hmmm, I'm really not convinced by the answer I gave you ...

I hope someone else with better xp than me with core.async (not hard !) will answer to you and correct me. 


--
Laurent Petit

Jozef Wagner

unread,
Jan 1, 2015, 4:31:09 AM1/1/15
to clo...@googlegroups.com
Implement a custom buffer type, or use mult. With mult, use tap/untap to control reading. Items sent to mult are dropped if there are no taps.

Jozef

--

Yehonathan Sharvit

unread,
Jan 7, 2015, 12:39:47 AM1/7/15
to clo...@googlegroups.com, clo...@googlegroups.com
What is the reason for the dropping buffer not supporting size 0?

Why the most trivial option of "no buffering" is not supported out of the box by core.async?



You received this message because you are subscribed to a topic in the Google Groups "Clojure" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/clojure/6rYA9Ik7Aws/unsubscribe.
To unsubscribe from this group and all its topics, send an email to clojure+u...@googlegroups.com.

Jozef Wagner

unread,
Jan 7, 2015, 3:22:32 AM1/7/15
to clo...@googlegroups.com
From my understanding of how core.async is implemented, if channel has a buffer, every message goes through it, regardless of whether there are takes waiting or not. This is due to transducers that are applied only on buffer items. None messages would thus pass through with a 0 size dropping buffer.

Jozef

Yehonathan Sharvit

unread,
Jan 7, 2015, 6:16:04 AM1/7/15
to clo...@googlegroups.com, clo...@googlegroups.com
Anyway, I was able to implement only one oof your suggested solutions. The tap/untsp one.

I was unable to implement a buffer of size 0.

I have tried, this:


(deftype NoBuffer [state]
  impl/UnblockingBuffer
  impl/Buffer
  (full? [this] false)
  (remove! [this])
  (add!* [this itm])
  clojure.lang.Counted
  (count [this] 0))


But it didn’t work at all!!

Please help.

Thanks.


Sent from Mailbox
Reply all
Reply to author
Forward
0 new messages