Building a concurrent server with Async


Jon Harrop

May 3, 2016, 14:03:38
to ocaml-core
I'm just coming back to OCaml after a few years of using F# instead and want to build a concurrent server. I suppose the simplest server with a similar architecture would be a chat server: a client connects, and when it sends a message the server broadcasts that message to all connected clients. Is there an example chat server written using Async?

I have read the docs on Async (including the great chapter in Real World OCaml!) and I think I am getting to grips with the ideas. Coming from F# I am constantly comparing to that language and libraries, of course. My first impression is that OCaml's Async is somewhat similar but lower level (e.g. no syntax) and single threaded. In F# I build concurrent systems of communicating sequential processes using MailboxProcessor to provide thread-safe queues. I'm not sure what the equivalent is using Async but I guess because it is single threaded I don't need thread-safe anything (in the absence of any other threads at least) so I can fiddle with any data structures freely from anywhere. However, the only way I can think to maintain a collection of current client connections in order to broadcast to them is to use global mutable state. Is that normal using Async or am I missing something?

What is the performance of Async like, particularly latency? I got 114us average latency over Ethernet using F# with a fault tolerant setup. I don't need OCaml to be as fast but anything more than 10ms might be a problem.

Cheers,
Jon.

Drew Atkin

May 3, 2016, 14:33:17
to ocaml...@googlegroups.com
On Tue, May 3, 2016 at 2:03 PM, 'Jon Harrop' via ocaml-core <ocaml...@googlegroups.com> wrote:
However, the only way I can think to maintain a collection of current client connections in order to broadcast to them is to use global mutable state. Is that normal using Async or am I missing something?

That's very roughly correct. I tried to cook up the simplest possible example I could.

It uses the basic modules you'd be dealing with:
 - Pipe (from async_kernel)
 - Tcp (from async_extra)
 - Reader and Writer (from async_unix)
 - Bus (from core_kernel; there is a wrapper in async_extra. I believe you'll need to be using ppx to get the bus functions to work)

You can run `main.native server 8888`, and then connect clients with `main.native client localhost:8888`, and any data sent to stdin will go to all clients.

=== the code ===

open Core.Std
open Async.Std

let server =
  let bus : (_, read_write) Bus.t =
    Bus.create [%here] Arity1 ~allow_subscription_after_first_write:true ~on_callback_raise:ignore
  in
  let bus_ro = Bus.read_only bus in
  Command.async
    ~summary:"The server"
    Command.Spec.(empty +> anon ("port" %: int))
    (fun port () ->
      Tcp.Server.create (Tcp.on_port port) (fun _ reader writer ->
        Core.Std.printf "new client\n%!";
        let subscriber = Bus.subscribe_exn bus_ro [%here] ~f:(Writer.write writer) in
        Pipe.iter_without_pushback (Reader.pipe reader) ~f:(Bus.write bus)
        >>= fun () ->
        Bus.unsubscribe bus_ro subscriber;
        Deferred.unit)
      >>= fun (_ : _ Tcp.Server.t) ->
      Core.Std.printf "listening\n%!";
      Deferred.never ())

let client =
  Command.async
    ~summary:"The client"
    (Command.Spec.(empty +> anon ("host:port" %: (Arg_type.create Host_and_port.of_string))))
    (fun host_and_port () ->
      let host, port = Host_and_port.tuple host_and_port in
      Tcp.with_connection (Tcp.to_host_and_port host port) (fun _ reader writer -> 
        Core.Std.printf "connected\n%!";
        don't_wait_for (
          Pipe.iter_without_pushback (Reader.pipe reader) ~f:(printf "got: %s\n%!");
        );
        Pipe.iter_without_pushback (Reader.pipe (force Reader.stdin)) ~f:(Writer.write writer)))

let () =
  Command.group
    ~summary:"Basic echo-style broadcast server/client"
    [ "server", server; "client", client; ]
  |> Command.run

Yaron Minsky

May 3, 2016, 23:59:57
to ocaml...@googlegroups.com
Also, if you really want a monadic syntax, we do have one:


So you can write:

    let%bind v = e1 in e2

instead of 

    e1 >>= fun v -> e2

There's also let%map, for when you want the map equivalent. It also supports `and`:

    let%bind v1 = e1 and v2 = e2 ... in e'

And you can do match%bind and match%map as well.

As for latency, I wouldn't expect typical latencies to be anything like 10ms.  Obviously, your mileage may vary, particularly depending on how much garbage you generate.  To say the (perhaps) obvious: do as much as you can synchronously, avoid unnecessary allocation, and don't create too many deferreds, and you can make your Async code go very fast indeed.

y





Jesper Louis Andersen

May 4, 2016, 05:47:26
to ocaml...@googlegroups.com

On Tue, May 3, 2016 at 8:03 PM, 'Jon Harrop' via ocaml-core <ocaml...@googlegroups.com> wrote:
What is the performance of Async like, particularly latency? I got 114us average latency over Ethernet using F# with a fault tolerant setup.

Targeting a mean latency always irks me. The mean is usually unrepresentative of how latency is perceived overall. It is often better to look at the 99th or 99.99th percentile, or the maximum latency, over a series of requests. Otherwise, a solution can cheat by sacrificing a few requests with miserable latency in exchange for low latency on the rest.
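To make the point about means concrete, here is a small stdlib-only sketch. The sample data (98 requests at 100us and 2 at 50ms) is invented for illustration, and the nearest-rank percentile definition is one common choice among several, not something prescribed in this thread.

```ocaml
(* Arithmetic mean of a non-empty array of latencies, in microseconds. *)
let mean xs =
  Array.fold_left ( +. ) 0.0 xs /. float_of_int (Array.length xs)

(* Nearest-rank percentile: the smallest sample such that at least [p]
   percent of all samples are less than or equal to it. *)
let percentile p xs =
  let sorted = Array.copy xs in
  Array.sort compare sorted;
  let n = Array.length sorted in
  let rank = int_of_float (ceil (p /. 100.0 *. float_of_int n)) in
  sorted.(max 0 (min (n - 1) (rank - 1)))

(* Hypothetical sample: 98 fast requests and 2 very slow ones. *)
let samples =
  Array.init 100 (fun i -> if i < 98 then 100.0 else 50_000.0)

let () =
  Printf.printf "mean = %.0fus  p50 = %.0fus  p99 = %.0fus  max = %.0fus\n"
    (mean samples)
    (percentile 50.0 samples)
    (percentile 99.0 samples)
    (percentile 100.0 samples)
```

With this sample the mean is 1098us, which describes neither the typical request (100us) nor the slow ones (50ms); the median and the 99th percentile together tell the real story.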

A system like Async, being cooperative, puts the onus of achieving low latency on the programmer. If you write your code in the right way, optimize everything, and break up large computations, you can obtain very low latencies, mostly because you can avoid locking in many cases: in the single-core case you don't have to work with synchronization primitives as much.

In other words: it is certainly doable well below 10ms if you know what you are doing.


--
J.

Jon Harrop

May 4, 2016, 07:17:51
to ocaml-core
Wow, thanks! Looks like I need to read up on "Bus". :-)

Couple more questions:

  • How do you compile that?
  • What happens if a slow client connects so broadcast responses going to them clog up? Is it a memory leak on the server? Does everything else still work ok? IIRC a Pipe is a queue so you could allow some responses to be queued up and (assuming this is possible) measure the length of the queue when broadcasting and disconnect clients with long queues. Or is there a better way?
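The idea in the second question (queue some responses per client and disconnect clients whose queue grows too long) can be modelled without Async at all. The sketch below is stdlib-only; the names `client`, `max_pending`, `connect`, `drain` and `broadcast` are hypothetical, and `drain` merely stands in for the network accepting data. It illustrates the policy, not how Async's Writer or Pipe implement buffering.

```ocaml
(* One connected client and the messages queued for it but not yet sent. *)
type client = {
  id : int;
  pending : string Queue.t;
}

(* Hypothetical limit on how many unsent messages a client may accumulate. *)
let max_pending = 3

(* Registry of connected clients. It is global mutable state, which is safe
   here only because a single thread touches it. *)
let clients : (int, client) Hashtbl.t = Hashtbl.create 16

let connect id =
  Hashtbl.replace clients id { id; pending = Queue.create () }

(* Simulate the network draining everything queued for one client. *)
let drain id =
  match Hashtbl.find_opt clients id with
  | Some c -> Queue.clear c.pending
  | None -> ()

(* Queue [msg] for every client, then disconnect the clients whose backlog
   now exceeds [max_pending]. Returns the sorted ids of dropped clients. *)
let broadcast msg =
  let dropped = ref [] in
  Hashtbl.iter
    (fun _ c ->
      Queue.add msg c.pending;
      if Queue.length c.pending > max_pending then dropped := c.id :: !dropped)
    clients;
  (* Remove after iterating, so the table is not mutated during traversal. *)
  List.iter (Hashtbl.remove clients) !dropped;
  List.sort compare !dropped
```

A client that keeps up is never dropped, while one that never drains is disconnected on the broadcast that pushes its backlog past the limit, so server memory per client stays bounded.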

Jon Harrop

May 4, 2016, 07:21:32
to ocaml-core
The main thing I was thinking of for syntax was a way to have a try..with span several binds. Do you catch and rethrow the exceptions between binds by hand or does it just work as-is?

Thanks for the latency optimisation advice. Makes sense. I cannot wait to get something up and running so I can benchmark it! :-)

Jon Harrop

May 4, 2016, 07:31:16
to ocaml-core
Yes. I had something like 99% of messages under 0.5ms, I think. The main high latencies came from the Ethernet network rather than the software. And OCaml's GC is far more incremental than .NET's so I'm hoping this will be much less of a problem (particularly if I can write it without threads by using Async). I'm not sure how much latency comes from low-level concurrency primitives like locking though.

When I did this in F# serialization was a performance bottleneck. I expect OCaml will be able to serialize and deserialize a lot faster but, on the other hand, this won't be done in parallel between concurrent client connections. So I am keen to get this up and running and measure some performance characteristics. :-)

Drew

May 4, 2016, 19:16:37
to ocaml-core
On Wednesday, May 4, 2016 at 7:17:51 AM UTC-4, Jon Harrop wrote:
Wow, thanks! Looks like I need to read up on "Bus". :-)

Couple more questions:

  • How do you compile that?
Assuming you're using opam and you've got a reasonably recent version of async (I'm using 113.33.03, but I think >= 113.24.00 is fine), put the code in <blah>.ml and then `corebuild <blah>.native` should do the trick (opam should have added `corebuild` to your path). I've also plopped the example and a build script on github:


  • What happens if a slow client connects so broadcast responses going to them clog up? Is it a memory leak on the server?
Yes, [Writer] buffers the data for you, but it eventually raises an exception to the [Writer]'s monitor if data sits in the buffer too long. If you don't explicitly attach to the monitor yourself, I believe the Async scheduler raises these exceptions to the top level.
  • Does everything else still work ok? IIRC a Pipe is a queue so you could allow some responses to be queued up and (assuming this is possible) measure the length of the queue when broadcasting and disconnect clients with long queues. Or is there a better way?
Your memory is correct, [Pipe] is a queue. There's a pushback mechanism in [Pipe] which I've side-stepped in the example by using [Pipe.iter_without_pushback] (e.g. when sending messages to the server). I'm also using the non-async functions in the [Bus] interface to send messages to the clients: every time data is sent to the [Bus], the [Bus] subscriber callbacks copy that data to the [Writer]s' internal buffers immediately (this happens before [Bus.write] returns, I believe). You can use the deferred returned by [Writer.flushed] to determine when all the data in a [Writer]'s internal buffer has been written to the socket's fd.

Jon Harrop

May 4, 2016, 22:40:00
to ocaml-core
Thanks!

I've got your sample up and running. I'm seeing ~3ms latency running the server on a Raspberry Pi over Wifi, which is fine. However, I'm seeing the server die a lot when I stress it...

When I tried to connect 1,000 clients simultaneously it died with a Unix error "too many open files". I guess this is an OS problem so I've tried to fix it by increasing the limit via sysctl.

Then, when I repeatedly connect, send a message, receive the message and disconnect, the server dies after 60k attempts with "writer fd unexpectedly closed".

And when I try to repeatedly connect and send a message and disconnect the server dies after 800 connections with the unix error "broken pipe".

I thought perhaps I could make all of these problems vanish by adding "~on_handler_error:`Ignore" but then I get a segmentation fault.

Is there an easy way to fix all of these problems? I assume/hope the seg fault is probably because I am on a RPi and other people are already making serious use of this library on x86/64? :-)

Cheers,
Jon.

Arseniy Alekseyev

May 5, 2016, 05:20:38
to ocaml...@googlegroups.com
On Wed, May 4, 2016 at 12:21 PM, 'Jon Harrop' via ocaml-core <ocaml...@googlegroups.com> wrote:
The main thing I was thinking of for syntax was a way to have a try..with span several binds. Do you catch and rethrow the exceptions between binds by hand or does it just work as-is?
 
You are right, try..with blocks don't work very well with async. However, we have [Monitor.try_with f]: you normally use it instead of a try..with block and it does work well with multiple binds in [f].
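As a minimal sketch of what that looks like, assuming the same `Core.Std` / `Async.Std` setup as the example earlier in the thread (newer releases use `open Core` and `open Async` instead): the helper names `echo_one_line`, `guarded` and `handle_client` are made up for illustration, and logging the error is just one possible way to react to it.

```ocaml
open Core.Std
open Async.Std

(* Read one line and echo it back. There are two binds here; an exception
   raised in either step is caught by the enclosing [Monitor.try_with]. *)
let echo_one_line reader writer =
  Reader.read_line reader
  >>= function
  | `Eof -> return ()
  | `Ok line ->
    Writer.write_line writer line;
    Writer.flushed writer

(* Run [f] under its own monitor and turn any exception into a log line
   instead of letting it reach the top-level monitor. *)
let guarded f =
  Monitor.try_with f
  >>| function
  | Ok () -> ()
  | Error exn -> eprintf "client failed: %s\n%!" (Exn.to_string exn)

(* Has the shape of the handler passed to [Tcp.Server.create]. *)
let handle_client _addr reader writer =
  guarded (fun () -> echo_one_line reader writer)
```

The design point is that the result of `Monitor.try_with` is an ordinary `Result.t` inside a deferred, so error handling becomes a pattern match after the whole computation rather than a handler wrapped around each bind.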

Yaron Minsky

May 5, 2016, 06:29:50
to ocaml...@googlegroups.com
It is indeed used quite a lot on x86-64, though it's always possible you're stressing some corner that we don't.  You might want to try running your code on x86-64 to see if the problems are architecture specific.  Also, Drew's example is not doing any kind of pushback to handle cases where the writer can't be written to, so I can imagine lots of things going wrong there.  While it's a fine example of how the libraries work, I don't think of any of this as an idiomatic use of bus.

If you want a pure throughput test, you should drop the bus and the pipe, and just read from the reader and write to the writer directly.  You can also do it with pipes if you prefer, but you shouldn't use iter_without_pushback.
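A rough sketch of the pipe variant with pushback, again assuming the `Core.Std` / `Async.Std` setup used in the thread (newer releases use `open Core` and `open Async`). The names `forward_with_pushback` and `echo_handler` are hypothetical, and waiting on `Writer.flushed` after every chunk is only one simple way to apply pushback, likely not the fastest.

```ocaml
open Core.Std
open Async.Std

(* Forward every chunk from [input] to [writer]. Because [f] returns a
   deferred, [Pipe.iter] does not take the next chunk off the pipe until the
   previous one has been flushed, so a slow peer slows the reader down
   instead of growing the writer's buffer without bound. *)
let forward_with_pushback input writer =
  Pipe.iter input ~f:(fun chunk ->
    Writer.write writer chunk;
    Writer.flushed writer)

(* An echo handler with the shape expected by [Tcp.Server.create]. *)
let echo_handler _addr reader writer =
  forward_with_pushback (Reader.pipe reader) writer
```

Compared with `Pipe.iter_without_pushback`, the only change is that the callback returns a deferred, and that is what lets the consumer's pace propagate back to the producer.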

y


Yaron Minsky

May 5, 2016, 09:05:14
to ocaml...@googlegroups.com
And if your clients and servers happen to both use Async, you might want to try Async-RPC out...