I just did something virtually identical to this:
user=> (sh date)
Thu Dec 18 12:19:42 PST 2008
(That's a macro for the convenience of not having to either use string literals or quote the arguments. The real work is done in a function.)
> ...
>
> (let [pb (new ProcessBuilder ["sh" "-c" "yes hello | head -1000; echo command finished"])
>       proc (.start pb)
>       stdout (reader (.getInputStream proc))
>       stderr (reader (.getErrorStream proc))
>       stdout-agent (agent stdout)
>       stderr-agent (agent stderr)]
>   (send stdout-agent copy (writer *out*))
>   (send stderr-agent copy (writer *err*))
>   (await stdout-agent stderr-agent)
>   (.waitFor proc)
>   (shutdown-agents)
>   (println "done"))
>
> Is this use of agents incorrect?
I would say it's an appropriate use, but you need to do it a little
differently: use (send-off ...) rather than (send ...), because the
copying action blocks on I/O, and then use (await ...) on the agents.
> Why can the program terminate before all the output from the sub-
> process has been passed through?
As long as the sub-process produces no more output than the operating
system's pipe buffering limit, it can complete without blocking.
> Is there a better way to synchronize with sub-processes in Clojure,
> or is it necessary to synchronize completely at the Java level?
I don't understand this question.
Here's what my implementation looks like. It does not stand alone as
shown, but you can probably figure out what the missing pieces do:
(def *shell* "bash")
(def *shopt* "-c")

(defn- cat-proc-stream
  "Copy all the bytes from stream to the writer"
  [stream writer]
  (binding [*out* writer]
    (cat-stream stream)))

(defn shf
  "Invoke a platform / external command"
  [& args]
  (let [out *out*
        err *err*
        cmd+args (flatten args)
        ;; A single string is handed to the shell; anything else is run directly.
        builder (if (and (= (count cmd+args) 1) (string? (first cmd+args)))
                  (ProcessBuilder. (into-array (conj [] *shell* *shopt* (first cmd+args))))
                  (ProcessBuilder. (into-array (map str cmd+args))))
        process (.start builder)
        stdout-copier (agent nil)
        stderr-copier (agent nil)]
    ;; %1 is the agent's state (unused); %2 is the stream passed to send-off.
    (send-off stdout-copier #(cat-proc-stream %2 out) (.getInputStream process))
    (send-off stderr-copier #(cat-proc-stream %2 err) (.getErrorStream process))
    (await stdout-copier stderr-copier)))

(defmacro sh
  "Invoke a platform / external command without evaluating arguments"
  [& args]
  `(shf '~args))
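To make the quoting trick in the macro concrete, here is a minimal sketch. The names shf-demo and sh-demo are hypothetical stand-ins, not part of the code above: the stand-in function only returns the stringified arguments instead of starting a process, so the effect of '~args can be seen in isolation.

```clojure
;; Hypothetical stand-in for shf: returns the flattened, stringified
;; arguments instead of launching a process.
(defn shf-demo [& args]
  (map str (flatten args)))

;; Same shape as the sh macro: the argument forms are quoted, not evaluated.
(defmacro sh-demo [& args]
  `(shf-demo '~args))

;; ls and -l are read as symbols and passed through unevaluated.
(println (sh-demo ls -l))
```

Only tokens the Clojure reader accepts as symbols, numbers, or strings can be written this way; anything else has to be passed as a string literal.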
> Thanks
> Stephan
Randall Schulz
I don't think I understand the difference between (send ...) and
(send-off ...), but that may be where the difference in behavior
between our otherwise very similar code arises.
> > > Why can the program terminate before all the output from the sub-
> > > process has been passed through?
> >
> > As long as the sub-process produces no more output than the
> > operating system's pipe buffering limit, it can complete without
> > blocking.
>
> My question was not precise enough. I meant why can the parent
> process - the Clojure program - terminate before all the output
> has been passed through.
Because it can terminate whenever it wants to. Child processes do not
place any constraints upon their parents, at least not on Unix systems.
> > > Is there a better way to synchronize with sub-processes in
> > > Clojure, or is it necessary to synchronize completely at the Java
> > > level?
> >
> > I don't understand this question.
>
> As my approach (implement the synchronization at the Clojure level)
> obviously doesn't work, I wondered whether it must be done all at the
> Java level, e.g. don't use Clojure agents, but create Java threads
> explicitly, start them, and wait for completion by using Java
> functions.
Well, your code may not work at the moment, but the approach is sound,
as my code demonstrates (it does work).
> > ...
>
> Thanks for sharing this. At first look it looks similar to my
> approach, but there must be a certain important detail that is
> different.
>
> Regards
> Stephan
Randall Schulz
This is just a buffering/flushing problem. Try adding
(.flush ostream) after your (.println ...)
> Is this use of agents incorrect?
Since the action you're sending could block on IO, you should use
'send-off' instead of 'send'. The difference is that the pool of
threads used by 'send' doesn't grow with demand, so too many blocking
threads could cause new 'send' calls to queue up unnecessarily.
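A minimal sketch of that advice, using in-memory streams so it runs without a sub-process. The helper copy-chars and the names source, sink and copier are made up for illustration; in the real case the reader would wrap the process's output stream.

```clojure
(import '(java.io StringReader StringWriter))

;; Hypothetical helper: copy everything from a Reader to a Writer and
;; return the number of characters copied.
(defn copy-chars [^java.io.Reader in ^java.io.Writer out]
  (let [buffer (char-array 1024)]
    (loop [total 0]
      (let [n (.read in buffer)]
        (if (pos? n)
          (do (.write out buffer 0 n)
              (recur (+ total n)))
          (do (.flush out)
              total))))))

(def source (StringReader. "hello\nhello\n"))
(def sink (StringWriter.))
(def copier (agent 0))

;; send-off runs the action on the pool that grows with demand, so a
;; blocking read does not occupy one of the fixed pool's threads.
(send-off copier (fn [_ in out] (copy-chars in out)) source sink)

;; Block until the copy has finished before looking at the result.
(await copier)
(println @copier (pr-str (str sink)))

;; In a standalone script, let the JVM exit promptly.
(shutdown-agents)
```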
--Chouser
Nothing fancy:
(defn cat-stream
  "Copy bytes from an InputStream to *out*"
  [stream]
  (let [reader (new BufferedReader (new InputStreamReader stream))
        buffer (make-array Character/TYPE 1024)]
    (loop [n-read (.read reader buffer)]
      (when (> n-read 0)
        (.write *out* buffer 0 n-read)
        (recur (.read reader buffer))))
    (.flush *out*)))
> Regards
> Stephan
Randall Schulz
Java has no built-in auto-flushing that I've ever been aware of.
> Regards
> Stephan
Randall Schulz
I should say, none triggered by JVM shut-down. PrintStream and
PrintWriter have an auto-flush mode that you can establish when
constructing, but not control later (for whatever reason).
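A small sketch of that constructor flag, assuming a PrintWriter layered over a BufferedWriter and an in-memory byte stream (all the var names here are made up for illustration). It compares how many bytes have reached the underlying stream after one println, with and without auto-flush.

```clojure
(import '(java.io ByteArrayOutputStream OutputStreamWriter
                  BufferedWriter PrintWriter))

(def plain-bytes (ByteArrayOutputStream.))
(def auto-bytes (ByteArrayOutputStream.))

;; No auto-flush: println leaves the text in the writer's buffer.
(def plain (PrintWriter. (BufferedWriter. (OutputStreamWriter. plain-bytes))))

;; Second constructor argument true: println flushes after each line.
(def auto (PrintWriter. (BufferedWriter. (OutputStreamWriter. auto-bytes)) true))

(.println plain "hello")
(.println auto "hello")

;; Bytes that have reached the underlying streams so far.
(def plain-before-flush (.size plain-bytes))
(def auto-after-println (.size auto-bytes))

;; An explicit flush pushes the buffered text through.
(.flush plain)
(def plain-after-flush (.size plain-bytes))

(println "no auto-flush, before flush:" plain-before-flush)
(println "auto-flush, after println:" auto-after-println)
(println "no auto-flush, after flush:" plain-after-flush)
```

Without the flag, or an explicit (.flush ...), text still sitting in the buffer when the JVM exits is simply lost.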
> > Regards
> > Stephan
Randall Schulz