Lwt/Async abstraction for HTTP library

Anil Madhavapeddy

Aug 16, 2012, 9:46:15 PM
to ocaml...@googlegroups.com
I've been working on a release of a cooperative threading HTTP library, and have been using the opportunity to experiment with parameterising it over Lwt/Async. I had a few (basic) Async questions as a result.

The code is at:
https://github.com/avsm/ocaml-cohttp/tree/v2-interface
(requires Lwt-current to build at the moment)

The basic IO interface that takes care of low-level parsing is very straightforward to implement in Async and Lwt:
https://github.com/avsm/ocaml-cohttp/blob/v2-interface/async/cohttp_async_raw.ml
https://github.com/avsm/ocaml-cohttp/blob/v2-interface/lwt/cohttp_lwt_raw.ml
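
To give a rough idea of what parameterising the parser over the threading library can look like, here is a minimal sketch using only the standard library. The `IO` signature, `Make_parser` and `Sync_io` are hypothetical names made up for illustration, not the actual cohttp interface; an Async or Lwt instance would supply its own `'a t`, `return`, `>>=` and `read_line`.

```ocaml
(* Hypothetical minimal IO signature; the real cohttp one differs. *)
module type IO = sig
  type 'a t
  val return : 'a -> 'a t
  val ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t

  type ic
  val read_line : ic -> string option t
end

(* The parsing logic is written once, against the abstract signature. *)
module Make_parser (IO : IO) = struct
  open IO

  (* Read "Name: value" lines until a blank line or EOF. *)
  let read_headers ic =
    let rec loop acc =
      read_line ic >>= function
      | None | Some "" -> return (List.rev acc)
      | Some line ->
        (match String.index_opt line ':' with
         | None -> loop acc (* skip malformed lines *)
         | Some i ->
           let name = String.sub line 0 i in
           let rest = String.sub line (i + 1) (String.length line - i - 1) in
           loop ((name, String.trim rest) :: acc))
    in
    loop []
end

(* A synchronous instance over an in-memory list of lines, for testing only. *)
module Sync_io = struct
  type 'a t = 'a
  let return x = x
  let ( >>= ) x f = f x

  type ic = string list ref
  let read_line ic =
    match !ic with
    | [] -> None
    | l :: rest -> ic := rest; Some l
end

module P = Make_parser (Sync_io)
```

With this shape, only the small `IO` module has to be rewritten per threading library; everything inside the functor is shared.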

The higher-level client HTTP interface is where Lwt and Async diverge, and so have different implementations. The Async version exposes the body as a Pipe which can be lazily consumed or passed to a proxy. This means that pipelined requests must be careful to wait until previous requests have fully consumed their respective bodies before using that particular channel. In Lwt, this can be modelled with Lwt_stream.t that maps over the requests and responses.

In Async, I ran into a few basic questions while building the equivalent of the Lwt support.

- Only one Scheduler.go is supported (unlike Lwt_main, which can run multiple times in a program), which makes multiple Async tests hard to compose in oUnit. Do you have a custom oUnit that can isolate multiple Async test-cases, or should I write a forking one that does this?

- What's the best way to get a human-readable error? I have this in the oUnit right now, which is very clunky:

let _ = Async_core.Scheduler.within' (fun () ->
  Monitor.try_with make_net_req >>=
  function
  |Error exn ->
    (* TODO: how to dump out top-level errors in a nicer way? *)
    Printf.fprintf stderr "err %s.\n%!" (Exn.backtrace ()); return ()
  |Ok _ -> return ()
) in Async_unix.Scheduler.go ()

- Async_streams are deprecated, and I should use Pipes instead. I need to be able to run a background task that will close the I/O channel whenever the Pipe is completed to close the underlying fd. I obtain a Deferred via Pipe.closed, but how should this run in the background? Should I just ignore the return, or register it with the Scheduler?

In general, are there any toy network programs written using Async that I could crib good style from?

cheers,
Anil

Yaron Minsky

Aug 19, 2012, 2:54:25 PM
to ocaml...@googlegroups.com
On Thu, Aug 16, 2012 at 9:46 PM, Anil Madhavapeddy <an...@recoil.org> wrote:
> I've been working on a release of a cooperative threading HTTP library, and have been using the opportunity to experiment with parameterising it over Lwt/Async. I had a few (basic) Async questions as a result.
>
> The code is at:
> https://github.com/avsm/ocaml-cohttp/tree/v2-interface
> (requires Lwt-current to build at the moment)
>
> The basic IO interface that takes care of low-level parsing is very straightforward to implement in Async and Lwt:
> https://github.com/avsm/ocaml-cohttp/blob/v2-interface/async/cohttp_async_raw.ml
> https://github.com/avsm/ocaml-cohttp/blob/v2-interface/lwt/cohttp_lwt_raw.ml
>
> The higher-level client HTTP interface is where Lwt and Async diverge, and so have different implementations. The Async version exposes the body as a Pipe which can be lazily consumed or passed to a proxy. This means that pipelined requests must be careful to wait until previous requests have fully consumed their respective bodies before using that particular channel. In Lwt, this can be modelled with Lwt_stream.t that maps over the requests and responses.
>
> In Async, I ran into a few basic questions while building the equivalent Lwt support.
>
> - Only one Scheduler.go is supported (unlike Lwt_main, which can run multiple times in a program), which makes multiple Async tests hard to compose in oUnit. Do you have a custom oUnit that can isolate multiple Async test-cases, or should I write a forking one that does this?
>
> - What's the best way to get a human-readable error? I have this in the oUnit right now, which is very clunky:
>
> let _ = Async_core.Scheduler.within' (fun () ->
> Monitor.try_with make_net_req >>=
> function
> |Error exn ->
> (* TODO: how to dump out top-level errors in a nicer way? *)
> Printf.fprintf stderr "err %s.\n%!" (Exn.backtrace ()); return ()
> |Ok _ -> return ()
> ) in Async_unix.Scheduler.go ()

One quick observation: this code would be a little cleaner if you
opened Async.Std. Then you'd write:

open Core.Std
open Async.Std

let _ = within' (fun () ->
  try_with make_net_req
  >>= function
  | Error exn ->
    (* TODO: how to dump out top-level errors in a nicer way? *)
    fprintf stderr "err %s.\n%!" (Exn.backtrace ());
    return ()
  | Ok _ -> return ()
)
in
Scheduler.go ()

Further, I don't think the [within'] call is doing you any good, since
you're not using it to set the block group, the monitor or the
priority. So you should be able to just drop that.

Stephen, do you have a view as to how to do the error handling here
properly?

As a side note, we have a new release that should be out in a bit over
a week (it's totally done internally, but Yury, who is responsible for
the external releases, was away last week and is away the next.) I think
there are some material changes there that should improve some of the
error presentation.

>
> - Async_streams are deprecated, and I should use Pipes instead. I
> need to be able to run a background task that will close the I/O
> channel whenever the Pipe is completed to close the underlying fd.
> I obtain a Deferred via Pipe.closed, but how should this run in
> the background? Should I just ignore the return, or register it
> with the Scheduler?

Sorry, I'm not quite able to decode the question. Do you have a code
snippet you could share?

> In general, are there any toy network programs written using Async
> that I could crib good style from?

That's an excellent question. We do have some examples that are
presently not included in our external tree. I'll see if we can fix
that.

y

Anil Madhavapeddy

Aug 20, 2012, 10:11:03 PM
to ocaml...@googlegroups.com
On Sun, Aug 19, 2012 at 02:54:25PM -0400, Yaron Minsky wrote:
> One quick observation: this code would be a little cleaner if you
> opened Async.Std. Then you'd write:
>
> open Core.Std
> open Async.Std
>

Ah, I missed the Async package's existence and went straight for
Async_unix. I've now updated the dependency to use Async directly and all
the interfaces are much simpler.

> let _ = within' (fun () ->
> try_with make_net_req
> >>= function
> | Error exn ->
> (* TODO: how to dump out top-level errors in a nicer way? *)
> fprintf stderr "err %s.\n%!" (Exn.backtrace ());
> return ()
> | Ok _ -> return ()
> )
> in
> Scheduler.go ()
>
> Further, I don't think the [within'] call is doing you any good, since
> you're not using it to set the block group, the monitor or the
> priority. So you should be able to just drop that.

This was actually related to me experimenting with what all those options
do (particularly the custom monitor). I'll wait for your new update when
Yury's back and see what the interface looks like. Do you normally just
read the s-expr encoded in Error.t, or do you have something fancier (the
former is good enough for me in practice)?

> > - Async_streams are deprecated, and I should use Pipes instead. I
> > need to be able to run a background task that will close the I/O
> > channel whenever the Pipe is completed to close the underlying fd.
> > I obtain a Deferred via Pipe.closed, but how should this run in
> > the background? Should I just ignore the return, or register it
> > with the Scheduler?
>
> Sorry, I'm not quite able to decode the question. Do you have a code
> snippet you could share?

Yeah, in:
https://github.com/avsm/ocaml-cohttp/blob/v2-interface/async/cohttp_async.ml

let pipe_of_body read_fn ic =
  let rd, wr = Pipe.create () in
  (* Consume from the input channel and write to the new pipe *)
  let rec write () =
    read_fn ic >>= function
    |Transfer.Done ->
      Pipe.close wr; return ()
    |Transfer.Final_chunk c -> begin
      Pipe.with_write wr ~f:(fun wrfn -> wrfn c) >>=
      function
      |`Closed -> return ()
      |`Ok _ -> Pipe.close wr; return ()
    end
    |Transfer.Chunk c -> begin
      Pipe.with_write wr ~f:(fun wrfn -> wrfn c) >>=
      function
      |`Closed -> return ()
      |`Ok _ -> write ()
    end
  in
  (* TODO: how to run write () as a background task? *)
  let _ = write () in
  rd

This function is called to convert an HTTP body into a Pipe pair, so that
the client can lazily consume the (potentially very large) HTTP body, or
pass it on to a proxy process. The write () function reads from the HTTP
body and potentially blocks on the I/O or on pushback from the pipe.
We immediately return the reader end of the pipe from the function.

So my question is what to do with:

(* TODO: how to run write () as a background task? *)
let _ = write () in
rd

Does write() need to be registered with something to be a "proper"
background task for accounting purposes, or to run a cleanup function if
the Reader pipe terminates early?

The equivalent code in Lwt is here and looks like:
https://github.com/avsm/ocaml-cohttp/blob/v2-interface/lwt/cohttp_lwt.ml

let stream_of_body read_fn ic =
  let fin = ref false in
  Lwt_stream.from (fun () ->
    match !fin with
    |true -> return None
    |false -> begin
      match_lwt read_fn ic with
      |Transfer.Done ->
        return None
      |Transfer.Final_chunk c ->
        fin := true;
        return (Some c)
      |Transfer.Chunk c ->
        return (Some c)
    end
  )

The Lwt_stream that is returned has an Lwt.on_terminate handler for when
the consumer hits EOF (e.g. to close the socket), and I think (but am a
little blurry here) cancels the other end with an Lwt.Canceled exception
if the other side stops early.
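
The Final_chunk bookkeeping in stream_of_body can be looked at in isolation with a synchronous stand-in that needs only the standard library. This is a sketch under assumptions: the `chunk` type below is a hypothetical copy of the Transfer constructors used above, `puller_of_body` replaces Lwt_stream.from with a plain closure, and `reader_of_list` is a fake body reader. It says nothing about termination handlers or cancellation.

```ocaml
(* Hypothetical stand-in for the Transfer constructors used above. *)
type chunk = Chunk of string | Final_chunk of string | Done

(* Synchronous analogue of stream_of_body: returns a pull function that
   yields [Some data] for each chunk and [None] once the body is finished. *)
let puller_of_body (read_fn : unit -> chunk) : unit -> string option =
  let fin = ref false in
  fun () ->
    if !fin then None
    else
      match read_fn () with
      | Done -> None
      | Final_chunk c -> fin := true; Some c (* remember that we are done *)
      | Chunk c -> Some c

(* A fake body reader backed by a list, for demonstration only. *)
let reader_of_list chunks =
  let remaining = ref chunks in
  fun () ->
    match !remaining with
    | [] -> Done
    | c :: rest -> remaining := rest; c

(* Pull until [None], collecting the data in order. *)
let drain pull =
  let rec go acc =
    match pull () with
    | None -> List.rev acc
    | Some c -> go (c :: acc)
  in
  go []
```

The `fin` flag is what stops the reader from being called again after a Final_chunk, which matters when the next bytes on the channel belong to a pipelined response.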

> > In general, are there any toy network programs written using Async
> > that I could crib good style from?
>
> That's an excellent question. We do have some examples that are
> presently not included in our external tree. I'll see if we can fix
> that.

That would be great!

--
Anil Madhavapeddy http://anil.recoil.org

Yaron Minsky

Aug 20, 2012, 11:03:47 PM
to ocaml...@googlegroups.com
We usually just dump the s-expr that you get from the Error.t.
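
As a rough illustration of printing the error value itself rather than only `Exn.backtrace ()`, here is a standard-library sketch. It is not Async code: `try_with_msg` and `report` are hypothetical helpers, and `Printexc.to_string` stands in for dumping the s-expression of an Error.t.

```ocaml
(* Hypothetical helper: run [f] and turn any exception into a readable
   string. Stdlib only; this is not Async's Monitor.try_with. *)
let try_with_msg (f : unit -> 'a) : ('a, string) result =
  match f () with
  | v -> Ok v
  | exception exn -> Error (Printexc.to_string exn)

(* Print the rendered exception on stderr, ignore successful results. *)
let report = function
  | Ok _ -> ()
  | Error msg -> prerr_endline ("err " ^ msg)

let () = report (try_with_msg (fun () -> failwith "connection refused"))
```

The point is only that the exception carried by the Error case is rendered, so the message says what failed and not just where.
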
The narrow answer to the background-task question is: [write ()] is
automatically run as a background task, no registration required. You'd
normally write this:

whenever (write ());
rd

where `whenever` is actually a function that just ignores a [unit
Deferred.t]. We decided that `whenever` was a bit of a confusing name
for this, and we changed it to:

don't_wait_for (write ());
rd

in the latest release, which will come out next week. But the effect
is just the same as:

ignore (write () : unit Deferred.t);
rd

> Does write() need to be registered with something to be a "proper"
> background task for accounting purposes, or to run a cleanup function if
> the Reader pipe terminates early?

I need to look at your example in more detail to understand how the
early termination you're talking about works. I'll try to get to that
in the next day or so.

Jérémie Dimino

Aug 21, 2012, 5:55:48 AM
to ocaml...@googlegroups.com
Le Tue, 21 Aug 2012 03:11:03 +0100,
Anil Madhavapeddy <an...@recoil.org> a écrit :

> The Lwt_stream that is returned has an Lwt.on_terminate handler for
> when the consumer hits EOF (e.g. to close the socket), and I think
> (but am a little blurry here) cancels the other end with an
> Lwt.Canceled exception if the other side stops early.

Actually no, cancelling an operation on a Lwt stream does not cancel
its "from" function. Allowing cancels in streams may lead to
unexpected behaviors.

Jérémie

David House

Aug 23, 2012, 4:01:52 AM
to ocaml...@googlegroups.com
On Tue, Aug 21, 2012 at 3:11 AM, Anil Madhavapeddy <an...@recoil.org> wrote:
> So my question is what to do with:
>
> (* TODO: how to run write () as a background task? *)
> let _ = write () in
> rd
>
> Does write() need to be registered with something to be a "proper"
> background task for accounting purposes, or to run a cleanup function if
> the Reader pipe terminates early?

As Yaron points out, it is more idiomatic to say [whenever (write
())], or in the newer version of async, [don't_wait_for (write ())].
For example, the stuff below "TODO" in Client.read_response might be
written:

if close then don't_wait_for (
  Pipe.closed body_rd
  >>= fun () ->
  Reader.close ic
  >>= fun () ->
  Writer.close oc)

There is no need to register background processes with the scheduler.
Your code is already correct in the case of early termination by the
consumer: one of the [Pipe.write_when_ready] calls will terminate with
`Closed, and your loop will exit.

P.S. I see in your code that you open Deferred. Why is this? All of
the infix operators, plus [return] and a few others, are already in
the scope of Async.Std, so there is no need to go opening anything
else.

Also, two more stylistic points:

1. If the code on the right-hand side of a bind does not return a
deferred, it is common to use >>| instead of >>=, and then delete all
the "return"s on the right-hand side. E.g. in read_response.

2. All code between two binds is guaranteed to execute atomically,
without being interleaved with any other code. This is a very nice
feature of async, and really simplifies thinking about concurrent
code. Because of this, it is idiomatic to put all of your binds
flush-left, so that they stick out visually. (This is not quite
universal in our code base, but is certainly to be encouraged.) You
can see that in my snippet above.
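
The first point can be illustrated without Async at all. The sketch below uses a hypothetical stand-in monad `Id` in place of Deferred (it does no scheduling whatsoever) purely to show that `>>|` is `>>=` with the `return` folded in, and it also lays the binds out flush-left as described in the second point.

```ocaml
(* Hypothetical stand-in for Deferred: a trivial monad with no scheduling. *)
module Id = struct
  type 'a t = Value of 'a

  let return x = Value x

  let ( >>= ) : 'a t -> ('a -> 'b t) -> 'b t = fun (Value x) f -> f x

  (* Map: like bind, but the right-hand side returns a plain value. *)
  let ( >>| ) m f = m >>= fun x -> return (f x)

  let run (Value x) = x
end

open Id

(* With >>=, a right-hand side that is not itself monadic needs [return]. *)
let length_bind s =
  return s
  >>= fun s ->
  return (String.length s)

(* With >>|, the [return] disappears. *)
let length_map s =
  return s
  >>| fun s ->
  String.length s
```

Both functions compute the same result; the `>>|` version simply states that the continuation is pure.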

Anil Madhavapeddy

Sep 5, 2012, 7:29:51 PM
to ocaml...@googlegroups.com
David, thanks for all these hints. I was opening Deferred due to missing
the Async.Std, so I've reverted all that now.

> Also, two more stylistic points:
>
> 1. If the code on the right-hand side of a bind does not return a
> deferred, it is common to use >>| instead of >>=, and then delete all
> the "return"s on the right-hand side. E.g. in read_response.
>
> 2. All code between two binds is guaranteed to execute atomically,
> without being interleaved with any other code. This is a very nice
> feature of async, and really simplifies thinking about concurrent
> code. Because of this, it is idiomatic to put all of your binds
> flush-left, so that they stick out visually. (This is not quite
> universal in our code base, but is certainly to be encouraged.) You
> can see that in my snippet above.

Ack. I've been tidying up http://github.com/avsm/ocaml-cohttp to work
across Async, Lwt_unix and Mirage via functors. The result is a reasonably
good HTTP/1.1 (w/ pipelining) client/server for all these threading
systems.

Many of my other thoughts on Async shortcomings seem to be addressed by
the upcoming new release (for example, Async equivalents of Core modules
such as Option). I'll play around with them shortly.

I'm just going to finish up SSL bindings for Async to get it to
feature-parity with the Lwt version, and then port the Github API bindings
(http://github.com/avsm/ocaml-github). It's quite useful having two
different threading libraries to really force a good structure between
protocol modules.

Most of these will be packaged up in OPAM by the end of the week, for our
upcoming CUFP tutorial next week.