core.async go-loop questions

310 views
Skip to first unread message

Jonathon McKitrick

unread,
Dec 20, 2014, 10:17:01 PM12/20/14
to clo...@googlegroups.com
I'd like to implement a thread that will send an email, then send a response via websocket to the client when the send completes.

(defn my-wait-loop []
  (async/go-loop [status (async/<! @mailer-status)]
    (if status
      (do
        (println "Ready to send " status)
        (doseq [chan @connections]
          (println "Channel" chan)
          (send! chan (pr-str "Done" status)))
        (recur (async/<! @mailer-status)))
      (println "Go away"))))

(defn test-mailer []
  ;;(my-wait-loop)
  (reset! mailer-status
          (async/thread
            (try
              (mailer/send-speaker-confirmation-notification 1 "http://localhost")
              (catch Exception e
                (println (.getMessage e))))
            "Success")))

I would like to have the go-loop inside my-wait-loop run at all times, waiting for mailer-status to have a value.
But I believe that can never happen, since the go-loop is waiting on an empty channel, and the reset! with the mailer will replace the channel with a new one after the emails are sent.

Is there a batter way to do this, without needing to call my-wait-loop before the email thread is dispatched?

Erik Price

unread,
Dec 20, 2014, 10:52:00 PM12/20/14
to clo...@googlegroups.com
Part of what makes core.async great is that you don't have to use atoms to communicate between threads in every case. For your case, I don't see what using an atom gets you. It seems like you could replace the atom with a channel, and put values on the channel whenever mail is sent. Your code will be almost unchanged, except you'd replace your atom reset with a channel put.

e

--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+u...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Chris Freeman

unread,
Dec 20, 2014, 10:56:26 PM12/20/14
to clo...@googlegroups.com
I'm a little uncertain exactly what your code is trying to do, but I believe you're trying to notify a bunch of connections after your speaker notification emails are sent.

In which case, I'd do something like this:

(defn send-notifications []
  (try
    (mailer/send-speaker-confirmation-notification 1 "http://localhost")
    true
    (catch Exception e
      (println (.getMessage e))
      false)))

(defn test-mailer []
  (let [done (async/thread-call send-notifications)]
    (when-let [status (async/<! done)]
      (doseq [chan @connections]
        (async/>! chan (pr-str "Done" status))))))

I've replaced the string "Success" with an explicit true and added an explicit false. I'd prefer if send-speaker-confirmation-notification returned a truthy value, but I don't know that it does.

In your original, the doseq call was done in a separate thread. If you'd still like that, wrap the when-let in a async/go call, as followings:

(defn test-mailer []
  (let [done (async/thread-call send-notifications)]
    (async/go 
      (when-let [status (async/<! done)]
        (doseq [chan @connections]
          (async/>! chan (pr-str "Done" status)))))))

But if you do, keep in mind that the main thread will end before the other two threads do, and that's probably unhealthy.

Chris

--

Udayakumar Rayala

unread,
Dec 20, 2014, 11:12:49 PM12/20/14
to clo...@googlegroups.com
You could probably move setting the atom and calling my-wait-loop inside the try.

(defn test-mailer []
  (async/thread
    (try
      (do
        (reset! mailer-status
                (mailer/send-speaker-confirmation-notification 1 "http://localhost"))
        (my-wait-loop))

      (catch Exception e
        (println (.getMessage e))))
    "Success"))

I also dont see the need for atoms. You could probably pass the channel as parameter to my-wait-loop. And also you dont need to put async/thread if the send-speaker-confirmation-notification doesnt wait until the message is put (which probably it shouldnt).

(defn my-wait-loop [mailer connections]
  (async/go-loop [status (async/<! mailer)]

    (if status
      (do
        (println "Ready to send " status)
        (doseq [chan connections]

          (println "Channel" chan)
          (send! chan (pr-str "Done" status)))
        (recur (async/<! mailer)))
      (println "Go away"))))

(defn test-mailer []
  (try
    (do
      (my-wait-loop (mailer/send-speaker-confirmation-notification 1 "http://localhost") @connections))

    (catch Exception e
      (println (.getMessage e))))
  "Success")

Thanks,
Uday.

Jonathon McKitrick

unread,
Dec 21, 2014, 8:17:21 AM12/21/14
to clo...@googlegroups.com
Well, my goal is to start a go-loop (if possible) at the root level of the code that simply parks and waits for a group of emails to be sent.  When that happens, it would wake up and broadcast the result of the send operation via web socket back to the browser.  I'd like to avoid starting that loop every time I send the emails, and I was under the impression that could be done with a go-loop, which would park until the channel had values to consume.  But you are saying that might be 'unhealthy'?


--
Jonathon McKitrick

You received this message because you are subscribed to a topic in the Google Groups "Clojure" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/clojure/f9J_CBPoc5U/unsubscribe.
To unsubscribe from this group and all its topics, send an email to clojure+u...@googlegroups.com.

Chris Freeman

unread,
Dec 21, 2014, 9:42:39 AM12/21/14
to clo...@googlegroups.com

On Dec 21, 2014 7:17 AM, "Jonathon McKitrick" <jmcki...@gmail.com> wrote:

Well, my goal is to start a go-loop (if possible) at the root level of the code that simply parks and waits for a group of emails to be sent.  When that happens, it would wake up and broadcast the result of the send operation via web socket back to the browser.  I'd like to avoid starting that loop every time I send the emails, and I was under the impression that could be done with a go-loop, which would park until the channel had values to consume.  But you are saying that might be 'unhealthy'?


You want to avoid a situation where your main thread tries to exit before your worker threads are done. If that's not a possibility, then don't worry about it. (I hope I'm not confusing JVM behaviors with behaviors in other languages and runtimes.)

A go-loop is entirely appropriate for what you're trying to do. I think the issue you're running into is that thread creates a new channel for each individual call. Instead, consider creating a channel, and let your email sender loop, writing success to that channel after each . Then your go-loop just reads from the thread like you already have it doing.

A basic example might be:
 
(defn sender-receiver-example []
  (let [successes (async/chan)]

    ;; Thread to send emails
    (async/go
      (doseq [email-group source-of-emails]
(send-emails email-group)
(async/>! successes "Success")) ;; You could also send the email addresses instead of "Success"
      (async/close! xs-done)) ;; Without this, the consumer go-loop will never terminate.

    ;; Thread to notify browsers
    (loop [success (async/<! successes)]
      (when-not (nil? success)
(tell-your-browsers)
(recur (async/<! successes))))))


Chris
Reply all
Reply to author
Forward
0 new messages