I've been working on a release of a cooperative threading HTTP library, and have been using the opportunity to experiment with parameterising it over Lwt/Async. I had a few (basic) Async questions as a result.
The higher-level client HTTP interface is where Lwt and Async diverge, and so have different implementations. The Async version exposes the body as a Pipe which can be lazily consumed or passed to a proxy. This means that pipelined requests must be careful to wait until previous requests have fully consumed their respective bodies before using that particular channel. In Lwt, this can be modelled with Lwt_stream.t that maps over the requests and responses.
In Async, I ran into a few basic questions while building the equivalent Lwt support.
- Only one Scheduler.go is supported (unlike Lwt_main, which can run multiple times in a program), which makes multiple Async tests hard to compose in oUnit. Do you have a custom oUnit that can isolate multiple Async test-cases, or should I write a forking one that does this?
- What's the best way to get a human-readable error? I have this in the oUnit right now, which is very clunky:
let _ = Async_core.Scheduler.within' (fun () ->
Monitor.try_with make_net_req >>=
function
|Error exn -> (* TODO: how to dump out top-level errors in a nicer way? *)
Printf.fprintf stderr "err %s.\n%!" (Exn.backtrace ()); return ()
|Ok _ -> return ()
) in Async_unix.Scheduler.go ()
- Async_streams are deprecated, and I should use Pipes instead. I need to be able to run a background task that will close the I/O channel whenever the Pipe is completed to close the underlying fd. I obtain a Deferred via Pipe.closed, but how should this run in the background? Should I just ignore the return, or register it with the Scheduler?
In general, are there any toy network programs written using Async that I could crib good style from?
On Thu, Aug 16, 2012 at 9:46 PM, Anil Madhavapeddy <a...@recoil.org> wrote:
> I've been working on a release of a cooperative threading HTTP library, and have been using the opportunity to experiment with parameterising it over Lwt/Async. I had a few (basic) Async questions as a result.
> The higher-level client HTTP interface is where Lwt and Async diverge, and so have different implementations. The Async version exposes the body as a Pipe which can be lazily consumed or passed to a proxy. This means that pipelined requests must be careful to wait until previous requests have fully consumed their respective bodies before using that particular channel. In Lwt, this can be modelled with Lwt_stream.t that maps over the requests and responses.
> In Async, I ran into a few basic questions while building the equivalent Lwt support.
> - Only one Scheduler.go is supported (unlike Lwt_main, which can run multiple times in a program), which makes multiple Async tests hard to compose in oUnit. Do you have a custom oUnit that can isolate multiple Async test-cases, or should I write a forking one that does this?
> - What's the best way to get a human-readable error? I have this in the oUnit right now, which is very clunky:
> let _ = Async_core.Scheduler.within' (fun () ->
> Monitor.try_with make_net_req >>=
> function
> |Error exn ->
> (* TODO: how to dump out top-level errors in a nicer way? *)
> Printf.fprintf stderr "err %s.\n%!" (Exn.backtrace ()); return ()
> |Ok _ -> return ()
> ) in Async_unix.Scheduler.go ()
One quick observation: this code would be a little cleaner if you
opened Async.Std. Then you'd write:
open Core.Std
open Async.Std
let _ = within' (fun () ->
try_with make_net_req
>>= function
| Error exn ->
(* TODO: how to dump out top-level errors in a nicer way? *)
fprintf stderr "err %s.\n%!" (Exn.backtrace ());
return ()
| Ok _ -> return ()
)
in
Scheduler.go ()
Further, I don't think the [within'] call is doing you any good, since
you're not using it to set the block group, the monitor or the
priority. So you should be able to just drop that.
Stephen, do you have a view as to how to do the error handling here
properly?
As a side note, we have a new release that should be out in a bit over
a week (it's totally done internally, but Yury, who is responsible for
the external releases, last week and the next.) I think there are
some material changes there that should improve some of the error
presentation.
> - Async_streams are deprecated, and I should use Pipes instead. I
> need to be able to run a background task that will close the I/O
> channel whenever the Pipe is completed to close the underlying fd.
> I obtain a Deferred via Pipe.closed, but how should this run in
> the background? Should I just ignore the return, or register it
> with the Scheduler?
Sorry, I'm not quite able to decode the question. Do you have a code
snippet you could share?
> In general, are there any toy network programs written using Async
> that I could crib good style from?
That's an excellent question. We do have some examples that are
presently not included in our external tree. I'll see if we can fix
that.
On Sun, Aug 19, 2012 at 02:54:25PM -0400, Yaron Minsky wrote:
> One quick observation: this code would be a little cleaner if you
> opened Async.Std. Then you'd write:
> open Core.Std
> open Async.Std
Ah, I missed the Async package's existence and went straight for
Async_unix. I've now updated the dependency to use Async directly and all
the interfaces are much simpler.
> let _ = within' (fun () ->
> try_with make_net_req
> >>= function
> | Error exn ->
> (* TODO: how to dump out top-level errors in a nicer way? *)
> fprintf stderr "err %s.\n%!" (Exn.backtrace ());
> return ()
> | Ok _ -> return ()
> )
> in
> Scheduler.go ()
> Further, I don't think the [within'] call is doing you any good, since
> you're not using it to set the block group, the monitor or the
> priority. So you should be able to just drop that.
This was actually related to me experimenting with what all those options
do (particularly the custom monitor). I'll wait for your new update when
Yury's back and see what the interface looks like. Do you normally just
read the s-expr encoded in Error.t, or do you have something fancier (the
former is good enough for me in practise).
> > - Async_streams are deprecated, and I should use Pipes instead. I
> > need to be able to run a background task that will close the I/O
> > channel whenever the Pipe is completed to close the underlying fd.
> > I obtain a Deferred via Pipe.closed, but how should this run in
> > the background? Should I just ignore the return, or register it
> > with the Scheduler?
> Sorry, I'm not quite able to decode the question. Do you have a code
> snippet you could share?
let pipe_of_body read_fn ic =
let rd, wr = Pipe.create () in
(* Consume from the input channel and write to the new pipe *)
let rec write () =
read_fn ic >>= function
|Transfer.Done ->
Pipe.close wr; return ()
|Transfer.Final_chunk c -> begin
Pipe.with_write wr ~f:(fun wrfn -> wrfn c) >>= function
|`Closed -> return ()
|`Ok _ -> Pipe.close wr; return ()
end
|Transfer.Chunk c -> begin
Pipe.with_write wr ~f:(fun wrfn -> wrfn c) >>=
function
|`Closed -> return ()
|`Ok _ -> write () end
in
(* TODO: how to run write () as a background task? *)
let _ = write () in
rd
This function is called to convert a HTTP body into a Pipe pair, such that
the client can lazily consume the (potentially very large) HTTP body, or
pass it onto a proxy process. There is a write() function that reads from
the HTTP body and potentially blocks on the I/O or pushback from the pipe.
We immediately return the reader pipe as a return value from the function.
So my question is what to do with:
(* TODO: how to run write () as a background task? *)
let _ = write () in
rd
Does write() need to be registered with something to be a "proper"
background task for accounting purposes, or to run a cleanup function if
the Reader pipe terminates early?
let stream_of_body read_fn ic =
let fin = ref false in
Lwt_stream.from (fun () ->
match !fin with
|true -> return None
|false -> begin
match_lwt read_fn ic with
|Transfer.Done -> return None
|Transfer.Final_chunk c ->
fin := true;
return (Some c);
|Transfer.Chunk c ->
return (Some c)
end
)
The Lwt_stream that is returned has an Lwt.on_terminate handler for when
the consumer hits EOF (e.g. to close the socket), and I think (but am a
little blurry here) cancels the other end with an Lwt.Canceled exception
if the other side stops early.
> > In general, are there any toy network programs written using Async
> > that I could crib good style from?
> That's an excellent question. We do have some examples that are
> presently not included in our external tree. I'll see if we can fix
> that.
On Mon, Aug 20, 2012 at 10:11 PM, Anil Madhavapeddy <a...@recoil.org> wrote:
> On Sun, Aug 19, 2012 at 02:54:25PM -0400, Yaron Minsky wrote:
>> One quick observation: this code would be a little cleaner if you
>> opened Async.Std. Then you'd write:
>> open Core.Std
>> open Async.Std
> Ah, I missed the Async package's existence and went straight for
> Async_unix. I've now updated the dependency to use Async directly and all
> the interfaces are much simpler.
>> let _ = within' (fun () ->
>> try_with make_net_req
>> >>= function
>> | Error exn ->
>> (* TODO: how to dump out top-level errors in a nicer way? *)
>> fprintf stderr "err %s.\n%!" (Exn.backtrace ());
>> return ()
>> | Ok _ -> return ()
>> )
>> in
>> Scheduler.go ()
>> Further, I don't think the [within'] call is doing you any good, since
>> you're not using it to set the block group, the monitor or the
>> priority. So you should be able to just drop that.
> This was actually related to me experimenting with what all those options
> do (particularly the custom monitor). I'll wait for your new update when
> Yury's back and see what the interface looks like. Do you normally just
> read the s-expr encoded in Error.t, or do you have something fancier (the
> former is good enough for me in practise).
We usually just dump the s-expr that you get from the Error.t.
>> > - Async_streams are deprecated, and I should use Pipes instead. I
>> > need to be able to run a background task that will close the I/O
>> > channel whenever the Pipe is completed to close the underlying fd.
>> > I obtain a Deferred via Pipe.closed, but how should this run in
>> > the background? Should I just ignore the return, or register it
>> > with the Scheduler?
>> Sorry, I'm not quite able to decode the question. Do you have a code
>> snippet you could share?
> let pipe_of_body read_fn ic =
> let rd, wr = Pipe.create () in
> (* Consume from the input channel and write to the new pipe *)
> let rec write () =
> read_fn ic >>= function
> |Transfer.Done ->
> Pipe.close wr; return ()
> |Transfer.Final_chunk c -> begin
> Pipe.with_write wr ~f:(fun wrfn -> wrfn c) >>=
> function
> |`Closed -> return ()
> |`Ok _ -> Pipe.close wr; return ()
> end
> |Transfer.Chunk c -> begin
> Pipe.with_write wr ~f:(fun wrfn -> wrfn c) >>=
> function
> |`Closed -> return ()
> |`Ok _ -> write ()
> end
> in
> (* TODO: how to run write () as a background task? *)
> let _ = write () in
> rd
> This function is called to convert a HTTP body into a Pipe pair, such that
> the client can lazily consume the (potentially very large) HTTP body, or
> pass it onto a proxy process. There is a write() function that reads from
> the HTTP body and potentially blocks on the I/O or pushback from the pipe.
> We immediately return the reader pipe as a return value from the function.
> So my question is what to do with:
> (* TODO: how to run write () as a background task? *)
> let _ = write () in
> rd
The narrow answer to this one is: [write ()] is automatically run as a
background task, no registration required. You'd normally write this:
whenever (write ());
rd
where `whenever` is actually a function that just ignores a [unit
Deferred.t]. We decided that `whenever` was a bit of a confusing name
for this, and we changed it to:
don't_wait_for (write ());
rd
in the latest release, which will come out next week. But the effect
is just the same as:
ignore (write () : unit Deferred.t);
rd
> Does write() need to be registered with something to be a "proper"
> background task for accounting purposes, or to run a cleanup function if
> the Reader pipe terminates early?
I need to look at your example in more detail to understand how the
early termination you're talking about works. I'll try to get to that
in the next day or so.
> let stream_of_body read_fn ic =
> let fin = ref false in
> Lwt_stream.from (fun () ->
> match !fin with
> |true -> return None
> |false -> begin
> match_lwt read_fn ic with
> |Transfer.Done ->
> return None
> |Transfer.Final_chunk c ->
> fin := true;
> return (Some c);
> |Transfer.Chunk c ->
> return (Some c)
> end
> )
> The Lwt_stream that is returned has an Lwt.on_terminate handler for when
> the consumer hits EOF (e.g. to close the socket), and I think (but am a
> little blurry here) cancels the other end with an Lwt.Canceled exception
> if the other side stops early.
>> > In general, are there any toy network programs written using Async
>> > that I could crib good style from?
>> That's an excellent question. We do have some examples that are
>> presently not included in our external tree. I'll see if we can fix
>> that.
Le Tue, 21 Aug 2012 03:11:03 +0100,
Anil Madhavapeddy <a...@recoil.org> a écrit :
> The Lwt_stream that is returned has an Lwt.on_terminate handler for
> when the consumer hits EOF (e.g. to close the socket), and I think
> (but am a little blurry here) cancels the other end with an
> Lwt.Canceled exception if the other side stops early.
Actually no, cancelling an operation on a Lwt stream does not cancel
its "from" function. Allowing cancels in streams may lead to
unexpected behaviors.
On Tue, Aug 21, 2012 at 3:11 AM, Anil Madhavapeddy <a...@recoil.org> wrote:
> So my question is what to do with:
> (* TODO: how to run write () as a background task? *)
> let _ = write () in
> rd
> Does write() need to be registered with something to be a "proper"
> background task for accounting purposes, or to run a cleanup function if
> the Reader pipe terminates early?
As Yaron points out, it is more idiomatic to say [whenever (write
())], or in the newer version of async, [don't_wait_for (write ())].
For example, the stuff below "TODO" in Client.read_response might be
written:
if close then don't_wait_for (
Pipe.closed body_rd
>>= fun () ->
Reader.close ic
>>= fun () ->
Writer.close oc)
There is no need to register background processes with the scheduler.
Your code is already correct in the case of early termination by the
consumer: one of the [Pipe.write_when_ready] calls will terminate with
`Closed, and your loop will exit.
P.s. I see in your code that you open Deferred. Why is this? All of
the infix operators, plus [return] and a few others, are already in
the scope of Async.Std, so there is no need to go opening anything
else.
Also, two more stylistic points:
1. If the code on the right-hand side of a bind does not return a
deferred, it is common to use >>| instead of >>=, and then delete all
the "return"s on the right-hand side. E.g. in read_response.
2. All code between two binds is guaranteed to execute atomically,
without being interleaved with any other code. This is a very nice
feature of async, and really simplifies thinking about concurrent
code. Because of this, it is idiomatic to put all of your binds
flush-left, so that they stick out visually. (This is not quite
universal in our code base, but is certainly to be encouraged.) You
can see that in my snippet above.
On Thu, Aug 23, 2012 at 09:01:52AM +0100, David House wrote:
> On Tue, Aug 21, 2012 at 3:11 AM, Anil Madhavapeddy <a...@recoil.org> wrote:
> > So my question is what to do with:
> > (* TODO: how to run write () as a background task? *)
> > let _ = write () in
> > rd
> > Does write() need to be registered with something to be a "proper"
> > background task for accounting purposes, or to run a cleanup function if
> > the Reader pipe terminates early?
> As Yaron points out, it is more idiomatic to say [whenever (write
> ())], or in the newer version of async, [don't_wait_for (write ())].
> For example, the stuff below "TODO" in Client.read_response might be
> written:
> if close then don't_wait_for (
> Pipe.closed body_rd
> >>= fun () ->
> Reader.close ic
> >>= fun () ->
> Writer.close oc)
> There is no need to register background processes with the scheduler.
> Your code is already correct in the case of early termination by the
> consumer: one of the [Pipe.write_when_ready] calls will terminate with
> `Closed, and your loop will exit.
> P.s. I see in your code that you open Deferred. Why is this? All of
> the infix operators, plus [return] and a few others, are already in
> the scope of Async.Std, so there is no need to go opening anything
> else.
David, thanks for all these hints. I was opening Deferred due to missing
the Async.Std, so I've reverted all that now.
> Also, two more stylistic points:
> 1. If the code on the right-hand side of a bind does not return a
> deferred, it is common to use >>| instead of >>=, and then delete all
> the "return"s on the right-hand side. E.g. in read_response.
> 2. All code between two binds is guaranteed to execute atomically,
> without being interleaved with any other code. This is a very nice
> feature of async, and really simplifies thinking about concurrent
> code. Because of this, it is idiomatic to put all of your binds
> flush-left, so that they stick out visually. (This is not quite
> universal in our code base, but is certainly to be encouraged.) You
> can see that in my snippet above.
Ack. I've been tidying up http://github.com/avsm/ocaml-cohttp to work
across Async, Lwt_unix and Mirage via functors. The result is a reasonably
good HTTP/1.1 (w/ pipelining) client/server for all these threading
systems.
Many of my other thoughts on Async shortcomings seem to be addressed by
the upcoming new release (such as Async equivalents of Core modules such
as Option. I'll play around with them shortly.
I'm just going to finish up SSL bindings for Async to get it to
feature-parity with the Lwt version, and then port the Github API bindings
(http://github.com/avsm/ocaml-github). It's quite useful having two
different threading libraries to really force a good structure between
protocol modules.
Most of these will be packaged up in OPAM by the end of the week, for our
upcoming CUFP tutorial next week.