Async fork and closing descriptors


Andre Nathan

Jan 29, 2015, 11:41:09 AM
to ocaml...@googlegroups.com
Hello

Below is a small program that creates a pair of file descriptors via `Unix.socketpair` and forks a child process. The parent then loops, writing a byte of data to its descriptor on each iteration; the child reads it after dup2'ing its own descriptor onto stdin.

I tried to write a wrapper around `Unix.fork` as below, but it seems to block forever, so I had to replace it with `return (Core.Std.Unix.fork ())`. Is this expected?

let fork () =
  In_thread.syscall_exn ~name:"fork" Core.Std.Unix.fork

The second question is that the commented-out calls to `Unix.close` in the program below cause a "broken pipe" error (EPIPE) when uncommented. This is unexpected: in the parent process the descriptor is not used at all, and in the child process it has been dup2'd to stdin, so closing it should cause no issues either. Any hints on what could be going on?

open Core.Std
open Async.Std

let dup2 src dst =
  In_thread.syscall_exn ~name:"dup2"
    (fun () ->
      Fd.with_file_descr_exn src
        (fun src ->
          Fd.with_file_descr_exn dst
            (fun dst ->
              Core.Std.Unix.dup2 ~src ~dst)))

let fork () =
  return (Core.Std.Unix.fork ())

let write_to_child wr =
  let rec write i =
    printf "-> %d\n%!" i;
    Writer.write wr (Int.to_string i);
    Writer.flushed wr >>= fun () ->
    Clock.after (Time.Span.of_sec 1.0) >>= fun () ->
    write (i + 1) in
  write 0

let read_from_parent rd =
  let rec read () =
    let buf = String.create 1 in
    Reader.read rd buf >>= function
    | `Eof ->
        printf "EOF\n%!";
        return ()
    | `Ok _ ->
        printf "<- %d\n%!" (Int.of_string buf);
        read () in
  read ()

let main =
  let parent_fd, child_fd = Unix.socketpair () in
  let child () =
    let stdin = Fd.stdin () in
    dup2 child_fd stdin >>= fun () ->
    (* XXX causes EPIPE *)
    (*Unix.close child_fd >>= fun () ->*)
    let rd = Reader.create stdin in
    read_from_parent rd in
  let parent pid =
    (* XXX causes EPIPE *)
    (*Unix.close child_fd >>= fun () ->*)
    let wr = Writer.create parent_fd in
    let _ = write_to_child wr in
    Unix.waitpid pid >>= function
    | Ok () -> exit 0
    | _ -> exit 1 in
  fork () >>= function
  | `In_the_child -> child ()
  | `In_the_parent pid -> parent pid

let () =
  never_returns (Scheduler.go ())

Thanks in advance,
Andre

Arseniy Alekseyev

Jan 29, 2015, 12:20:36 PM
to ocaml...@googlegroups.com
I think this is because Async uses multiple OS threads, and you can't fork a multithreaded program (all threads except the one that called fork end up dying).

Instead you could try forking before starting the scheduler.
That would prevent you from using the rest of the Async library to set up the shared file descriptors, which is unfortunate. I'm not sure if there is a way around it.

Jeremie Dimino

Jan 29, 2015, 12:28:20 PM
to ocaml...@googlegroups.com
As Arseniy said, you should fork before starting the scheduler. Forking once the scheduler is started and using Async in the child is not supported.

For the second error you are seeing, I think it's because [Fd.close] calls [Unix.shutdown ~mode:SHUTDOWN_ALL] if the file descriptor is a socket.
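
To illustrate why a shutdown hurts here, below is a minimal standalone sketch. It assumes only the standard OCaml `unix` library (not Core or Async), and the helper names `read_from_dup_after`, `plain_close` and `shutdown_then_close` are made up for the example. The point it tries to show: `shutdown` acts on the socket itself rather than on one descriptor, so a duplicate of that descriptor (such as stdin after a dup2) stops working too, whereas a plain `close` leaves the duplicate usable.

```ocaml
(* Standalone sketch using the standard `unix` library, not Core or Async. *)

(* Ignore SIGPIPE so a write to a dead socket raises EPIPE instead of
   killing the process. *)
let () = Sys.set_signal Sys.sigpipe Sys.Signal_ignore

(* Hypothetical helper: duplicate one end of a socket pair, release the
   original descriptor with [release], send one byte from the other end,
   and return how many bytes the duplicate can still read. *)
let read_from_dup_after ~release =
  let a, b = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in
  let b_dup = Unix.dup b in (* plays the role of stdin after dup2 *)
  release b;
  (* The write raises EPIPE if the socket has been shut down. *)
  (try ignore (Unix.write_substring a "x" 0 1)
   with Unix.Unix_error (Unix.EPIPE, _, _) -> ());
  let buf = Bytes.create 1 in
  let n = Unix.read b_dup buf 0 1 in
  Unix.close a;
  Unix.close b_dup;
  n

(* Only drops this descriptor; the socket stays open through the duplicate. *)
let plain_close fd = Unix.close fd

(* Shuts the socket down for every descriptor that refers to it, then
   closes this one. *)
let shutdown_then_close fd =
  Unix.shutdown fd Unix.SHUTDOWN_ALL;
  Unix.close fd

let () =
  Printf.printf "after plain close:        read %d byte(s)\n"
    (read_from_dup_after ~release:plain_close);
  Printf.printf "after shutdown and close: read %d byte(s)\n"
    (read_from_dup_after ~release:shutdown_then_close)
```

On Linux I would expect the first call to read the byte and the second to see end-of-file, with the writer getting EPIPE, which matches the symptom in the original program.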

--
Jeremie

Arseniy Alekseyev

Jan 29, 2015, 12:28:48 PM
to ocaml...@googlegroups.com
Oh, I see you are not using Async to create file handles anyway, so it's OK.

To make it clearer, the function
[In_thread.syscall_exn ~name:"fork" Core.Std.Unix.fork]
does not fork the process, but rather pushes the job to fork the process onto the Async job queue. When you start the Async scheduler (Scheduler.go), it spawns a bunch of threads and executes the job in one of them. Your process forks, but the threads end up dying in the child process.

Andre Nathan

Jan 29, 2015, 12:29:44 PM
to ocaml...@googlegroups.com
On Thursday, January 29, 2015 at 3:20:36 PM UTC-2, Arseniy Alekseyev wrote:
> I think this is because Async uses multiple OS threads, and you can't fork a multithreaded program (all threads except the one that called fork end up dying).

Hi Arseniy

Lwt has an `Lwt_unix.fork` function which resets the threading state in the child process (it empties the thread queue and zeroes the counters). I believe this should be possible in Async too.

Cheers,
Andre

Andre Nathan

Jan 29, 2015, 1:27:05 PM
to ocaml...@googlegroups.com
Hi Jeremie

On Thursday, January 29, 2015 at 3:28:20 PM UTC-2, Jeremie Dimino wrote:
> As Arseniy said, you should fork before starting the scheduler. Forking once the scheduler is started and using Async in the child is not supported.

Would forking be safe if I call `Scheduler.reset_in_forked_process` first thing in the child process?
 
> For the second error you are seeing, I think it's because [Fd.close] calls [Unix.shutdown ~mode:SHUTDOWN_ALL] if the file descriptor is a socket.

Is there a way to avoid the shutdown call?

Thanks,
Andre

Jeremie Dimino

Jan 30, 2015, 5:10:33 AM
to ocaml...@googlegroups.com
On Thu, Jan 29, 2015 at 6:27 PM, Andre Nathan <andr...@gmail.com> wrote:
> Hi Jeremie
>
> On Thursday, January 29, 2015 at 3:28:20 PM UTC-2, Jeremie Dimino wrote:
>> As Arseniy said, you should fork before starting the scheduler. Forking once the scheduler is started and using Async in the child is not supported.
>
> Would forking be safe if I call `Scheduler.reset_in_forked_process` first thing in the child process?

You still have to be careful about what you do afterwards. After `Scheduler.reset_in_forked_process`, it should be fine as long as you don't reuse Async resources that were created in the parent and you then call `Scheduler.go` again.

For instance in your example, even if you call `Scheduler.reset_in_forked_process` it is not safe to use the file descriptors returned by [Async.Std.Unix.socketpair] in the child.

In most cases, when you want to use Async in the child, it is advised to call `Unix.fork` only at the beginning of the program, before setting up anything related to Async. If you can't do it this way, it is probably safer to spawn a new command and pass it whatever it needs.

 
>> For the second error you are seeing, I think it's because [Fd.close] calls [Unix.shutdown ~mode:SHUTDOWN_ALL] if the file descriptor is a socket.
>
> Is there a way to avoid the shutdown call?

You could pass `~should_close_file_descriptor:false` to `Fd.close` and close the underlying Unix fd using `Core.Std.Unix.close`. But given the previous remark, you should create the sockets using `Core.Std.Unix.socketpair` and then wrap them with [Fd.create] after the fork.
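
A rough sketch of that structure, assuming the `Core.Std` / `Async.Std` API of this era. The `child` and `parent` functions, the `Info` strings and the "hello" payload are made up for illustration, and this has not been checked against a specific release. The sockets are created and the process is forked with the blocking Core calls before the scheduler starts; each side closes the end it does not need with a plain `Core.Std.Unix.close` (no shutdown) and only then wraps its own end for Async.

```ocaml
open Core.Std
open Async.Std

(* Runs in the child: wrap the raw descriptor only now, after the fork. *)
let child raw_fd =
  let fd =
    Fd.create (Fd.Kind.Socket `Active) raw_fd (Info.of_string "child end") in
  let rd = Reader.create fd in
  Reader.read_line rd >>= function
  | `Eof -> printf "EOF\n%!"; exit 1
  | `Ok line -> printf "<- %s\n%!" line; exit 0

(* Runs in the parent: same wrapping, then write a line and wait. *)
let parent raw_fd pid =
  let fd =
    Fd.create (Fd.Kind.Socket `Active) raw_fd (Info.of_string "parent end") in
  let wr = Writer.create fd in
  Writer.write wr "hello\n";
  Writer.flushed wr >>= fun () ->
  Unix.waitpid pid >>= function
  | Ok () -> exit 0
  | Error _ -> exit 1

let () =
  (* Blocking Core calls, made before the Async scheduler is started. *)
  let parent_fd, child_fd =
    Core.Std.Unix.socketpair
      ~domain:Core.Std.Unix.PF_UNIX
      ~kind:Core.Std.Unix.SOCK_STREAM
      ~protocol:0
  in
  begin match Core.Std.Unix.fork () with
  | `In_the_child ->
    (* Plain close(2): drops this copy without shutting the socket down. *)
    Core.Std.Unix.close parent_fd;
    don't_wait_for (child child_fd)
  | `In_the_parent pid ->
    Core.Std.Unix.close child_fd;
    don't_wait_for (parent parent_fd pid)
  end;
  never_returns (Scheduler.go ())
```

Each process starts its own scheduler after the fork, so no Async-managed resource is shared between parent and child.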

--
Jeremie

Jeremie Dimino

Jan 30, 2015, 5:15:22 AM
to ocaml...@googlegroups.com
On Thu, Jan 29, 2015 at 5:29 PM, Andre Nathan <andr...@gmail.com> wrote:
> Lwt has an `Lwt_unix.fork` function which resets the threading state in the child process (empty the thread queue and zero counters). I believe it should be possible in Async too.

`Lwt_unix.fork` is a bit misleading. The same restrictions that apply to Async also apply to Lwt. Basically, you must replace `Unix.fork` with `Lwt_unix.fork` if you intend to use Lwt in the child, even if you call it at the beginning of your program. That's why this function exists.

But you still have to be very careful if you call `Lwt_unix.fork` at an arbitrary point in your program and intend to use lwt in the child. In general you shouldn't do it.

--
Jeremie

Andre Nathan

Feb 2, 2015, 8:57:34 AM
to ocaml...@googlegroups.com
On Friday, January 30, 2015 at 8:10:33 AM UTC-2, Jeremie Dimino wrote:
> If you can't do it this way it is probably safer to spawn a new command and pass it whatever it needs.

Ok, in the real code this is what I do, so I guess it'll be safe.