wait/notify from a thread?


Tim Holy

Jan 31, 2014, 7:58:41 AM
to julia...@googlegroups.com
I'm interfacing with a C library that provides the ability to set a callback
function so you can be notified when some job is done. I'm hoping to exploit
this with the Task interface so I can feed multiple jobs with a single Julia
process. I'm declaring my notify function like this:

function callback_notify(hnd, status, data)
    println("In notify")
    s = unsafe_pointer_to_objref(data)::Stream
    @show s
    s.status = status
    notify(s.c)
    println("Done notifying")
    nothing
end

Stream is a type that has a Condition member,
type Stream
    handle::StreamHandle
    status::Cuint
    c::Condition
end

I've successfully set the callback function, issued wait(s.c), and from the
println statements I can see that the notify callback executes fully. However,
my task never wakes up. My suspicion is that the C library is spawning a new
thread; does that pose problems for the Task interface? Or is something else
happening? Any good workarounds?

--Tim

Spencer Russell

Jan 31, 2014, 1:50:53 PM
to julia...@googlegroups.com
Hi Tim,

I spent some time playing around with interfacing to multi-threaded C libraries for AudioIO, namely PortAudio. In PortAudio you register a callback which gets called periodically whenever the audio card needs a new frame of data. It gets called from a separate thread, so I was getting segfaults if I executed Julia code directly from the callback.

In the end I wrote a small C module that implemented a callback which I kept synchronized with a Julia Task. For the C callback to wake up the Task I used a file descriptor (that way in Julia land I could wait on it and only block that task instead of the whole julia process). For the Julia Task to wake up the C callback I used a semaphore.

You can see my C code (as well as the BinDeps stuff to compile it) in the AudioIO repo at


I'm not sure if this approach would work in your situation, but if your C library is spawning a separate thread I think this is one way to handle it.

-s
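
A minimal sketch of the file-descriptor idea described above, not the AudioIO code itself. It assumes a Unix system and a current Julia 1.x with the FileWatching standard library; the variable names are made up for illustration. The one-byte write stands in for what the C callback would do on its own thread, and it is issued from Julia here only to keep the example self-contained.

```julia
using FileWatching   # stdlib; provides poll_fd

# Create an OS pipe: fds[1] is the read end, fds[2] the write end (Unix only).
fds = zeros(Cint, 2)
ccall(:pipe, Cint, (Ptr{Cint},), fds) == 0 || error("pipe() failed")

# The waiting task blocks only itself until the read end becomes readable
# (or the 5 second timeout expires).
waiter = @async poll_fd(RawFD(fds[1]), 5.0; readable=true)

# Stand-in for the C callback: write one byte to the pipe. A real callback
# would do this write in C, so no Julia code runs on the foreign thread.
byte = UInt8[0x01]
nwritten = ccall(:write, Cssize_t, (Cint, Ptr{UInt8}, Csize_t), fds[2], byte, 1)

event = fetch(waiter)
println("readable: ", event.readable, ", timed out: ", event.timedout)

# Close both ends of the pipe.
ccall(:close, Cint, (Cint,), fds[1])
ccall(:close, Cint, (Cint,), fds[2])
```

The semaphore that lets the Julia task wake the C callback is left out; this only covers the C-to-Julia direction.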

Tim Holy

Jan 31, 2014, 5:05:20 PM
to julia...@googlegroups.com
This is very helpful, thanks for the tips. I especially like the idea of
waiting on a file descriptor controlled by a C thread; that should avoid the
need for any Julia code to be called by the callback.

You may be interested in some experiments I ran (see below), trying to
reproduce my problem in a simpler context. "Unfortunately," all of these
experiments work. So either there is something specific to this particular
library, or "working" is fragile in some fashion and I just haven't succeeded
in triggering a failure.

--Tim

c = Condition()

function c_notify(p::Ptr{Void})
    cond = unsafe_pointer_to_objref(p)
    notify(cond)
    C_NULL
end

const cb = cfunction(c_notify, Ptr{Void}, (Ptr{Void},))

@sync begin
    @async begin
        println("About to wait from 1")
        wait(c)
        println("Done waiting from 1")
    end
    @async begin
        println("About to notify from 2")
        # notify(c)
        ccall(cb, Void, (Ptr{Void},), pointer_from_objref(c))
        println("Done notifying from 2")
    end
end

threads = Uint64[0]

@sync begin
    @async begin
        println("About to wait from 1")
        wait(c)
        println("Done waiting from 1")
    end
    @async begin
        println("About to notify from 2")
        # notify(c)
        ccall(:pthread_create, Cint, (Ptr{Void}, Ptr{Void}, Ptr{Void},
              Ptr{Void}), threads, C_NULL, cb, pointer_from_objref(c))
        ccall(:pthread_join, Cint, (Uint64, Ptr{Void}), threads[1], C_NULL)
        println("Done notifying from 2")
    end
end

Jameson Nash

Jan 31, 2014, 7:58:11 PM
to julia...@googlegroups.com
If you don't want to use pipes to synchronize your threads, the only
function that is safe to use from another thread (or signal handler)
is uv_async_send. Any other function will cause undefined behavior. It
may occasionally work, but it will probably also corrupt other random
parts of the program.

-Jameson

Tim Holy

Feb 1, 2014, 9:52:20 AM
to julia...@googlegroups.com
Thanks Jameson, that worked great.

Since this took me into areas of Julia that I hadn't previously explored, I
thought I'd post a complete example below. I suspect that something along
these lines needs to be added to
http://docs.julialang.org/en/latest/manual/calling-c-and-fortran-code/#passing-julia-callback-functions-to-c
Thoughts?

--Tim


# Writing thread-safe callback functions for interfacing with
# C libraries. As an example, this uses wait/notify on a condition.

c = Condition()

# Here's your function that you'd like to be the callback function.
# This will get executed indirectly, by scheduling it to run from the
# Julia event loop. It must have two inputs (which will have
# types SingleAsyncWork and Cint, respectively).
runnotify = (data, status) -> notify(c)

# Create an object that can be used to schedule a call of runnotify
# by Julia's event loop
notifyasync = Base.SingleAsyncWork(runnotify)

# Now create a C callback function that will issue uv_async_send, passing
# notifyasync.handle as its input.
# It should be set up to match the expected syntax of the C callback.
# Here we do it for the callback syntax of pthread_create.
async_send_pthread(func::Ptr{Void}) = (ccall(:uv_async_send, Cint,
    (Ptr{Void},), func); C_NULL)
const c_async_send_pthread = cfunction(async_send_pthread, Ptr{Void},
    (Ptr{Void},))

threads = Uint64[0]

@sync begin
    @async begin
        println("About to wait from 1")
        wait(c)
        println("Done waiting from 1")
    end
    @async begin
        println("About to notify from 2")
        # Perform the equivalent of notify(c) from a new thread
        ccall(:pthread_create, Cint, (Ptr{Void}, Ptr{Void}, Ptr{Void},
              Ptr{Void}),
              threads, C_NULL, c_async_send_pthread, notifyasync.handle)
        ccall(:pthread_join, Cint, (Uint64, Ptr{Void}), threads[1], C_NULL)
        println("Done notifying from 2")
    end
    nothing
end
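
For anyone reading this on a current Julia: as far as I know, SingleAsyncWork was later replaced by Base.AsyncCondition, so the example above will not run unchanged on Julia 1.x. Below is a rough sketch of the same wait/wake pattern, assuming Julia 1.x. The uv_async_send call is made from the Julia thread purely as a stand-in; in real use the C library's thread would make that call with cond.handle passed to it as user data.

```julia
# Assumes Julia 1.x: Base.AsyncCondition owns a libuv async handle.
cond = Base.AsyncCondition()

# The waiting task; wait(cond) blocks only this task, not the process.
waiter = @async begin
    wait(cond)
    :woken
end
yield()  # let the waiter start and block on the condition

# Stand-in for the C callback. A real library thread would call
# uv_async_send(handle) itself, with cond.handle handed to it as user data.
rc = ccall(:uv_async_send, Cint, (Ptr{Cvoid},), cond.handle)

result = fetch(waiter)
println("uv_async_send returned ", rc, "; waiter result: ", result)
close(cond)
```

The point is the same as in the 2014 version: the only thing the foreign thread does is uv_async_send, and everything else happens on Julia's event loop.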

Spencer Russell

Feb 1, 2014, 5:39:13 PM
to julia...@googlegroups.com
Ah, thanks for this.

For a bit I went down the uv_async_send path, but I had a hard time figuring out which Julia object's handle I should pass to it. I think I was trying to find some sort of handle on the actual waiting Task, instead of creating this intermediate SingleAsyncWork to do the waking.

-s

Stefan Karpinski

Apr 7, 2014, 1:48:37 PM
to Julia Users
What are done and status here?

Stefan Karpinski

Apr 7, 2014, 1:49:39 PM
to Julia Users
Sorry, "data" and "status".

Steven G. Johnson

Apr 7, 2014, 2:32:12 PM
to julia...@googlegroups.com
"data" in this example will be notifyasync (i.e., the SingleAsyncWork object whose handle was sent to uv_async_send); might be nicer to call it "work".  No idea what "status" is.

Would be good to document this in the manual (it only says "you'll most likely just discard" the callback parameters).   In the case of ZMQ, I actually use the "data" parameter (https://github.com/JuliaLang/ZMQ.jl/blob/master/src/ZMQ.jl#L316-L334).

Jameson Nash

Apr 7, 2014, 9:25:11 PM
to julia...@googlegroups.com
`status` used to equal 0 (success), now it is just a random number
(whatever happens to be in the register/stack location of this
argument that no longer exists in libuv, but never got removed from
Julia): https://github.com/joyent/libuv/blob/v0.8/include/uv.h#L291

the first argument to every libuv callback is the julia reference to
the object that caused the callback to trigger. In this case, as
Steven mentioned, this is a SingleAsyncWork object

possibly related: https://github.com/JuliaLang/julia/issues/1933
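
To illustrate the point about the first callback argument, here is a small sketch assuming Julia 1.x, where Base.AsyncCondition(callback) takes the place of SingleAsyncWork and the callback takes a single argument (there is no `status`). The names `received`, `done`, and `work` are made up for the example, and uv_async_send is again called from the Julia thread only as a stand-in for a C thread.

```julia
# Assumes Julia 1.x: AsyncCondition(callback) runs callback(obj) on the event
# loop each time uv_async_send is issued for obj.handle.
received = Ref{Any}(nothing)
done = Channel{Bool}(1)

work = Base.AsyncCondition() do obj
    received[] = obj      # obj is the AsyncCondition that was triggered
    put!(done, true)
end

ccall(:uv_async_send, Cint, (Ptr{Cvoid},), work.handle)
take!(done)               # blocks this task until the callback has run

same_object = received[] === work
println("callback received its own object: ", same_object)
close(work)
```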

Stefan Karpinski

Apr 7, 2014, 10:30:54 PM
to Julia Users
Thanks guys. Just to record it somewhere, I ended up doing this in a somewhat different but satisfying way – using ZMQ: https://gist.github.com/StefanKarpinski/10083996. Since the next step after getting two threads to communicate is inevitably to add a threadsafe queuing mechanism, you might as well cut to the chase and just use ZMQ right away. Especially since the ZMQ package already does all the heavy lifting to make this easy. Bonus: if you ever decide to move from a multiple thread model to a multiple process model – even a distributed one – you barely have to change your code.