Periodic events in KJ concurrency framework


Remy Blank

Mar 14, 2014, 6:49:45 PM
to capn...@googlegroups.com
What would be the recommended way to generate periodic events in the KJ
concurrency framework? The use case is that I want to periodically send
an RPC with a few metrics like CPU and memory usage, network traffic,
etc. to a central server for monitoring purposes.

AFAICT, there's no functionality in the event loop or event port that
would allow doing this directly.

Is it possible to fulfill a promise from a different thread than the one
where the result will be handled? This would allow having a separate
thread acting like a clock and triggering promises at specific times.
But my understanding is that this isn't possible.

The doc mentions:

> As of version 0.4, the only supported way to communicate between
> threads is over pipes or socketpairs. This will be improved in future
> versions. For now, just set up an RPC connection over that socketpair.

But this still doesn't solve the problem, as that other thread will also
be using Cap'n Proto RPC and therefore the same KJ concurrency
framework, which can't generate periodic events. Even if I used a
separate process, the same limitation would apply.

Any ideas?

-- Remy


Kenton Varda

Mar 14, 2014, 6:57:56 PM
to Remy Blank, capnproto
Timers are a major missing feature right now.

If you wanted to hack it using a thread, have the thread periodically write a byte to a pipe, and in your main thread wait on the read end of the pipe.  In fact, AsyncIoProvider has a method newPipeThread() which should make this easy to set up.

In this other thread, you can feel free to use wait() and sleep(), as the thread will not need to be actively listening for any kind of event.
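A minimal sketch of the pipe idea, written with plain POSIX calls and std::thread rather than the KJ API (so it does not show newPipeThread() itself). The helper names startTicker and countTicks are hypothetical, and the poll() loop only stands in for the event loop waiting on the read end:

```cpp
// Sketch only: plain POSIX + std::thread, not the KJ API.
#include <chrono>
#include <thread>
#include <poll.h>
#include <unistd.h>

// Hypothetical helper: starts a thread that writes one byte to `writeFd`
// every `period`, `count` times, and then closes the fd.
std::thread startTicker(int writeFd, std::chrono::milliseconds period, int count) {
  return std::thread([=] {
    for (int i = 0; i < count; ++i) {
      std::this_thread::sleep_for(period);  // blocking is fine in this thread
      char tick = 1;
      if (write(writeFd, &tick, 1) != 1) break;
    }
    close(writeFd);  // EOF tells the reader that the ticker is done
  });
}

// Hypothetical helper: waits on the read end the way an event loop would,
// and returns the number of tick bytes received before EOF.
int countTicks(int readFd) {
  int ticks = 0;
  pollfd pfd{readFd, POLLIN, 0};
  for (;;) {
    if (poll(&pfd, 1, -1) < 0) break;  // error (including EINTR): give up
    char buf[16];
    ssize_t n = read(readFd, buf, sizeof buf);
    if (n <= 0) break;  // EOF or error
    ticks += static_cast<int>(n);
  }
  return ticks;
}
```

In a real program the main thread would send the metrics RPC on each tick instead of counting bytes, and the ticker would run for the lifetime of the process.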

Alternatively if you want to go through and add a timer queue to UnixEventPort, that would be cool.  :)

-Kenton

Kenton Varda

Mar 14, 2014, 7:03:37 PM
to Remy Blank, capnproto
BTW, in case it wasn't totally clear, simply fulfilling a promise from another thread is not safe.  (It used to be, but that proved too costly in terms of code complexity and synchronization overhead.)  So, you do have to use a pipe, or maybe an eventfd.

Actually, that reminds me!  If you are Linux-bound, you can use a timerfd.
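For reference, a Linux-only sketch of a periodic timerfd. The timerfd_create(), timerfd_settime() and read() calls are the real system calls; the wrapper names makePeriodicTimerFd and waitForTick are hypothetical, and the blocking read() here stands in for whatever the event loop would do with the descriptor:

```cpp
// Linux-only sketch; the wrapper function names are hypothetical.
#include <cstdint>
#include <ctime>
#include <sys/timerfd.h>
#include <unistd.h>

// Creates a timerfd that expires every `periodMs` milliseconds.
// `periodMs` must be > 0 (an all-zero it_value would disarm the timer).
// Returns the descriptor, or -1 on failure.
int makePeriodicTimerFd(long periodMs) {
  int fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
  if (fd < 0) return -1;

  itimerspec spec{};
  spec.it_interval.tv_sec = periodMs / 1000;
  spec.it_interval.tv_nsec = (periodMs % 1000) * 1000000L;
  spec.it_value = spec.it_interval;  // first expiration after one period

  if (timerfd_settime(fd, 0, &spec, nullptr) < 0) {
    close(fd);
    return -1;
  }
  return fd;
}

// Blocks until the timer has expired at least once. Returns the number of
// expirations since the previous read, or 0 on error.
uint64_t waitForTick(int timerFd) {
  uint64_t expirations = 0;
  ssize_t n = read(timerFd, &expirations, sizeof expirations);
  return n == static_cast<ssize_t>(sizeof expirations) ? expirations : 0;
}
```

Each successful read() yields an 8-byte counter, so a slow consumer sees one wakeup with a count greater than 1 rather than a backlog of separate events.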

Remy Blank

Mar 14, 2014, 7:19:30 PM
to capn...@googlegroups.com
Kenton Varda wrote:
> If you wanted to hack it using a thread, have the thread periodically
> write a byte to a pipe, and in your main thread wait on the read end of
> the pipe. In fact, AsyncIoProvider has a method newPipeThread() which
> should make this easy to set up.
>
> In this other thread, you can feel free to use wait() and sleep(), as
> the thread will not need to be actively listening for any kind of event.

Right, I hadn't thought of that. Thanks for the suggestion.

> Alternatively if you want to go through and add a timer queue to
> UnixEventPort, that would be cool. :)

Sure, why not. I have found the place where poll() is called, so that's
probably where I should plug in a list (or a heap) of timers and set the
timeout to the time until the next timer expires. Where I'm not so sure
is how to pipe the timer functionality through to higher levels.
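To make the idea concrete, here is a rough standalone sketch of such a timer heap. It is not UnixEventPort code: the TimerQueue class and its method names are hypothetical, and callbacks stand in for whatever would fulfill a promise. The loop would pass pollTimeoutMs() to poll() and call fireExpired() when poll() returns:

```cpp
// Standalone sketch; TimerQueue and its methods are hypothetical names.
#include <chrono>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

using Clock = std::chrono::steady_clock;

class TimerQueue {
public:
  void add(Clock::time_point when, std::function<void()> callback) {
    timers.push(Entry{when, std::move(callback)});
  }

  // Timeout to hand to poll(): -1 (wait forever) if there are no timers,
  // otherwise the milliseconds until the earliest deadline, rounded up so
  // that poll() does not return just before the deadline.
  int pollTimeoutMs(Clock::time_point now) const {
    if (timers.empty()) return -1;
    auto remaining = timers.top().when - now;
    if (remaining <= Clock::duration::zero()) return 0;
    auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(remaining);
    if (ms < remaining) ms += std::chrono::milliseconds(1);
    return static_cast<int>(ms.count());
  }

  // Runs every timer whose deadline has passed; returns how many fired.
  int fireExpired(Clock::time_point now) {
    int fired = 0;
    while (!timers.empty() && timers.top().when <= now) {
      auto callback = timers.top().callback;  // copy before popping
      timers.pop();
      callback();
      ++fired;
    }
    return fired;
  }

private:
  struct Entry {
    Clock::time_point when;
    std::function<void()> callback;
  };
  struct Later {  // orders the heap so the earliest deadline is on top
    bool operator()(const Entry& a, const Entry& b) const { return a.when > b.when; }
  };
  std::priority_queue<Entry, std::vector<Entry>, Later> timers;
};
```

Passing `now` in explicitly, instead of reading the clock inside the queue, keeps the sketch easy to test with made-up time points.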

I suppose it would make sense to expose the timer functionality in
EventLoop, something like:

  typedef int64_t Time; // Microseconds since the epoch
  Time now() const;
  Promise<void> setTimer(Time time);

I see that EventLoop is owned by the LowLevelAsyncIoProviderImpl, but
isn't made accessible. And AsyncIoContext only has a WaitScope. Should
it have a reference to EventLoop instead (and users can create their own
WaitScope on the stack)?

It would be great if you could give me a few hints about how you imagine
this should be implemented, so that I don't go off too far in the wrong
direction.

-- Remy


Remy Blank

Mar 14, 2014, 7:31:44 PM
to capn...@googlegroups.com
Kenton Varda wrote:
> Actually, that reminds me! If you are Linux-bound, you can use a timerfd.
>
> http://man7.org/linux/man-pages/man2/timerfd_create.2.html

Ooh, nice! That still leaves the question of how to get access to the
UnixEventPort so that I can use onFdEvent. But if I have to modify Cap'n
Proto code anyway, I might as well implement timers properly.

-- Remy


Kenton Varda

Mar 14, 2014, 7:38:13 PM
to Remy Blank, capnproto
I actually don't think EventLoop should change.  Time is I/O, so this belongs on AsyncIoProvider.  This also makes it easy to write tests that mock the timer, since AsyncIoProvider is an abstract interface.

LowLevelAsyncIoProvider will need a method as well, of course.

We probably need two methods:

  Promise<void> atTimeSinceEpoch(Time timeSinceEpoch);
  Promise<void> atTimeFromNow(Time timeFromNow);

This would also be a good time to make sure we define the "Time" type in a way that avoids unit confusion.  Check out kj/units.h and the way it is used in capnp/common.h.  We can define SECONDS, MSEC, and USEC as different units (like capnp/common.h defines BITS, BYTES, WORDS), and then create a Time class that can be implicitly converted to and from each of these types.  All this would go in a kj/time.h.

So then people will be able to write things like:

  provider->atTimeFromNow(5 * kj::SECONDS);
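A rough sketch of why an abstract interface makes the timer mockable. Everything here is hypothetical and self-contained: TimerProvider stands in for the timer part of an I/O provider, a callback replaces Promise<void> so that the example does not depend on KJ, and std::chrono::microseconds replaces the proposed Time type:

```cpp
// Hypothetical sketch; none of these names are KJ API.
#include <chrono>
#include <functional>
#include <utility>
#include <vector>

using Micros = std::chrono::microseconds;

// Abstract timer interface that production code depends on.
class TimerProvider {
public:
  virtual ~TimerProvider() = default;
  virtual void atTimeFromNow(Micros delay, std::function<void()> callback) = 0;
};

// Test double: time only moves when the test calls advance().
class FakeTimerProvider final : public TimerProvider {
public:
  void atTimeFromNow(Micros delay, std::function<void()> callback) override {
    pending.push_back({now + delay, std::move(callback)});
  }

  // Moves the fake clock forward and runs the timers that became due.
  // Due timers run in registration order, not deadline order.
  void advance(Micros delta) {
    now += delta;
    std::vector<Pending> due;
    for (auto it = pending.begin(); it != pending.end();) {
      if (it->deadline <= now) {
        due.push_back(std::move(*it));
        it = pending.erase(it);
      } else {
        ++it;
      }
    }
    for (auto& p : due) p.callback();  // callbacks may register new timers
  }

private:
  struct Pending {
    Micros deadline;
    std::function<void()> callback;
  };
  Micros now{0};
  std::vector<Pending> pending;
};

// Periodic behaviour built from one-shot timers: run `action`, then re-arm.
void schedulePeriodic(TimerProvider& timer, Micros period, std::function<void()> action) {
  timer.atTimeFromNow(period, [&timer, period, action] {
    action();
    schedulePeriodic(timer, period, action);
  });
}
```

A test can then drive schedulePeriodic() through several periods with FakeTimerProvider::advance() without sleeping at all.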

Thoughts?

-Kenton

Kenton Varda

Mar 14, 2014, 7:39:19 PM
to Remy Blank, capnproto
You should be able to simply wrap the timerfd in an AsyncIoStream using LowLevelAsyncIoProvider, since timer expirations are received by a regular old read().  So I don't think you need to add anything to KJ to use this path.

-Kenton

Remy Blank

Mar 14, 2014, 7:54:12 PM
to capn...@googlegroups.com
Kenton Varda wrote:
> I actually don't think EventLoop should change. Time is I/O, so this
> belongs on AsyncIoProvider. This also makes it easy to write tests that
> mock the timer, since AsyncIoProvider is an abstract interface.
>
> LowLevelAsyncIoProvider will need a method as well, of course.
>
> We probably need two methods:
>
> Promise<void> atTimeSinceEpoch(Time timeSinceEpoch);
> Promise<void> atTimeFromNow(Time timeFromNow);
>
> This would also be a good time to make sure we define the "Time" type in
> a way that avoids unit confusion. Check out kj/units.h and the way it
> is used in capnp/common.h. We can define SECONDS, MSEC, and USEC as
> different units (like capnp/common.h defines BITS, BYTES, WORDS), and
> then create a Time class that can be implicitly converted to and from
> each of these types. All this would go in a kj/time.h.
>
> So then people will be able to write things like:
>
> provider->atTimeFromNow(5 * kj::SECONDS);
>
> Thoughts?

Sounds reasonable. The unit stuff looks somewhat on the heavy side, but
why not.

-- Remy


Kenton Varda

Mar 14, 2014, 8:06:15 PM
to Remy Blank, capnproto
On Fri, Mar 14, 2014 at 4:54 PM, Remy Blank <remy....@pobox.com> wrote:
> Sounds reasonable. The unit stuff looks somewhat on the heavy side, but
> why not.

Yeah, but all the weight is strictly at compile time.

Actually, now that I think of it, you can probably just typedef Time to be a Quantity in microseconds, and then define the constants SECONDS and MSEC to be multiples of USEC, rather than do the more-complicated stuff I did with WordCount/ByteCount/etc. in capnp/common.h.
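Roughly, the simpler version could look like the sketch below. It uses a small hand-written Time class as a stand-in for a Quantity from kj/units.h, so the class and its members are assumptions for illustration, not the KJ definitions. The point is that a bare integer cannot be passed where a Time is expected, and the conversion cost disappears at compile time:

```cpp
// Hypothetical stand-in for a unit-tagged quantity; not kj::Quantity.
#include <cstdint>

class Time {
public:
  constexpr Time() : micros(0) {}

  // The only way out of the unit-safe type is an explicitly named accessor.
  constexpr int64_t inMicroseconds() const { return micros; }

  friend constexpr Time operator*(int64_t n, Time t) { return Time(n * t.micros); }
  friend constexpr Time operator+(Time a, Time b) { return Time(a.micros + b.micros); }
  // Dividing two times gives a plain number, e.g. how many MSEC fit in t.
  friend constexpr int64_t operator/(Time a, Time b) { return a.micros / b.micros; }
  friend constexpr bool operator==(Time a, Time b) { return a.micros == b.micros; }

  static constexpr Time oneMicrosecond() { return Time(1); }

private:
  // Private, so callers cannot build a Time from a unitless integer.
  explicit constexpr Time(int64_t us) : micros(us) {}
  int64_t micros;
};

// The larger units are just multiples of the base unit.
constexpr Time USEC = Time::oneMicrosecond();
constexpr Time MSEC = 1000 * USEC;
constexpr Time SECONDS = 1000 * MSEC;

static_assert((5 * SECONDS).inMicroseconds() == 5000000, "5 s is 5,000,000 us");
```

With this, `5 * SECONDS` and `5000 * MSEC` are the same value, while passing a bare `5` to a function that takes a Time does not compile.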

-Kenton

Alex Elsayed

Mar 31, 2014, 3:00:03 PM
to capn...@googlegroups.com
Sorry for coming in late, but I felt it worth noting that Linux at least has
been consistently moving to prefer/recommend nsec resolution - timespec
instead of timeval, clock_gettime instead of gettimeofday, etc.

timerfd operates on timespec, in fact.
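As an illustration, if the Time type stayed in microseconds, arming a timerfd would need a conversion along these lines. The helper name toTimespec is hypothetical, and the sketch assumes a non-negative duration:

```cpp
// Hypothetical helper; assumes `micros` is non-negative.
#include <cstdint>
#include <time.h>

// Splits a microsecond count into whole seconds plus the remaining
// nanoseconds, which is the representation timespec uses.
timespec toTimespec(int64_t micros) {
  timespec ts{};
  ts.tv_sec = static_cast<time_t>(micros / 1000000);
  ts.tv_nsec = static_cast<long>((micros % 1000000) * 1000);
  return ts;
}
```

Going from microseconds to timespec is lossless; the reverse direction would have to drop the sub-microsecond part of tv_nsec.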

