core.async: peek the next value from a channel without consuming it


Nahuel Greco

Sep 29, 2014, 10:12:33 AM
to clo...@googlegroups.com
Currently, if you block/park reading a channel with <!!/<! or alts!/alts!!, the value is consumed as soon as it appears, so there is no way to block/park waiting for a new value without removing it from the channel. Is a non-consuming peek operation planned on the core.async roadmap, or is there a design rationale for not including it?

Regards,
Nahuel Greco.
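To make the question concrete, here is a minimal sketch assuming a current core.async release (`poll!`, used here, was added after this thread was written). Every take, blocking or not, removes the value it returns, and there is no public operation that only inspects the next value:

```clojure
(ns peek-demo.consume
  (:require [clojure.core.async :as a]))

;; A buffered channel holding two values.
(def c (a/chan 2))
(a/>!! c :first)
(a/>!! c :second)

;; poll! never blocks, but it still removes the value it returns.
(def polled (a/poll! c))

;; So the next blocking take sees :second, not :first.
(def taken (a/<!! c))

;; The buffer is now empty: poll! returns nil instead of waiting.
(def polled-empty (a/poll! c))
```

The same holds for `<!` and `alts!` inside a go block.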

Leon Grapenthin

Oct 5, 2014, 12:17:34 AM
to clo...@googlegroups.com
Why would you want this? To leave the value inside the channel for other consumers?

In that case there would be no guarantee that the value returned by the peek operation is the next value in the channel, because it might have been consumed already.


Best regards, Leon

Nahuel Greco

Oct 5, 2014, 4:51:04 AM
to clo...@googlegroups.com
I was thinking of a single-consumer scenario with a buffered chan, in which you want to check whether you can consume the value before actually consuming it. As you said, a peek operation makes no sense if the channel has multiple consumers.

Regards,
Nahuel Greco.

--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+u...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Fluid Dynamics

Oct 5, 2014, 12:52:01 PM
to clo...@googlegroups.com
On Sunday, October 5, 2014 12:51:04 AM UTC-4, Nahuel Greco wrote:
> I was thinking of a single-consumer scenario with a buffered chan, in which you want to check whether you can consume the value before actually consuming it. As you said, a peek operation makes no sense if the channel has multiple consumers.

And if you can't consume the value, then what? Nothing ever does, and that channel becomes useless?

Actually, the only "peek" operation that makes much sense to me would be a (take-if pred chan) or something similar, which atomically tests the next value with pred and either consumes it or not, so that it can't be consumed elsewhere between the pred test and the optional consumption here. If the value is not consumed, two behaviors occur to me as possible: return nil or some other sentinel value meaning "do not want", or block until the unwanted value is consumed by someone else and then test the next item, and so on. At that point you've got four versions of take-if you'd want: the inside-go and outside-go versions crossed with the two not-wanted behaviors.

At that point, you'd probably be better off just writing a consumer that splits off the pred-matching items into one out channel and feeds everything else into a second channel, with your original consumer taking from the first of these and the others taking from the second. That gets you the "block until" version of the behavior. The other version can be had by making the pred-using consumer the sole consumer of the in channel: it takes a value, applies pred, and branches, doing whatever it needs on the "want" branch, and on the "do not want" branch putting the value onto an out channel that feeds the other consumers before taking its own "do not want" actions.
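core.async already ships a splitting consumer of this kind as `clojure.core.async/split`: it routes each value to one of two output channels depending on a predicate and closes both when the input closes. A minimal sketch; the `route` helper and the `even?` predicate are illustrative assumptions, not part of the proposal above:

```clojure
(ns peek-demo.split
  (:require [clojure.core.async :as a]))

(defn route
  "Hypothetical helper: feeds xs through a/split and returns
  [matching non-matching] as vectors."
  [pred xs]
  (let [in              (a/chan)
        [wanted others] (a/split pred in)
        ;; Drain both outputs concurrently: split handles values in order,
        ;; so an undrained output would stall the other one as well.
        wanted-v        (a/into [] wanted)
        others-v        (a/into [] others)]
    (doseq [x xs]
      (a/>!! in x))
    ;; Closing the input makes split close both outputs.
    (a/close! in)
    [(a/<!! wanted-v) (a/<!! others-v)]))
```

With unbuffered outputs, a value waits until the consumer on its side takes it, which is the "block until" behavior described above.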

Nahuel Greco

Oct 5, 2014, 3:33:16 PM
to clo...@googlegroups.com
Picture the following:

producer ---> go-loop ---> external service

1- The producer puts a value onto an unbuffered (chan) with (>! c v)
2- The go-loop consumes the value with a take operation, **unblocking** the producer
3- The go-loop contacts the external service, but the external service answers that it can't process the value yet
4- The go-loop waits for some timeout before retrying the request to the external service

After step 2 the producer goes on to compute a new value (suppose the computation is expensive), but the previous one hasn't actually been consumed by the external service yet.
I don't want that. I want to enforce end-to-end flow control, where the producer blocks on (>! c v) (step 1) until the value has been consumed by all parties.

Sure, this flow control can be achieved by adding an ack channel: after step 1 the producer blocks waiting for the ack, and the go-loop sends the ack when the external service actually consumes the value.
But I think a peek operation in step 2 would be more elegant. I was also curious whether the implementation of core.async channels in some way prevents adding a peek operation.

A take-if with a pure predicate can't solve this, because you need to contact the external service to decide whether to consume the value.


Regards,
Nahuel Greco.


adrian...@mail.yu.edu

Oct 5, 2014, 3:59:47 PM
to clo...@googlegroups.com
I think you can achieve an effect similar to what you want by using a pub with an appropriate topic function that classifies the input in some way, and then subscribing to the topic whose value you want to see. This also has the benefit of automatically 'mult'ing the channel input, so you can have multiple consumers looking for the same value. 
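A minimal sketch of that pub/sub arrangement, assuming messages are maps and using the `:kind` key as the topic function (both choices are illustrative):

```clojure
(ns peek-demo.pub
  (:require [clojure.core.async :as a]))

(def in (a/chan 10))

;; The topic function classifies each message; here it is simply :kind.
(def p (a/pub in :kind))

;; Each consumer subscribes only to the topic it cares about.
(def orders (a/chan 10))
(def audits (a/chan 10))
(a/sub p :order orders)
(a/sub p :audit audits)

(a/>!! in {:kind :order :id 1})
(a/>!! in {:kind :audit :id 2})
(a/>!! in {:kind :order :id 3})

(def first-order  (a/<!! orders))
(def second-order (a/<!! orders))
(def first-audit  (a/<!! audits))
```

A pub drops values whose topic has no subscriber, which is why the subscriptions are set up before anything is put.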

Nahuel Greco

Oct 5, 2014, 4:14:32 PM
to clo...@googlegroups.com
Adrian: I don't see how a pub can help here. In the previous example, whether or not to consume the value was decided not by some property intrinsic to the value (one you could create a topic from), but by the result of sending it to an external service.


Regards,
Nahuel Greco.

adrian...@mail.yu.edu

Oct 5, 2014, 4:20:55 PM
to clo...@googlegroups.com
Then how would peeking at the value help? 

Nahuel Greco

Oct 5, 2014, 4:40:25 PM
to clo...@googlegroups.com
Here is the previous example with the peek operation:


1- The producer puts a value onto an unbuffered (chan) with (>! c v)
2- The go-loop unparks from (peek<! c) without consuming the value; the producer stays parked
3- The go-loop contacts the external service
4-A If the external service answers ok, the go-loop consumes (and discards) the value with a normal (<! c), and the producer unparks
4-B If the external service answers that it cannot process the value, the go-loop waits for a timeout and then retries step 3

The producer only unparks when the value is effectively consumed by the external service. That's my objective. 

I think your pub proposal plays the same role as the take-if proposal made earlier, but I don't think either take-if or a pub works for this scenario.


Regards,
Nahuel Greco.

adrian...@mail.yu.edu

Oct 5, 2014, 4:57:59 PM
to clo...@googlegroups.com
Ah, I think we're on the same page now. I came across the need for this recently in some code for a UDP-based protocol between a multiplayer game client and server.

I still think a pub fits in here nicely. You can consume the value from the channel in question and park until you get an acknowledgment from the external service (or a timeout). The producer would subscribe to another topic on your pub, which gets a value put onto it when the acknowledgment or timeout occurs in the consumer. Be sure to close the subs after you've gotten the ack or timed out.
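One possible reading of this pub-based scheme, sketched under assumptions: requests and acks are maps carrying a request `:id`, acks are published by id, and `service` is a hypothetical stand-in for the external call. The producer subscribes to its own id before sending, so it cannot miss the ack, and unsubscribes afterwards:

```clojure
(ns peek-demo.pub-ack
  (:require [clojure.core.async :as a]))

(def requests (a/chan))
(def acks (a/chan 10))

;; Acks are routed to subscribers by request id (an assumed message shape).
(def ack-pub (a/pub acks :id))

(defn start-middleman
  "Takes {:id .. :value ..} requests, asks the hypothetical service,
  and publishes {:id .. :ok ..} on the ack pub."
  [service]
  (a/go-loop []
    (when-let [{:keys [id value]} (a/<! requests)]
      (a/>! acks {:id id :ok (service value)})
      (recur))))

(defn produce!
  "Sends value under id and blocks until the matching ack is published."
  [id value]
  (let [my-ack (a/chan 1)]
    (a/sub ack-pub id my-ack)          ; subscribe first so the ack can't be missed
    (a/>!! requests {:id id :value value})
    (let [ack (a/<!! my-ack)]
      (a/unsub ack-pub id my-ack)      ; close the subscription once answered
      ack)))
```

Retries and timeouts are left out to keep the sketch short; the middleman would add them before publishing.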

Nahuel Greco

Oct 5, 2014, 5:04:58 PM
to clo...@googlegroups.com
You can do that without a pub: the producer can send a new (chan) inside the request to the go-loop, and the go-loop acks on that chan when it gets a good response from the external service.
That scheme solves this scenario (I mentioned it in the previous mail), but I think a peek operation could be better and could simplify the producer code.

Regards,
Nahuel Greco.
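A minimal sketch of the ack-channel scheme just described, assuming `service` is a hypothetical function that returns true once the external service accepts a value; the flaky stand-in below refuses the first two attempts:

```clojure
(ns peek-demo.ack
  (:require [clojure.core.async :as a]))

(defn flaky-service
  "Hypothetical stand-in for the external service: refuses the first
  `failures` calls, then accepts. Counts every call in the `calls` atom."
  [failures calls]
  (fn [_v]
    (> (swap! calls inc) failures)))

(defn start-middleman
  "Takes [v ack] pairs, retries the service until it accepts v, and only
  then puts :done on the producer's ack channel."
  [requests service retry-ms]
  (a/go-loop []
    (when-let [[v ack] (a/<! requests)]
      (loop []
        (when-not (service v)
          (a/<! (a/timeout retry-ms))     ; back off before retrying
          (recur)))
      (a/>! ack :done)
      (recur))))

(defn produce!
  "Sends v with a fresh ack channel and blocks until the middleman acks."
  [requests v]
  (let [ack (a/chan)]
    (a/>!! requests [v ack])
    (a/<!! ack)))
```

Because this middleman finishes one value before taking the next, other producers wait at their put while it retries.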

adrian...@mail.yu.edu

Oct 5, 2014, 5:09:19 PM
to clo...@googlegroups.com
Yes, but the advantage of using a pub is that it's simpler to have one input channel than to keep spawning new ones. But that's just my opinion. Anyway, sorry I couldn't be more help.

Leon Grapenthin

Oct 5, 2014, 7:55:13 PM
to clo...@googlegroups.com


On Sunday, October 5, 2014 5:33:16 PM UTC+2, Nahuel Greco wrote:
> Picture the following:
>
> producer ---> go-loop ---> external service
>
> 1- The producer puts a value onto an unbuffered (chan) with (>! c v)
> 2- The go-loop consumes the value with a take operation, **unblocking** the producer
> 3- The go-loop contacts the external service, but the external service answers that it can't process the value yet
> 4- The go-loop waits for some timeout before retrying the request to the external service
>
> After step 2 the producer goes on to compute a new value (suppose the computation is expensive), but the previous one hasn't actually been consumed by the external service yet.
> I don't want that. I want to enforce end-to-end flow control, where the producer blocks on (>! c v) (step 1) until the value has been consumed by all parties.
 
If producing the next value is expensive, why would you want to delay it from point 2 to after point 4? Once the external service has processed the current value, you'd want the next value available as soon as possible, wouldn't you?

Gary Verhaegen

Oct 5, 2014, 8:03:06 PM
to clo...@googlegroups.com
I think you should go for the ack solution. What is your reservation about it?

Nahuel Greco

Oct 5, 2014, 8:14:19 PM
to clo...@googlegroups.com

Maybe not. Maybe you want to reserve the CPU cycles for other tasks, or the producer needs a confirmation for other purposes before computing or requesting the value from another party. This is a simplified, distilled scenario.

Nahuel Greco

Oct 5, 2014, 8:23:39 PM
to clo...@googlegroups.com

No reservation, the ack solution works, but 1- I think the producer code could be simplified with a peek operation, and 2- I want to know whether there is a fundamental limitation in the chan design/implementation that prevents adding a peek operation.

Leon Grapenthin

Oct 5, 2014, 8:54:00 PM
to clo...@googlegroups.com


On Sunday, October 5, 2014 10:14:19 PM UTC+2, Nahuel Greco wrote:

> Maybe not. Maybe you want to reserve the CPU cycles for other tasks, or the producer needs a confirmation for other purposes before computing or requesting the value from another party. This is a simplified, distilled scenario.

If you only want to begin producing the next value after the current one has been consumed by the external service, I doubt that core.async is of any use here. A perfectly fitting solution can be described as a completely synchronous procedure in which none of the steps are supposed to overlap in time:

1. Request value from other party, wait for it or compute it
2. Wait for external service to be ready
3. Send value to external service, discard value
4. Go to 1
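Those four steps need no channels at all. A minimal sketch, where `produce`, `ready?` and `send!` are hypothetical callbacks and `n` bounds the loop so the example terminates:

```clojure
(ns peek-demo.sync)

(defn run-synchronously!
  "Leon's steps, n times: obtain a value, wait until the service is ready,
  send the value, repeat. produce, ready? and send! are hypothetical
  callbacks; no step ever overlaps another."
  [n produce ready? send! poll-ms]
  (dotimes [_ n]
    (let [v (produce)]                 ; 1. request or compute the value
      (while (not (ready?))            ; 2. wait for the service to be ready
        (Thread/sleep (long poll-ms)))
      (send! v))))                     ; 3. send it; dotimes provides step 4
```

Everything runs on the calling thread, so the steps cannot overlap and there is nothing left to synchronize.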

edbond

Oct 6, 2014, 1:36:59 PM
to clo...@googlegroups.com
Add one more chan, "external ready".
Put :ok there to let the producer generate a new value.

producer:
- read from "external ready"
- generate value
- put into "outgoing" chan

client:
- contact external server, put in "external ready" if ok
- read from "outgoing" chan
- send to external

Handle exceptions and loop where you need.
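A minimal sketch of that two-channel arrangement, assuming `generate`, `service-up?` and `send!` are hypothetical stand-ins supplied by the caller, and that the client stops after `limit` values so the example terminates. In real code expensive generation would belong in `a/thread` rather than in a go block:

```clojure
(ns peek-demo.ready
  (:require [clojure.core.async :as a]))

(defn start-producer
  "Generates value i only after taking a token from `ready`;
  stops when `ready` is closed."
  [ready outgoing generate]
  (a/go-loop [i 0]
    (when (a/<! ready)                  ; latch: wait for the client's :ok
      (a/>! outgoing (generate i))
      (recur (inc i)))))

(defn start-client
  "Checks the hypothetical service; when it is up, releases one
  production and forwards the resulting value. Stops after `limit` values."
  [ready outgoing service-up? send! retry-ms limit]
  (a/go-loop [sent 0]
    (cond
      (= sent limit) (a/close! ready)   ; also stops the producer
      (service-up?)  (do (a/>! ready :ok)
                         (send! (a/<! outgoing))
                         (recur (inc sent)))
      :else          (do (a/<! (a/timeout retry-ms))
                         (recur sent)))))
```

The producer computes a value only after the client has seen the service up, so it never runs ahead of the service.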

Fluid Dynamics

Oct 9, 2014, 3:39:43 AM
to clo...@googlegroups.com
On Monday, October 6, 2014 9:36:59 AM UTC-4, edbond wrote:
> Add one more chan, "external ready".
> Put :ok there to let producer generate new value.
>
> producer:
> - read from "external ready"
> - generate value
> - put into "outgoing" chan
>
> client:
> - contact external server, put in "external ready" if ok
> - read from "outgoing" chan
> - send to external
>
> Handle exceptions and loop where you need.

If you're not reading from "outgoing" until the external server is known to be ready, you don't need the "external ready" channel. Backpressure on the "outgoing" channel will suffice to keep the producer from overrunning the consumer in this case.

Eduard Bondarenko

Oct 9, 2014, 10:17:27 AM
to clojure
I think the point is for the producer not to generate values if the external service is not available:

> The producer only unparks when the value is effectively consumed by the external service. That's my objective.

The "external ready" channel here serves as a latch that stops the producer from generating values.

Fergal Byrne

Oct 9, 2014, 12:07:59 PM
to clo...@googlegroups.com
Hi Nahuel,

I think it's worth stepping back from discussing solutions and having a look at the problem. If I'm reading things right, you need the following:

1. Producer produces values one at a time - should not step until last value has been handled correctly elsewhere.
2. Middleman needs to consume one value and (potentially) retry an external service until it can verify correct handling.
3. Producer should not interpret the consumption of a value as a signal to proceed.

You also seem to want the following:

a. Use core.async
b. Make the solution "elegant"

That's all fine, but it seems like a bit of a golden hammer. Your problem is precisely not the kind of problem core.async channels are designed to solve. Channels are there to decouple producer and consumer, whereas you, in fact, need the kind of coupling provided by function calls.

Channels are for when you don't want to know how or when your product is handled. You're finished, you throw the product on a channel, and you move on to making your next product. All you ever find out is if/that your product has been picked off the channel.

Function calls are for when you do need to know what happened to your product, and when it has all happened, because you can't move on until the last thing has been done.

Just call the middleman in synchronous code in your producer, with one value, and look at its return value to tell you what to do next. You get your blocking semantics (1-3) and you also get information about success or failure. 

Using channels, consumption is the only information the producer hears back. To tell the producer you've completed your mission as middleman, you'll need another channel. If you really want to use core.async:

;; inside a go block
(>! c v)                  ; parks until the middleman takes v
(respond-to (<! ackchan)) ; parks until the value has been handled
;; ...do expensive stuff

but this is just an expensive function call.

Regards,

Fergal Byrne





--

Fergal Byrne, Brenter IT

http://inbits.com - Better Living through Thoughtful Technology

Founder of Clortex: HTM in Clojure - https://github.com/nupic-community/clortex

Author, Real Machine Intelligence with Clortex and NuPIC 
Read for free or buy the book at https://leanpub.com/realsmartmachines

Speaking on Clortex and HTM/CLA at euroClojure Krakow, June 2014: http://euroclojure.com/2014/
and at LambdaJam Chicago, July 2014: http://www.lambdajam.com

Join the quest for Machine Intelligence at http://numenta.org

Nahuel Greco

Oct 9, 2014, 12:22:50 PM
to clo...@googlegroups.com
Fluid: as you said, backpressure on the outgoing channel is sufficient to keep the producer from overrunning the consumer (without any extra ack channel), but the producer will compute a new, not-yet-needed value before blocking on sending it to the consumer. That's what I want to avoid.

I could also use alts! in the producer to check whether it can write to the channel before computing the value, but I wanted to keep the producer code in the simplest possible form (a plain (>! c v) that parks until all parties have consumed the value, avoiding an ack channel or an alts!).

Eduard: Exactly.

Fergal: If the middleman is managing all the communication with the external service for *multiple* producers, then it can't be reduced to a simple synchronous function call from a producer, hence the need for core.async (the alternative is to descend into the world of locks).



Regards,
Nahuel Greco.

Fergal Byrne

Oct 9, 2014, 12:40:04 PM
to clo...@googlegroups.com
Hi Nahuel,

Thanks for the clarification. Multiple producers with a single middleman is a different problem from the one we (or at least I) thought we were dealing with. In that case, the put-your-ack-channel-in-your-product, block-on-the-ack-channel method is the right one.

(go-loop [...]
  ;; ...something expensive that produces v...
  (let [ack (chan)]
    (>! c [v ack])           ; hand over v with a reply channel
    (respond-to (<! ack))    ; park until the middleman acks
    (recur ...)))

Regards,

Fergal

Nahuel Greco

Oct 9, 2014, 1:02:35 PM
to clo...@googlegroups.com
Fergal: I knew about the ack channel solution (note that I mentioned it a couple of mails ago); I was trying to avoid it in order to keep the producer code extra simple.

Also note that in core.async producers aren't totally decoupled from consumers: there is an implicit consumer->producer signal in the form of channel backpressure. The only way to decouple them totally is to use "infinitely" sized or dropping queues (something like what Erlang tries to do given enough memory, where the only form of backpressure is explicit acks). The peek operation I proposed is a way to control that backpressure signal at a finer grain.
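To illustrate the decoupling point: with a dropping (or sliding) buffer the producer never parks, so backpressure, and with it the implicit consumer->producer signal, disappears. A minimal sketch using core.async's `dropping-buffer`:

```clojure
(ns peek-demo.dropping
  (:require [clojure.core.async :as a]))

;; A dropping buffer of size 2: puts always complete immediately, and
;; values arriving while the buffer is full are silently discarded.
(def c (a/chan (a/dropping-buffer 2)))

(dotimes [i 5]
  (a/>!! c i))                          ; never blocks the producer

(def kept [(a/<!! c) (a/<!! c)])
(def leftover (a/poll! c))
```

With `sliding-buffer` instead, the oldest values are dropped, and the two takes would return 3 and 4.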

In POSIX, the select() operation doesn't read the data, but it can be used to find out whether data is present without consuming it. In contrast, I think alts! complects three operations: 1- blocking until a new value is available / a value can be written, 2- reading/putting it, and 3- when reading, consuming it.

Besides the alternative solutions for the producer code (waiting on an ack channel, alts! before computing the value), my original question remains unanswered: is there something in the core.async implementation/design/roadmap preventing the addition of a peek operation? I was hoping to avoid a deep dive into the core.async code :)



Regards,
Nahuel Greco.

Fergal Byrne

Oct 9, 2014, 2:04:47 PM
to clo...@googlegroups.com
Hi Nahuel,

If you look at the definition of "simple", it means "not compound". In your case, you are trying to have a single thing (the channel) do two jobs: convey the value and control the producer. This is not simple, and it makes the producer trust that the middleman will always respect this hidden dependency. It also hides your intent: anyone reading your code would assume that the producer does not have to wait.

In this case (the ack channel), you are adding a couple of lines of code (to make the ack channel and to block until it comes back), but you are simplifying the producer when you do this, not complicating it. The (>! c [v ack]) does one thing: I've finished my expensive op, please deal with it. The (respond-to (<! ack)) does one thing: wait until the value has been processed, then I can move on.

The code is self-documenting. The producer finishes its work, puts the stuff on a channel with a self-addressed envelope, and then immediately blocks for the ack. It couldn't possibly be simpler than this.

The decoupling channels provide is exactly that: everything except backpressure is decoupled. You're trying to recouple the producer and middleman, by trying to make a non-atomic thing atomic. 

You're also designing the middleman based on the needs of the producer. With your design, the middleman can only handle one product at a time. With the put-ack design, the middleman could have a longer queue, construct batches of similar values, and more efficiently use the external service. You're also creating a bottleneck, because other producers are all blocked until your middleman has got the external service to work with your value. 

Either the external service's ability to complete is based on something in your value or not. If not, then have the middleman wait until the service is ready (as someone suggested) before taking from the channel - this achieves your goals with backpressure. If you do need to know something in the value, then take the value and spin up a go block to retry the service (passing it the ack channel, so it can unblock the producer), and move on.
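A minimal sketch of that last suggestion, assuming the same `[v ack]` request shape as the ack-channel approach and a hypothetical `service` predicate. The middleman takes each request, spawns a go block that owns the retries, and goes straight back to taking:

```clojure
(ns peek-demo.nonblocking-middleman
  (:require [clojure.core.async :as a]))

(defn start-middleman
  "Takes [v ack] pairs and hands each to its own go block, which retries
  the hypothetical service and acks when it accepts. A slow value no
  longer holds up the values queued behind it."
  [requests service retry-ms]
  (a/go-loop []
    (when-let [[v ack] (a/<! requests)]
      (a/go-loop []                    ; one retry loop per value
        (if (service v)
          (a/>! ack :done)             ; unblocks that value's producer
          (do (a/<! (a/timeout retry-ms))
              (recur))))
      (recur))))                       ; immediately take the next request
```

Each producer still blocks on its own ack, but producers no longer wait on each other's retries.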

Adding peek to channels shouldn't be hard (it's open source after all!), so you could do it yourself if you think it would give you any value. But I can give you a fairly precise estimate of how long a PR for this would sit in Rich's in-tray...

Cheers,

Fergal

Nahuel Greco

Oct 9, 2014, 3:44:11 PM
to clo...@googlegroups.com
Fergal: readability is improved when using an ack channel. You are totally right on this point, and it is probably the most important argument for using acks, besides the fact that there is currently no peek in core.async (note that if you have a long producer-consumer1-consumerN chain, every stage must implement the ack procedure). But I think you are wrong when you say a middleman using peek can only handle a single product at a time and can't batch across multiple producers. Setting aside the form of the producer code, peeking + consuming (an implicit "ack") is the same thing as consuming + an explicit ack. Sure, you'd need an alts!-like peek operation over multiple channels. What could change the equation is using a single ack for multiple values; I don't think peek can support something like that, but that also changes the design of the producer code.

Note that "waiting until the service is ready before taking from the channel", without acks or peek+consume, can introduce race conditions where the service goes down just after being sensed as ready and after the producer's value has been consumed.

Just to be clear, I know acks solve this (by changing the producer code), and I currently use an ack scheme. I'm only curious about why a peek operation wasn't included in core.async and about the feasibility of adding it.

Thanks for your replies.

Regards,
Nahuel Greco.

Leon Grapenthin

Oct 9, 2014, 7:16:39 PM
to clo...@googlegroups.com
On Thursday, October 9, 2014 2:22:50 PM UTC+2, Nahuel Greco wrote:
> Fluid: as you said, backpressure on the outgoing channel is sufficient to keep the producer from overrunning the consumer (without any extra ack channel), but the producer will compute a new, not-yet-needed value before blocking on sending it to the consumer. That's what I want to avoid.
 
You can avoid this very easily. Every time, before the producer expensively calculates its next value to put on the channel, make it first put a cheaply calculated value, e.g. the keyword :useless, on the channel.

Now on the consumer side you can do this:

0. Blocking (immediate) take: consume the keyword :useless = the producer will begin production
1. This is what you want from peek! Blocking take = wait for the producer to produce the expensive value and consume it, but the producer won't calculate the next value (because it first puts the :useless keyword, which won't be consumed until later)
2. Wait for the external service to be ready, put the value there
3. Go to step 0 to trigger production of the next value

Assuming the producer and the consumer run on different threads, what have you gained by introducing step 0?

1. The producer thread can't begin to work until the consumer thread did his work
2. The consumer thread can't begin to work until the producer thread did his work

= At any point in time only one thread is working. Entirely :useless multithreading.
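A minimal sketch of the :useless-token protocol, assuming an unbuffered channel, a hypothetical `expensive` function, a hypothetical `send!` standing in for the external service, and a producer that stops after `n` values:

```clojure
(ns peek-demo.useless
  (:require [clojure.core.async :as a]))

(defn start-producer
  "Before computing each value, puts the cheap token :useless; on an
  unbuffered channel that put completes only when the consumer asks."
  [c expensive n]
  (a/go-loop [i 0]
    (if (< i n)
      (when (a/>! c :useless)          ; parks until the consumer is ready
        (a/>! c (expensive i))
        (recur (inc i)))
      (a/close! c))))

(defn consume!
  "Step 0: take the token (production starts). Step 1: take the value.
  Step 2: hand it to send!. Step 3: loop back to step 0."
  [c send!]
  (loop []
    (when (a/<!! c)                    ; step 0 (nil once c is closed)
      (when-some [v (a/<!! c)]         ; step 1
        (send! v)                      ; step 2
        (recur)))))                    ; step 3
```

If `send!` records how many values have been generated so far, it should see exactly one generated value per value sent, i.e. the producer never runs ahead, which is also why only one side is ever busy.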

> I can also use alts! on the producer to check if he can write to the channel before computing the value, [...]

You can't.

> [...]
>
> Fergal: If middleman is managing all the communication with the external service for *multiple* producers, then it can't be reduced to a simple synchronous function call from a producer, hence the necessity of core.async (the alternative is to descend to locks world).
 
If I understand you correctly, you would want to start producing multiple values in parallel, wait for the first one to be ready, then put it to the external service and loop. Here core.async can help, e.g.:
(defn exp-calcs
  []
  ;; calc-exp-val-N are placeholders for the expensive computations;
  ;; each a/thread runs one and returns a channel that yields its result
  #{(a/thread (calc-exp-val-1))
    (a/thread (calc-exp-val-2))
    (a/thread (calc-exp-val-3))
    (a/thread (calc-exp-val-4))
    (a/thread (calc-exp-val-5))
    ; ...
    })

(loop [chans (exp-calcs)]
  (let [[val ch] (a/alts!! (vec chans))]
    ;; wait for external service, put val there

    ;; also have the cancellation ch in chans and use it to guard the
    ;; following form
    (let [chans (disj chans ch)]
      (recur (if (seq chans)
               chans
               (exp-calcs))))))



Again, no peek required.

Nahuel Greco

Oct 9, 2014, 8:25:38 PM
to clo...@googlegroups.com
Leon: your :useless example is equivalent to one using the hypothetical peek operation and to the ack-channel solution, but what you call ":useless multithreading", because only one thread is working at any time, is a valid synchronization scenario, and the one I was looking for. The objective here was not to max out CPU usage with these two threads / go blocks. As I said in a previous mail, "maybe you want to reserve the cpu cycles for other tasks, or the producer needs a confirmation for other purposes before computing or requesting the value from another party, this is a distilled example". Also consider that the producer, after sending the computed value and before sending the :useless signal, can do other things, maybe contacting other services to pre-fetch data used in the next value.

Also, the :useless example needs more code on the producer side than a simple (>! c v). Given that there is no peek operation and the producer code must participate explicitly in the synchronization, I prefer the ack solution over the :useless one.

You are right that it's impossible to use alts! in the producer code to check whether the chan can be written to before computing the value. I forgot that alts! is a function, not a macro, so you can't pass it expressions to be evaluated when the chan becomes available for writing.

Your last sample seems a useful way to manage simple, stateless, single-value producers from the consumer, but producers can be long-running go blocks with their own state and with channels to other services, so I think it's a little off base with respect to the original question.


Regards,
Nahuel Greco.

Leon Grapenthin

Oct 9, 2014, 11:05:38 PM
to clo...@googlegroups.com


On Thursday, October 9, 2014 10:25:38 PM UTC+2, Nahuel Greco wrote:
> Leon: your :useless example is equivalent to one using the hypothetical peek operation and to the ack-channel solution, but what you define as ":useless multithreading" because at one time only one thread is working, is a valid synchronization scenario and the one I was searching for. The objective here was not to max CPU usage with these two threads / go-blocks.

What I wanted to illustrate with useless multithreading was my earlier point that for your requirements I can't find a reason for multithreading and hence no reason for synchronization.
 
> As I said in a previous mail "maybe you want to reserve the cpu cycles for other tasks, or the producer needs a confirmation for other purposes before computing or requesting the value from another party, this is a distilled example".

What difference should this make? I already integrated these requirements in my mail from Oct. 5. You can put the confirmation part between steps 3 and 4.
 
> Also think the producer after sending the computed value and before sending the :useless signal can do other things, maybe contacting other services to pre-fetch data used in the next value.

It could, because it is unblocked.

But if you had used peek!, it couldn't. It would just have to pre-fetch its data, and peek! would be blocked until it's done and has produced a value. (I hope the discussion is still about finding a valid reason why peek! would be a useful operation in core.async.)
 

> Also the :useless example needs more code on the producer side than a simple (>! c v). Given there is no peek operation and the producer code must participate explicitly in the synchronization, I prefer the ack solution over the :useless one.

The producer must participate in the synchronization in the ack solution as well. There is no peek operation in the ack solution either.

You could just as well let the producer put the value twice, and every first take would be your peek.
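A minimal sketch of that double-put idea, assuming an unbuffered channel, a single consumer, and a hypothetical `handle!` callback standing in for contacting the external service. The first take plays the role of the proposed peek and the second is the real consume:

```clojure
(ns peek-demo.twice
  (:require [clojure.core.async :as a]))

(defn start-producer
  "Puts every value twice on an unbuffered channel. The second put
  completes only when the consumer takes the second copy, so the
  producer stays parked until the consumer has finished with the value."
  [c vs released]
  (a/thread
    (doseq [v vs]
      (a/>!! c v)                       ; first copy: the consumer's "peek"
      (a/>!! c v)                       ; second copy: the real consume
      (swap! released conj v))
    (a/close! c)))

(defn consume!
  "Peeks with the first take, handles the value, then consumes it with
  the second take, which is what releases the producer."
  [c handle!]
  (loop []
    (when-some [v (a/<!! c)]            ; "peek"
      (handle! v)                       ; e.g. retry the external service
      (a/<!! c)                         ; consume: unparks the producer
      (recur))))
```

This reproduces the peek-like blocking without any change to core.async, at the cost of an extra put per value and of relying on there being exactly one consumer.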

I don't recommend :useless or an ack channel as a "solution" over the other. My point is that if this problem was real, using multiple threads as a solution and the resulting effort to coordinate them so that only one of them would run at a time doesn't seem to provide benefits in terms of performance, abstraction or readability.


Kind regards,
 Leon.