Stuart Sierra's Component: retries & restarts in production


jo...@signafire.com

Sep 2, 2015, 8:44:07 PM
to Clojure
TLDR: how do you use Component when the application logic involves retrying failed components?

Background:
I'm writing an app that consumes events from a streaming HTTP connection and writes those events to a message queue (see Code Illustration #1). It seems like that could be captured easily with three components (an HTTP stream, a message queue connection, and a "shoveler" that depends on the other two; see Code Illustration #2), but the reconnection requirements complicate things.
The HTTP connection may be closed at any time by the server; if that happens, the app should persistently attempt to reconnect using an exponential back-off pattern. In addition, if the app goes thirty seconds without receiving any data, it should close the connection and try to reconnect (see Code Illustration #3). It's not clear to me how best to express these "retry" requirements in the component lifecycle. Like, is it blasphemous for a component to be calling stop and start on its injected dependencies?
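A minimal sketch of the back-off requirement on its own, separate from any component lifecycle. The names `backoff-delays` and `with-backoff` are hypothetical, and the 1-second start and 32-second cap are only the values used in Code Illustration #3, not a recommendation:

```clojure
(defn backoff-delays
  "Lazy seq of delays in ms: initial-ms, doubling each step,
  up to and including cap-ms."
  [initial-ms cap-ms]
  (take-while #(<= % cap-ms) (iterate #(* 2 %) initial-ms)))

(defn with-backoff
  "Calls (attempt!) until it returns without throwing. After each failure,
  sleeps for the next delay. Once the delays are used up, throws an ex-info
  whose cause is the last failure."
  [delays attempt!]
  (loop [[d & more] delays]
    ;; `recur` cannot appear inside `try`, so capture the outcome as data first.
    (let [result (try
                   {:ok (attempt!)}
                   (catch Exception e {:error e}))]
      (cond
        (contains? result :ok) (:ok result)
        (nil? d) (throw (ex-info "Retries exhausted" {} (:error result)))
        :else (do (Thread/sleep (long d))
                  (recur more))))))
```

Something like `(with-backoff (backoff-delays 1000 32000) #(http/get endpoint {:as :stream}))` would then be one way to open the connection, whichever layer ends up owning the retry.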

Some possible approaches:
  • Throw different kinds of exceptions to indicate what should happen (using namespaced keywords, perhaps?), handled by whoever calls component/start on the system-map.
    • The exception provides the component and system at the time of the exception, enabling a sort of "resume" capability.
    • I'm under the impression that relying on exceptions for control flow is an anti-pattern.
  • Create a sort of custom system implementation, one that goes beyond calling start on its components in dependency order to monitor failures and direct retries "appropriately".
    • "A system is a component which knows how to start and stop other components." (from the README)
      So the fact that we want the shoveler component to be capable of restarting the HTTP component indicates that the shoveler should actually be considered a system. (right?)
      • If the HTTP stream is injected into the shoveler as a dependency, how is it possible for the shoveler to stop the HTTP stream and then start it again with any dependencies the stream may have?
  • Ensure that every component/Lifecycle method implementation is idempotent, so that I can get good-enough "restart" semantics by just calling start-system again.
    • I know that idempotence is generally a Good Thing anyway, but using start-system as a panacea strikes me as crude.
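
The first approach in the list (exceptions tagged with namespaced keywords, handled by whoever starts the system) could be sketched roughly as follows. This assumes the failure is signalled with `ex-info`; the `:action` key, the `:app.failure/...` keywords, and `run-supervised` are hypothetical names, and `f` stands in for "start the system and run until something fails":

```clojure
(defn run-supervised
  "Calls (f). If f throws an ex-info whose data has
  :action :app.failure/reconnect, f is called again, up to max-restarts
  times. Any other exception propagates unchanged."
  [f max-restarts]
  (loop [restarts 0]
    (let [outcome (try
                    {:value (f)}
                    (catch clojure.lang.ExceptionInfo e
                      (if (= :app.failure/reconnect (:action (ex-data e)))
                        {:retry e}
                        (throw e))))]
      (cond
        (contains? outcome :value) (:value outcome)
        (< restarts max-restarts) (recur (inc restarts))
        :else (throw (ex-info "Too many restarts"
                              {:action :app.failure/give-up}
                              (:retry outcome)))))))
```

The keyword in `ex-data` is what carries the "what should happen next" decision, so the supervisor does not need one exception class per outcome.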

Code Illustrations:

1. Rough sketch of app without timeout/retry logic or component:
(defn -main []
  (let [mq-conn (connect-to-queue mq-config)
        {event-stream :body} (http/get endpoint {:as :stream})]
    (with-open [rdr (clojure.java.io/reader event-stream)]
      (doseq [entity (line-seq rdr)]
        (write mq-conn entity)))))

2. Rough sketch of app with component but still without timeout/retry logic:
(defrecord EventStream [endpoint
                        http-config
                        stream]
  component/Lifecycle
  (start [this]
    (let [response (http/get endpoint (merge {:as :stream} http-config))]
      (assoc this :http-response response, :stream (:body response))))
  (stop [this]
    (.close stream)
    (-> this (dissoc :http-response) (assoc :stream nil))))

(defrecord MessageQueue [config
                         connection]
  component/Lifecycle
  (start [this]
    (assoc this :connection (connect-to-queue config)))
  (stop [this]
    (.close connection)
    (assoc this :connection nil)))

(defrecord Shoveler [source sink
                     worker]
  component/Lifecycle
  (start [this]
    ;; To avoid blocking indefinitely, we put the processing in a future.
    (assoc this :worker (future
                          (with-open [rdr (clojure.java.io/reader (:stream source))]
                            (doseq [entity (line-seq rdr)]
                              (write sink entity))))))
  (stop [this]
    (future-cancel worker)
    (assoc this :worker nil)))

(defn -main []
  (-> (component/system-map :config (read-config)
                            :events (map->EventStream {:endpoint endpoint})
                            :mq-client (map->MessageQueue {})
                            :shoveler (map->Shoveler {}))
      ;; system-using attaches the dependency maps to the components named by key
      (component/system-using {:events {:http-config :config}
                               :mq-client {:config :config}
                               :shoveler {:source :events
                                          :sink :mq-client}})
      component/start))

3. Rough sketch of desired production behavior, not using Component:
(defn -main []
  (let [mq-conn (connect-to-queue mq-config)]
    (while true  ; ideally, the app is *always* ready to receive incoming events & put them into the queue
      (try
        (let [{event-stream :body}
              (loop [conn-timeout 1000]
                ;; `recur` is not allowed inside `catch`, so the handler returns
                ;; ::retry and the `recur` happens outside the `try`.
                (let [response (try
                                 (http/get endpoint
                                           {:as :stream
                                            :conn-timeout conn-timeout
                                            :socket-timeout 30000})
                                 (catch java.net.SocketTimeoutException e
                                   (if (> conn-timeout 32000)  ; an upper limit. 32 seconds was arbitrarily chosen
                                     (throw (ex-info "Service unavailable. Human attention needed." {} e))
                                     ::retry)))]
                  (if (= ::retry response)
                    (recur (* 2 conn-timeout))
                    response)))]
          (with-open [rdr (clojure.java.io/reader event-stream)]
            (doseq [entity (line-seq rdr)]
              (write mq-conn entity))))
        (catch java.net.SocketTimeoutException e
          (log/warn e "Didn't receive any data for thirty seconds. Reconnecting."))
        (catch java.net.SocketException e
          (log/warn e "Server closed the connection. Reconnecting."))
        ;; Any other exceptions *will* escape the retry loop
        ))))

Raymond Huang

Sep 3, 2015, 4:03:53 AM
to clo...@googlegroups.com
Another way I can think of decomposing this is to buffer/queue communication between your two components, e.g. with a core.async channel. This will decouple the two components, allowing your MessageQueue to manage its own reconnection.

Interesting question about whether `start` calling `stop` is blasphemy or not. I hope someone else can provide some insight on that.
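
A rough sketch of the buffering idea. To stay free of extra dependencies it uses a `java.util.concurrent.LinkedBlockingQueue` as a stand-in for the core.async channel suggested above; `drain!`, the `:shovel/done` sentinel, and the fixed retry delay are all assumptions made for illustration:

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn drain!
  "Takes items from buffer and hands each one to write!, stopping at the
  :shovel/done sentinel. A failed write is retried after retry-ms, so the
  sink can recover (e.g. reconnect inside write!) without the producer
  ever noticing."
  [^LinkedBlockingQueue buffer write! retry-ms]
  (loop [item (.take buffer)]
    (when-not (= :shovel/done item)
      (let [ok? (try
                  (write! item)
                  true
                  (catch Exception _ false))]
        (if ok?
          (recur (.take buffer))
          (do (Thread/sleep (long retry-ms))
              (recur item)))))))
```

The producer side only ever calls `(.put buffer event)`. Because the queue is bounded, a slow or disconnected sink eventually blocks the producer rather than growing memory without limit.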


--
You received this message because you are subscribed to the Google
Groups "Clojure" group.
To post to this group, send email to clo...@googlegroups.com
Note that posts from new members are moderated - please be patient with your first post.
To unsubscribe from this group, send email to
clojure+u...@googlegroups.com
For more options, visit this group at
http://groups.google.com/group/clojure?hl=en
---
You received this message because you are subscribed to the Google Groups "Clojure" group.
To unsubscribe from this group and stop receiving emails from it, send an email to clojure+u...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Andy-

Sep 3, 2015, 9:15:54 AM
to Clojure
Only to answer the "retry on error" part of your question: You might like hara/event:

HTH
...

Dave Tenny

Sep 4, 2015, 7:57:21 AM
to Clojure
I'm using components to encapsulate Langohr/RabbitMQ interactions in cases where Langohr auto-recovery wasn't happy with what was going on.

I make sure the Lifecycle methods are idempotent, and build the component stack in the correct order.

To make sure that I can deal with the exceptions that require restarting the component stack, I have some macros that wrap up exception handling, retries, and component stack restarts.

So

(with-rmq-publisher! [channel component-system]
  ... do my stuff ...)

The body of the macro ("... do my stuff ...") is turned into a function and will be re-executed if the system is restarted, so you have to beware
of side effects or other things you may not want executed multiple times.  Not a problem for me: all I'm doing is entering the macro long enough
to get a channel and publish to it, or similar types of things.

The component-system is a wrapper around the system-map call with some other data, in an atom, that will be mutated
if we do things like dynamically add subscribers to the system or call (restart! component-system).
There are some thread safety/locking concerns, since a connection failure is likely to be seen by callers
on multiple threads using the components, and I try to avoid race conditions on restarts (only one thread will do the restart until it succeeds,
then unblock the others).

Hope this helps with strategy, even if the code is omitted.

The trick to me was not in restarting the component stack, but in managing the shared state across threads safely and making sure (with-stack-activity [system] <code>)
code was prepared to re-execute on failures with new connections or other stack components.

In my case I also needed to add components to the stack dynamically.  Not often, but dynamically, not at (system-map) call time.  That required some lock consideration,
and I'd have to call Lifecycle/start on the stack every time I added a component.  The methods have to be idempotent.
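
Since the code was omitted, here is one way the strategy described above might look. It is a sketch under assumptions, not the macros themselves: `make-managed`, `restart!`, and `call-with-system` are hypothetical names, and `start-fn`/`stop-fn` stand in for whatever builds and tears down the real component stack (for example `component/start` and `component/stop` on a system map):

```clojure
(defn make-managed
  "Starts a system with start-fn and keeps it in an atom next to the
  functions needed to rebuild it."
  [start-fn stop-fn]
  {:state (atom (start-fn)) :start-fn start-fn :stop-fn stop-fn :lock (Object.)})

(defn restart!
  "Stops and restarts the system, unless another thread has already
  replaced the instance `seen` that the caller saw fail. Threads that lose
  the race wait on the lock and then just pick up the new instance."
  [{:keys [state start-fn stop-fn lock]} seen]
  (locking lock
    (when (identical? seen @state)
      (stop-fn seen)
      (reset! state (start-fn)))
    @state))

(defn call-with-system
  "Calls (f system). If f throws, restarts the system and calls f again,
  up to max-retries restarts. f may run more than once, so it should be
  safe to repeat."
  [managed max-retries f]
  (loop [attempt 0]
    (let [system @(:state managed)
          result (try
                   {:value (f system)}
                   (catch Exception e {:error e}))]
      (cond
        (contains? result :value) (:value result)
        (< attempt max-retries) (do (restart! managed system)
                                    (recur (inc attempt)))
        :else (throw (:error result))))))
```

A `with-...` macro in the style shown earlier would only need to wrap its body in `(fn [system] ...)` and hand it to `call-with-system`.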

  

Dave Tenny

Sep 4, 2015, 9:00:39 AM
to clo...@googlegroups.com
Caveat: my approach may have been all wrong.  It's the first time I tried stuartsierra components.  Frankly I'd have been happier with some CLOS objects and before and after methods, I'm still getting the hang of this clojure stuff.


zcaudate

Sep 4, 2015, 1:17:50 PM
to Clojure
We make sure the components we build know how to restart themselves. This avoids the use of start/stop.

Josh Tilles

Sep 4, 2015, 5:12:28 PM
to Clojure
I just found the other resources from the Component wiki—it may be that something linked from there addresses exactly my situation.


On Wednesday, September 2, 2015 at 8:44:07 PM UTC-4, jo...@signafire.com wrote:
...

James Reeves

Sep 5, 2015, 12:35:32 AM
to clo...@googlegroups.com
On 3 September 2015 at 00:03, <jo...@signafire.com> wrote:
The HTTP connection may be closed at any time by the server; if that happens, the app should persistently attempt to reconnect using an exponential back-off pattern. In addition, if the app goes thirty seconds without receiving any data, it should close the connection and try to reconnect. It's not clear to me how to best express these "retry" requirements in the component lifecycle.

It seems to me that retrying connections is orthogonal to the component lifecycle. Don't fall into the trap of thinking that start and stop are your only tools. They only determine what should happen when your system begins, and what happens when it ends.

So what does happen when your system starts? Well, from your description:

I'm writing an app that consumes events from a streaming HTTP connection and writes those events to a message queue

It seems to me that a naïve implementation of this would be:

(loop []
  (when-let [event (read-event! stream)]
    (deliver! queue event)
    (recur)))

So we block until we can read an event, and once we have the event we write it to the message queue.

From your description, it seems as if the event stream needs to maintain an open HTTP connection, but we expect to need to reconnect for various reasons. The connection will therefore change over the system's runtime, so we need a mutable reference:

(defrecord HttpEventStream [url]
  component/Lifecycle
  (start [component]
    (if (:conn component)
      component
      (assoc component :conn (volatile! (open-connection url)))))
  (stop [component]
    (if-let [conn (:conn component)]
      (do (locking conn
            (close-connection @conn)
            (vreset! conn nil))
          (dissoc component :conn))
      component)))

(defn read-event! [{:keys [conn url]}]
  (when conn
    (locking conn
      (when @conn
        (loop []
          (if (closed? @conn)
            ;; reconnect, then retry the read
            (do (vreset! conn (open-connection url)) (recur))
            (read-line-from-body! @conn)))))))

Because we're dealing with I/O, instead of using an atom I'm using a volatile and locks, as we explicitly don't want two threads accessing the same connection at once.

Note that I'm also making sure that read-event! can handle being given a component that's been stopped.

If you want the connection to reconnect if there has been too much inactivity, then you'll need to maintain another reference that holds a timestamp of when an event was last read.

- James
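
The inactivity check mentioned above could be as small as the following sketch. It assumes the timestamp lives in an atom and that the caller passes in the current time in milliseconds (e.g. from `System/currentTimeMillis`); `touch!` and `stale?` are hypothetical names:

```clojure
(defn touch!
  "Records now-ms as the time an event was last read."
  [last-read now-ms]
  (reset! last-read now-ms))

(defn stale?
  "True when more than max-idle-ms has passed since the last recorded read."
  [last-read now-ms max-idle-ms]
  (> (- now-ms @last-read) max-idle-ms))
```

A reader would call `touch!` after each successful read, and whatever watches the connection would close and reopen it once `(stale? last-read (System/currentTimeMillis) 30000)` returns true.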

jo...@signafire.com

Sep 6, 2015, 7:50:37 PM
to Clojure


On Thursday, September 3, 2015 at 9:15:54 AM UTC-4, Andy- wrote:
Only to answer the "retry on error" part of your question: You might like hara/event:
Thanks for the tip! I've been meaning to check out hara anyway. If I end up using it I'll post a follow-up.

jo...@signafire.com

Sep 6, 2015, 7:57:30 PM
to Clojure
Thanks a ton for writing all of this up. I'm actually using Langohr & RabbitMQ as well, so I have the sense that you've solved problems that I might encounter down the road.

Baptiste Dupuch

Feb 20, 2017, 12:18:07 PM
to Clojure
Josh Tilles, how did you manage connection failures with Langohr and Component? I thought there was automatic connection recovery by default.