(defn -main
  "Connects to the message queue, opens the streaming HTTP response at
  `endpoint`, and writes each line of the response body to the queue."
  []
  (let [queue    (connect-to-queue mq-config)
        response (http/get endpoint {:as :stream})]
    ;; with-open closes the reader once the line sequence is exhausted
    ;; or an exception is thrown.
    (with-open [lines-in (java.io/reader (:body response))]
      (doseq [line (line-seq lines-in)]
        (write queue line)))))
;; Component wrapping a streaming HTTP response.
;;   endpoint    - passed to http/get as the request target
;;   http-config - request options merged on top of {:as :stream}
;;                 (keys in http-config win over the default)
;;   stream      - the response :body while started, nil otherwise
(defrecord EventStream [endpoint
                        http-config
                        stream]
  component/Lifecycle
  (start [this]
    ;; Idempotent: starting an already-started component must not issue a
    ;; second request and leak the first response.
    (if stream
      this
      (let [response (http/get endpoint (merge {:as :stream} http-config))]
        (assoc this :http-response response, :stream (:body response)))))
  (stop [this]
    ;; Guard against stop-before-start, where stream is still nil.
    (when stream
      (.close stream))
    (-> this (dissoc :http-response) (assoc :stream nil))))
;; Component owning the message-queue connection.
;;   config     - passed to connect-to-queue on start
;;   connection - the value returned by connect-to-queue while started,
;;                nil otherwise
(defrecord MessageQueue [config
                         connection]
  component/Lifecycle
  (start [this]
    ;; Idempotent: do not open (and leak) a second connection if one exists.
    (if connection
      this
      (assoc this :connection (connect-to-queue config))))
  (stop [this]
    ;; Guard against stop-before-start, where connection is still nil.
    (when connection
      (.close connection))
    (assoc this :connection nil)))
;; Component that reads lines from the :stream of `source` and passes each
;; one to `write` together with `sink`.
;;   source - component exposing a :stream key
;;   sink   - passed as the first argument to write
;;   worker - the future running the copy loop while started, nil otherwise
(defrecord Shoveler [source sink
                     worker]
  component/Lifecycle
  (start [this]
    (if worker
      ;; Idempotent: never spawn a second copy loop over the same stream.
      this
      ;; To avoid blocking indefinitely, we put the processing in a future.
      ;; An exception thrown inside the future is only surfaced when the
      ;; future is dereferenced, which nothing here does.
      ;; NOTE(review): the other call sites pass the value returned by
      ;; connect-to-queue to `write`; here the whole sink component is
      ;; passed. Confirm whether this should be (:connection sink).
      (assoc this :worker (future
                            (with-open [rdr (java.io/reader (:stream source))]
                              (doseq [entity (line-seq rdr)]
                                (write sink entity)))))))
  (stop [this]
    ;; Guard against stop-before-start, where worker is still nil.
    (when worker
      (future-cancel worker))
    (assoc this :worker nil)))
(defn -main
  "Assembles the system (config, event stream, message-queue client and
  shoveler), wires the dependencies between them, and starts it.
  Returns the started system."
  []
  (-> (component/system-map :config (read-config)
                            :events (map->EventStream {:endpoint endpoint})
                            :mq-client (map->MessageQueue {})
                            :shoveler (map->Shoveler {}))
      ;; system-using takes a map of system key -> that component's
      ;; dependencies. Plain `using` would instead attach this map as the
      ;; dependencies of the system map itself.
      (component/system-using {:events {:http-config :config}
                               :mq-client {:config :config}
                               :shoveler {:source :events
                                          :sink :mq-client}})
      component/start))
(defn -main
  "Connects to the message queue, then loops forever: opens the streaming
  HTTP response at `endpoint` (doubling the connection timeout after each
  SocketTimeoutException, up to a limit) and writes every line of the body
  to the queue. A socket timeout or socket error while reading is logged
  and triggers a reconnect; any other exception escapes the loop."
  []
  (let [mq-conn (connect-to-queue mq-config)]
    (while true ; ideally, the app is *always* ready to receive incoming events & put them into the queue
      (try
        (let [{event-stream :body}
              (loop [conn-timeout 1000]
                ;; `recur` is not allowed inside `try`/`catch`, so the catch
                ;; clause returns a sentinel and the retry decision is made
                ;; outside the try form, where recur is in tail position.
                (let [result (try
                               (http/get endpoint
                                         {:as :stream
                                          :conn-timeout conn-timeout
                                          :socket-timeout 30000})
                               (catch java.net.SocketTimeoutException e
                                 (if (> conn-timeout 32000) ; an upper limit. 32 seconds was arbitrarily chosen
                                   (throw (SomeAppropriateException. "Service unavailable. Human attention needed." e))
                                   ::retry)))]
                  (if (= ::retry result)
                    (recur (* 2 conn-timeout))
                    result)))]
          (with-open [rdr (java.io/reader event-stream)]
            (doseq [entity (line-seq rdr)]
              (write mq-conn entity))))
        (catch java.net.SocketTimeoutException e
          (log/warn e "Didn't receive any data for thirty seconds. Reconnecting."))
        (catch java.net.SocketException e
          (log/warn e "Server closed the connection. Reconnecting."))
        ;; Any other exceptions *will* escape the retry loop
        ))))
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+u...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
...
--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to a topic in the Google Groups "Clojure" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/clojure/JCvKLIsfSgA/unsubscribe.
To unsubscribe from this group and all its topics, send an email to clojure+u...@googlegroups.com.
...
The HTTP connection may be closed at any time by the server; if that happens, the app should persistently attempt to reconnect using an exponential back-off pattern. In addition, if the app goes thirty seconds without receiving any data, it should close the connection and try to reconnect. It's not clear to me how to best express these "retry" requirements in the component lifecycle.
I'm writing an app that consumes events from a streaming HTTP connection and writes those events to a message queue.
Only to answer the "retry on error" part of your question: You might like hara/event: