(Ab)using agents for inter-process communication

102 views
Skip to first unread message

Stephan Mühlstrasser

unread,
Dec 18, 2008, 3:07:24 PM12/18/08
to Clojure
Hi,

I've not yet seen any examples on how to deal with external processes
in Clojure (I hope I didn't overlook something in clojure-contrib).

The following is my attempt to start a sub-process and to pass through
stdout and stderr. The shell command prints out 1000 lines "hello"
and a final "command finished". The problem is that nothing is printed
by the Clojure program. If I increase the number of lines for example
to 2000 (change "head -1000" to "head -2000"), I see a lot of output,
but it is cut off somewhere in the middle and the final "command
finished" does never appear.

(use 'clojure.contrib.duck-streams)

(defn copy
[istream ostream]
(println "copy" istream ostream)
(loop [line (.readLine istream)]
(if line
(do
(.println ostream line)
(recur (.readLine istream))))))

(let [pb (new ProcessBuilder ["sh" "-c" "yes hello | head -1000; echo
command finished"])
proc (.start pb)
stdout (reader (.getInputStream proc))
stderr (reader (.getErrorStream proc))
stdout-agent (agent stdout)
stderr-agent (agent stderr)]
(send stdout-agent copy (writer *out*))
(send stderr-agent copy (writer *err*))
(await stdout-agent stderr-agent)
(.waitFor proc)
(shutdown-agents)
(println "done"))

Is this use of agents incorrect?
Why can the program terminate before all the output from the sub-
process has been passed through?
Is there a better way to synchronize with sub-processes in Clojure, or
is it necessary to synchronize completely at the Java level?

Thanks
Stephan

Randall R Schulz

unread,
Dec 18, 2008, 3:24:27 PM12/18/08
to clo...@googlegroups.com
On Thursday 18 December 2008 12:07, Stephan Mühlstrasser wrote:
> Hi,
>
> I've not yet seen any examples on how to deal with external processes
> in Clojure (I hope I didn't overlook something in clojure-contrib).
>
> The following is my attempt to start a sub-process and to pass
> through stdout and stderr. The shell command prints out 1000 lines
> "hello" and a final "command finished". The problem is that nothing
> is printed by the Clojure program. If I increase the number of lines
> for example to 2000 (change "head -1000" to "head -2000"), I see a
> lot of output, but it is cut off somewhere in the middle and the
> final "command finished" does never appear.

I just did something virtually identical to this:

user=> (sh date)
Thu Dec 18 12:19:42 PST 2008

(That's a macro for the convenience of not having to either use string literals or quote the arguments. The real work is done in a function.)


> ...


>
> (let [pb (new ProcessBuilder ["sh" "-c" "yes hello | head -1000; echo
> command finished"])
> proc (.start pb)
> stdout (reader (.getInputStream proc))
> stderr (reader (.getErrorStream proc))
> stdout-agent (agent stdout)
> stderr-agent (agent stderr)]
> (send stdout-agent copy (writer *out*))
> (send stderr-agent copy (writer *err*))
> (await stdout-agent stderr-agent)
> (.waitFor proc)
> (shutdown-agents)
> (println "done"))
>
> Is this use of agents incorrect?

I would say it's an appropriate use, but you need to do it a little
differently: First of all, use (send-off ...) or you'll have to wait for
the agent to complete. Then use (await ...) on the agents.


> Why can the program terminate before all the output from the sub-
> process has been passed through?

As long as the sub-process produces no more output than the operating
system's pipe buffering limit, it can complete without blocking.


> Is there a better way to synchronize with sub-processes in Clojure,
> or is it necessary to synchronize completely at the Java level?

I don't understand this question.

Here's what my implementation looks like. It does not stand alone as
shown, but you can probably figure out what the missing pieces do:

(def *shell* "bash")
(def *shopt* "-c")

(defn- cat-proc-stream
"Copy all the bytes from stream to the writer"
[stream writer]
(binding [*out* writer]
(cat-stream stream)))

(defn shf
"Invoke a platform / external command"
[& args]
(let [out *out*
err *err*
cmd+args (flatten args)
builder (if (and (= (count cmd+args) 1) (string? (first cmd+args)))
(ProcessBuilder. (into-array (conj [] *shell* *shopt* (first cmd+args))))
(ProcessBuilder. (into-array (map str cmd+args))))
process (.start builder)
stdout-copier (agent nil)
stderr-copier (agent nil)]
(send-off stdout-copier #(cat-proc-stream %2 err) (.getErrorStream process))
(send-off stderr-copier #(cat-proc-stream %2 out) (.getInputStream process))
(await stdout-copier stderr-copier))
)

(defmacro sh
"Invoke a platform / external command without evaluating arguments"
[& args]
`(shf '~args))


> Thanks
> Stephan


Randall Schulz

Stephan Mühlstrasser

unread,
Dec 18, 2008, 3:43:28 PM12/18/08
to Clojure


On Dec 18, 9:24 pm, Randall R Schulz <rsch...@sonic.net> wrote:
> On Thursday 18 December 2008 12:07, Stephan Mühlstrasser wrote:
>
> > (let [pb (new ProcessBuilder ["sh" "-c" "yes hello | head -1000; echo
> > command finished"])
> >         proc (.start pb)
> >         stdout (reader (.getInputStream proc))
> >         stderr (reader (.getErrorStream proc))
> >         stdout-agent (agent stdout)
> >         stderr-agent (agent stderr)]
> >     (send stdout-agent copy (writer *out*))
> >     (send stderr-agent copy (writer *err*))
> >     (await stdout-agent stderr-agent)
> >     (.waitFor proc)
> >     (shutdown-agents)
> >     (println "done"))
>
> > Is this use of agents incorrect?
>
> I would say it's an appropriate use, but you need to do it a little
> differently: First of all, use (send-off ...) or you'll have to wait for
> the agent to complete. Then use (await ...) on the agents.

I do use (await ...) on the agents. And I have also tried (send-
off ...), but it didn't make a difference.

> > Why can the program terminate before all the output from the sub-
> > process has been passed through?
>
> As long as the sub-process produces no more output than the operating
> system's pipe buffering limit, it can complete without blocking.

My question was not precise enough. I meant why can the parent process
- the Clojure program - terminate before all all the output has been
passed through.

> > Is there a better way to synchronize with sub-processes in Clojure,
> > or is it necessary to synchronize completely at the Java level?
>
> I don't understand this question.

As my approach (implement the synchronization at the Clojure level)
doesn't work obviously, I wondered whether it must be done all at the
Java level, e.g. don't use Clojure agents, but create Java threads
explicitly, start them, and wait for completion by using Java
functions.

> Here's what my implementation looks like. It does not stand alone as
> shown, but you can probably figure out what the missing pieces do:
>

Thanks for sharing this. At first look it looks similar to my
approach, but there must be a certain important detail that is
different.

> Randall Schulz

Regards
Stephan

Randall R Schulz

unread,
Dec 18, 2008, 4:01:21 PM12/18/08
to clo...@googlegroups.com
On Thursday 18 December 2008 12:43, Stephan Mühlstrasser wrote:
> On Dec 18, 9:24 pm, Randall R Schulz <rsch...@sonic.net> wrote:
> > On Thursday 18 December 2008 12:07, Stephan Mühlstrasser wrote:
> > > ...

> > >
> > > Is this use of agents incorrect?
> >
> > I would say it's an appropriate use, but you need to do it a little
> > differently: First of all, use (send-off ...) or you'll have to
> > wait for the agent to complete. Then use (await ...) on the agents.
>
> I do use (await ...) on the agents. And I have also tried (send-
> off ...), but it didn't make a difference.

I think I don't understand the difference between (send ...) and
(send-off ...), but that may be where the difference in behavior
between our otherwise very similar code arises.


> > > Why can the program terminate before all the output from the sub-
> > > process has been passed through?
> >
> > As long as the sub-process produces no more output than the
> > operating system's pipe buffering limit, it can complete without
> > blocking.
>
> My question was not precise enough. I meant why can the parent
> process - the Clojure program - terminate before all all the output
> has been passed through.

Because it can terminate whenever it wants to. Child processes do not
place any constraints upon their parents, at least not on Unix systems.


> > > Is there a better way to synchronize with sub-processes in
> > > Clojure, or is it necessary to synchronize completely at the Java
> > > level?
> >
> > I don't understand this question.
>
> As my approach (implement the synchronization at the Clojure level)
> doesn't work obviously, I wondered whether it must be done all at the
> Java level, e.g. don't use Clojure agents, but create Java threads
> explicitly, start them, and wait for completion by using Java
> functions.

Well, your code may not work at the moment, but the approach is sound,
as my code points out (it does work).


> > ...


>
> Thanks for sharing this. At first look it looks similar to my
> approach, but there must be a certain important detail that is
> different.
>

> Regards
> Stephan


Randall Schulz

Chouser

unread,
Dec 18, 2008, 4:10:57 PM12/18/08
to clo...@googlegroups.com
On Thu, Dec 18, 2008 at 3:07 PM, Stephan Mühlstrasser
<stephan.mu...@web.de> wrote:
>
> The following is my attempt to start a sub-process and to pass through
> stdout and stderr. The shell command prints out 1000 lines "hello"
> and a final "command finished". The problem is that nothing is printed
> by the Clojure program. If I increase the number of lines for example
> to 2000 (change "head -1000" to "head -2000"), I see a lot of output,
> but it is cut off somewhere in the middle and the final "command
> finished" does never appear.

This is just a buffering/flushing problem. Try adding
(.flush ostream) after your (.println ...)

> Is this use of agents incorrect?

Since the action you're sending could block on IO, you should use
'send-off' instead of 'send'. The difference is that the pool of
threads used by 'send' doesn't grow with demand, so too many blocking
threads could cause new 'send' calls to queue up unnecessarily.

--Chouser

Stephan Mühlstrasser

unread,
Dec 18, 2008, 4:33:13 PM12/18/08
to Clojure
On Dec 18, 10:01 pm, Randall R Schulz <rsch...@sonic.net> wrote:
>
> > My question was not precise enough. I meant why can the parent
> > process - the Clojure program - terminate before all all the output
> > has been passed through.
>
> Because it can terminate whenever it wants to. Child processes do not
> place any constraints upon their parents, at least not on Unix systems.

I understand that the parent process can terminate whenever it wants.
But in my program the "copy" function recurs over readLine until it
returns null/nil, so it should read until the end of the output from
the child process. And the calling thread waits for the completion of
the agent functions with (await ...). The main thread should not come
out of the (await ...) until both agent functions have copied their
whole stream... But somehow the "copy" function is interrupted when
the child process terminates.

Maybe the missing piece is how your (cat-stream ...) function works,
which is not included in your listing. How does it copy the data?

Regards
Stephan

Stephan Mühlstrasser

unread,
Dec 18, 2008, 4:40:55 PM12/18/08
to Clojure
On Dec 18, 10:10 pm, Chouser <chou...@gmail.com> wrote:
> On Thu, Dec 18, 2008 at 3:07 PM, Stephan Mühlstrasser
>
> <stephan.muehlstras...@web.de> wrote:
>
> > The following is my attempt to start a sub-process and to pass through
> > stdout and stderr.  The shell command prints out 1000 lines "hello"
> > and a final "command finished". The problem is that nothing is printed
> > by the Clojure program. If I increase the number of lines for example
> > to 2000 (change "head -1000" to "head -2000"), I see a lot of output,
> > but it is cut off somewhere in the middle and the final "command
> > finished" does never appear.
>
> This is just a buffering/flushing problem.  Try adding
> (.flush ostream) after your (.println ...)

Thanks, that was it! I put the flush in the else clause in copy, so it
flushes once at the end.

> > Is this use of agents incorrect?
>
> Since the action you're sending could block on IO, you should use
> 'send-off' instead of 'send'.  The difference is that the pool of
> threads used by 'send' doesn't grow with demand, so too many blocking
> threads could cause new 'send' calls to queue up unnecessarily.

Ah, I believe I finally understand the difference between send and
send-off. To describe it in my own words, send-off creates a new
thread each time, while send schedules to a thread in a thread pool.

So this is the complete example again which now does work as expected:

(use 'clojure.contrib.duck-streams)

(defn copy
[istream ostream]
(println "copy" istream ostream)
(loop [line (.readLine istream)]
(if line
(do
(.println ostream line)
(recur (.readLine istream)))
(.flush ostream))))

(let [pb (new ProcessBuilder ["sh" "-c" "yes hello | head -1000; echo
command finished"])
proc (.start pb)
stdout (reader (.getInputStream proc))
stderr (reader (.getErrorStream proc))
stdout-agent (agent stdout)
stderr-agent (agent stderr)]
(send-off stdout-agent copy (writer *out*))
(send-off stderr-agent copy (writer *err*))
(await stdout-agent stderr-agent)
(.waitFor proc)
(shutdown-agents)
(println "done"))

Thanks
Stephan

Dave Griffith

unread,
Dec 18, 2008, 5:04:57 PM12/18/08
to Clojure

>
> Ah, I believe I finally understand the difference between send and
> send-off. To describe it in my own words, send-off creates a new
> thread each time, while send schedules to a thread in a thread pool.
>

Not quite true, but close. Send-off requests a thread from a
CachingThreadPool, but since CachingThreadPools are effectively
unlimitted you get the effect you describe, with less overhead.

Randall R Schulz

unread,
Dec 18, 2008, 5:30:41 PM12/18/08
to clo...@googlegroups.com

Nothing fancy:

(defn cat-stream
"Copy bytes from an InputStream to *out*"
[stream]
(let [reader (new BufferedReader (new InputStreamReader stream))
buffer (make-array Character/TYPE 1024)]
(loop [n-read (.read reader buffer)]
(when (> n-read 0)
(.write *out* buffer 0 n-read)
(recur (.read reader buffer))))
(.flush *out*)))


> Regards
> Stephan


Randall Schulz

Stephan Mühlstrasser

unread,
Dec 18, 2008, 5:40:48 PM12/18/08
to Clojure
On Dec 18, 11:30 pm, Randall R Schulz <rsch...@sonic.net> wrote:
> On Thursday 18 December 2008 13:33, Stephan Mühlstrasser wrote:
>
>
> Nothing fancy:
>
> (defn cat-stream
> ....
>   (.flush *out*)))
>

As Chouser pointed out, the flush is the important ingredient.

After thinking a while about this, I'm wondering why it is necessary.
The output stream should be flushed automatically when the program
exits, but it looks like with Java and threads this is different.

Regards
Stephan

Randall R Schulz

unread,
Dec 18, 2008, 5:44:09 PM12/18/08
to clo...@googlegroups.com

Java has no built-in auto-flushing that I've ever been aware of.


> Regards
> Stephan


Randall Schulz

Randall R Schulz

unread,
Dec 18, 2008, 5:47:02 PM12/18/08
to clo...@googlegroups.com
On Thursday 18 December 2008 14:44, Randall R Schulz wrote:
> On Thursday 18 December 2008 14:40, Stephan Mühlstrasser wrote:
> > ...

> >
> > As Chouser pointed out, the flush is the important ingredient.
> >
> > After thinking a while about this, I'm wondering why it is
> > necessary. The output stream should be flushed automatically when
> > the program exits, but it looks like with Java and threads this is
> > different.
>
> Java has no built-in auto-flushing that I've ever been aware of.

I should say, none triggered by JVM shut-down. PrintStream and
PrintWriter have an auto-flush mode that you can establish when
constructing, but not control later (for whatever reason).


> > Regards
> > Stephan


Randall Schulz

Reply all
Reply to author
Forward
0 new messages