[go] push by rsc@golang.org - runtime: add timer support, use for package time... on 2011-11-09 20:17 GMT

18 views
Skip to first unread message

g...@googlecode.com

unread,
Nov 9, 2011, 3:17:47 PM (11/9/11)
to golang-...@googlegroups.com
Revision: 784b29af787e
Author: Russ Cox <r...@golang.org>
Date: Wed Nov 9 12:17:05 2011
Log: runtime: add timer support, use for package time

This looks like it is just moving some code from
time to runtime (and translating it to C), but the
runtime can do a better job managing the goroutines,
and it needs this functionality for its own maintenance
(for example, for the garbage collector to hand back
unused memory to the OS on a time delay).
Might as well have just one copy of the timer logic,
and runtime can't depend on time, so vice versa.

It also unifies Sleep, NewTicker, and NewTimer behind
one mechanism, so that there are no claims that one
is more efficient than another. (For example, today
people recommend using time.After instead of time.Sleep
to avoid blocking an OS thread.)

Fixes issue 1644.
Fixes issue 1731.
Fixes issue 2190.

R=golang-dev, r, hectorchu, iant, iant, jsing, alex.brainman, dvyukov
CC=golang-dev
http://codereview.appspot.com/5334051
http://code.google.com/p/go/source/detail?r=784b29af787e

Modified:
/src/pkg/runtime/darwin/os.h
/src/pkg/runtime/darwin/thread.c
/src/pkg/runtime/freebsd/thread.c
/src/pkg/runtime/linux/thread.c
/src/pkg/runtime/lock_futex.c
/src/pkg/runtime/lock_sema.c
/src/pkg/runtime/openbsd/thread.c
/src/pkg/runtime/plan9/thread.c
/src/pkg/runtime/proc.c
/src/pkg/runtime/runtime.h
/src/pkg/runtime/time.goc
/src/pkg/runtime/windows/thread.c
/src/pkg/time/sleep.go
/src/pkg/time/sys.go
/src/pkg/time/tick.go

=======================================
--- /src/pkg/runtime/darwin/os.h Fri Sep 30 06:40:01 2011
+++ /src/pkg/runtime/darwin/os.h Wed Nov 9 12:17:05 2011
@@ -9,7 +9,7 @@
void runtime·bsdthread_register(void);
int32 runtime·mach_msg_trap(MachHeader*, int32, uint32, uint32, uint32, uint32, uint32);
uint32 runtime·mach_reply_port(void);
-void runtime·mach_semacquire(uint32);
+int32 runtime·mach_semacquire(uint32, int64);
uint32 runtime·mach_semcreate(void);
void runtime·mach_semdestroy(uint32);
void runtime·mach_semrelease(uint32);
=======================================
--- /src/pkg/runtime/darwin/thread.c Wed Nov 2 06:42:01 2011
+++ /src/pkg/runtime/darwin/thread.c Wed Nov 9 12:17:05 2011
@@ -17,10 +17,10 @@
*(int32*)1231 = 1231;
}

-void
-runtime·semasleep(void)
-{
- runtime·mach_semacquire(m->waitsema);
+int32
+runtime·semasleep(int64 ns)
+{
+ return runtime·mach_semacquire(m->waitsema, ns);
}

void
@@ -252,6 +252,7 @@
// Mach calls that get interrupted by Unix signals
// return this error code. We retry them.
KERN_ABORTED = 14,
+ KERN_OPERATION_TIMED_OUT = 49,
};

typedef struct Tmach_semcreateMsg Tmach_semcreateMsg;
@@ -343,16 +344,25 @@
int32 runtime·mach_semaphore_signal(uint32 sema);
int32 runtime·mach_semaphore_signal_all(uint32 sema);

-void
-runtime·mach_semacquire(uint32 sem)
+int32
+runtime·mach_semacquire(uint32 sem, int64 ns)
{
int32 r;

+ if(ns >= 0) {
+ r = runtime·mach_semaphore_timedwait(sem, ns/1000000000LL, ns%1000000000LL);
+ if(r == KERN_ABORTED || r == KERN_OPERATION_TIMED_OUT)
+ return -1;
+ if(r != 0)
+ macherror(r, "semaphore_wait");
+ return 0;
+ }
while((r = runtime·mach_semaphore_wait(sem)) != 0) {
if(r == KERN_ABORTED) // interrupted
continue;
macherror(r, "semaphore_wait");
}
+ return 0;
}

void
=======================================
--- /src/pkg/runtime/freebsd/thread.c Wed Nov 2 06:42:01 2011
+++ /src/pkg/runtime/freebsd/thread.c Wed Nov 9 12:17:05 2011
@@ -10,14 +10,23 @@
extern int32 runtime·sys_umtx_op(uint32*, int32, uint32, void*, void*);

// FreeBSD's umtx_op syscall is effectively the same as Linux's futex, and
-// thus the code is largely similar. See linux/thread.c for comments.
+// thus the code is largely similar. See linux/thread.c and lock_futex.c for comments.

void
-runtime·futexsleep(uint32 *addr, uint32 val)
+runtime·futexsleep(uint32 *addr, uint32 val, int64 ns)
{
int32 ret;
-
- ret = runtime·sys_umtx_op(addr, UMTX_OP_WAIT, val, nil, nil);
+ Timespec ts, *tsp;
+
+ if(ns < 0)
+ tsp = nil;
+ else {
+ ts.sec = ns / 1000000000LL;
+ ts.nsec = ns % 1000000000LL;
+ tsp = &ts;
+ }
+
+ ret = runtime·sys_umtx_op(addr, UMTX_OP_WAIT, val, nil, tsp);
if(ret >= 0 || ret == -EINTR)
return;

=======================================
--- /src/pkg/runtime/linux/thread.c Wed Nov 2 06:42:01 2011
+++ /src/pkg/runtime/linux/thread.c Wed Nov 9 12:17:05 2011
@@ -34,15 +34,29 @@
// Atomically,
// if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
+// Don't sleep longer than ns; ns < 0 means forever.
void
-runtime·futexsleep(uint32 *addr, uint32 val)
-{
+runtime·futexsleep(uint32 *addr, uint32 val, int64 ns)
+{
+ Timespec ts, *tsp;
+
+ if(ns < 0)
+ tsp = nil;
+ else {
+ ts.tv_sec = ns/1000000000LL;
+ ts.tv_nsec = ns%1000000000LL;
+ // Avoid overflow
+ if(ts.tv_sec > 1<<30)
+ ts.tv_sec = 1<<30;
+ tsp = &ts;
+ }
+
// Some Linux kernels have a bug where futex of
// FUTEX_WAIT returns an internal error code
// as an errno. Libpthread ignores the return value
// here, and so can we: as it says a few lines up,
// spurious wakeups are allowed.
- runtime·futex(addr, FUTEX_WAIT, val, nil, nil, 0);
+ runtime·futex(addr, FUTEX_WAIT, val, tsp, nil, 0);
}

// If any procs are sleeping on addr, wake up at most cnt.
=======================================
--- /src/pkg/runtime/lock_futex.c Wed Nov 2 06:42:01 2011
+++ /src/pkg/runtime/lock_futex.c Wed Nov 9 12:17:05 2011
@@ -4,25 +4,28 @@

#include "runtime.h"

+// This implementation depends on OS-specific implementations of
+//
+// runtime.futexsleep(uint32 *addr, uint32 val, int64 ns)
+// Atomically,
+// if(*addr == val) sleep
+// Might be woken up spuriously; that's allowed.
+// Don't sleep longer than ns; ns < 0 means forever.
+//
+// runtime.futexwakeup(uint32 *addr, uint32 cnt)
+// If any procs are sleeping on addr, wake up at most cnt.
+
enum
{
MUTEX_UNLOCKED = 0,
MUTEX_LOCKED = 1,
MUTEX_SLEEPING = 2,
-
+
ACTIVE_SPIN = 4,
ACTIVE_SPIN_CNT = 30,
PASSIVE_SPIN = 1,
};

-// Atomically,
-// if(*addr == val) sleep
-// Might be woken up spuriously; that's allowed.
-void runtime·futexsleep(uint32 *addr, uint32 val);
-
-// If any procs are sleeping on addr, wake up at most cnt.
-void runtime·futexwakeup(uint32 *addr, uint32 cnt);
-
// Possible lock states are MUTEX_UNLOCKED, MUTEX_LOCKED and MUTEX_SLEEPING.
// MUTEX_SLEEPING means that there is presumably at least one sleeping thread.
// Note that there can be spinning threads during all states - they do not
@@ -39,7 +42,7 @@
v = runtime·xchg(&l->key, MUTEX_LOCKED);
if(v == MUTEX_UNLOCKED)
return;
-
+
// wait is either MUTEX_LOCKED or MUTEX_SLEEPING
// depending on whether there is a thread sleeping
// on this mutex. If we ever change l->key from
@@ -48,13 +51,13 @@
// returning, to ensure that the sleeping thread gets
// its wakeup call.
wait = v;
-
+
// On uniprocessor's, no point spinning.
// On multiprocessors, spin for ACTIVE_SPIN attempts.
spin = 0;
if(runtime·ncpu > 1)
spin = ACTIVE_SPIN;
-
+
for(;;) {
// Try for lock, spinning.
for(i = 0; i < spin; i++) {
@@ -63,7 +66,7 @@
return;
runtime·procyield(ACTIVE_SPIN_CNT);
}
-
+
// Try for lock, rescheduling.
for(i=0; i < PASSIVE_SPIN; i++) {
while(l->key == MUTEX_UNLOCKED)
@@ -71,13 +74,13 @@
return;
runtime·osyield();
}
-
+
// Sleep.
v = runtime·xchg(&l->key, MUTEX_SLEEPING);
if(v == MUTEX_UNLOCKED)
return;
wait = MUTEX_SLEEPING;
- runtime·futexsleep(&l->key, MUTEX_SLEEPING);
+ runtime·futexsleep(&l->key, MUTEX_SLEEPING, -1);
}
}

@@ -114,5 +117,30 @@
runtime·notesleep(Note *n)
{
while(runtime·atomicload(&n->key) == 0)
- runtime·futexsleep(&n->key, 0);
-}
+ runtime·futexsleep(&n->key, 0, -1);
+}
+
+void
+runtime·notetsleep(Note *n, int64 ns)
+{
+ int64 deadline, now;
+
+ if(ns < 0) {
+ runtime·notesleep(n);
+ return;
+ }
+
+ if(runtime·atomicload(&n->key) != 0)
+ return;
+
+ deadline = runtime·nanotime() + ns;
+ for(;;) {
+ runtime·futexsleep(&n->key, 0, ns);
+ if(runtime·atomicload(&n->key) != 0)
+ return;
+ now = runtime·nanotime();
+ if(now >= deadline)
+ return;
+ ns = deadline - now;
+ }
+}
=======================================
--- /src/pkg/runtime/lock_sema.c Wed Nov 2 06:42:01 2011
+++ /src/pkg/runtime/lock_sema.c Wed Nov 9 12:17:05 2011
@@ -4,6 +4,22 @@

#include "runtime.h"

+// This implementation depends on OS-specific implementations of
+//
+// uintptr runtime.semacreate(void)
+// Create a semaphore, which will be assigned to m->waitsema.
+// The zero value is treated as absence of any semaphore,
+// so be sure to return a non-zero value.
+//
+// int32 runtime.semasleep(int64 ns)
+// If ns < 0, acquire m->waitsema and return 0.
+// If ns >= 0, try to acquire m->waitsema for at most ns nanoseconds.
+// Return 0 if the semaphore was acquired, -1 if interrupted or timed out.
+//
+// int32 runtime.semawakeup(M *mp)
+// Wake up mp, which is or will soon be sleeping on mp->waitsema.
+//
+
enum
{
LOCKED = 1,
@@ -13,13 +29,6 @@
PASSIVE_SPIN = 1,
};

-// creates per-M semaphore (must not return 0)
-uintptr runtime·semacreate(void);
-// acquires per-M semaphore
-void runtime·semasleep(void);
-// releases mp's per-M semaphore
-void runtime·semawakeup(M *mp);
-
void
runtime·lock(Lock *l)
{
@@ -35,13 +44,13 @@

if(m->waitsema == 0)
m->waitsema = runtime·semacreate();
-
+
// On uniprocessor's, no point spinning.
// On multiprocessors, spin for ACTIVE_SPIN attempts.
spin = 0;
if(runtime·ncpu > 1)
spin = ACTIVE_SPIN;
-
+
for(i=0;; i++) {
v = (uintptr)runtime·atomicloadp(&l->waitm);
if((v&LOCKED) == 0) {
@@ -68,11 +77,11 @@
goto unlocked;
}
if(v&LOCKED) {
- // Wait.
- runtime·semasleep();
+ // Queued. Wait.
+ runtime·semasleep(-1);
i = 0;
}
- }
+ }
}
}

@@ -95,7 +104,7 @@
// Dequeue an M.
mp = (void*)(v&~LOCKED);
if(runtime·casp(&l->waitm, (void*)v, mp->nextwaitm)) {
- // Wake that M.
+ // Dequeued an M. Wake it.
runtime·semawakeup(mp);
break;
}
@@ -113,9 +122,23 @@
void
runtime·notewakeup(Note *n)
{
- if(runtime·casp(&n->waitm, nil, (void*)LOCKED))
- return;
- runtime·semawakeup(n->waitm);
+ M *mp;
+
+ do
+ mp = runtime·atomicloadp(&n->waitm);
+ while(!runtime·casp(&n->waitm, mp, (void*)LOCKED));
+
+ // Successfully set waitm to LOCKED.
+ // What was it before?
+ if(mp == nil) {
+ // Nothing was waiting. Done.
+ } else if(mp == (M*)LOCKED) {
+ // Two notewakeups! Not allowed.
+ runtime·throw("notewakeup - double wakeup");
+ } else {
+ // Must be the waiting m. Wake it up.
+ runtime·semawakeup(mp);
+ }
}

void
@@ -123,6 +146,72 @@
{
if(m->waitsema == 0)
m->waitsema = runtime·semacreate();
- if(runtime·casp(&n->waitm, nil, m))
- runtime·semasleep();
-}
+ if(!runtime·casp(&n->waitm, nil, m)) { // must be LOCKED (got wakeup)
+ if(n->waitm != (void*)LOCKED)
+ runtime·throw("notesleep - waitm out of sync");
+ return;
+ }
+ // Queued. Sleep.
+ runtime·semasleep(-1);
+}
+
+void
+runtime·notetsleep(Note *n, int64 ns)
+{
+ M *mp;
+ int64 deadline, now;
+
+ if(ns < 0) {
+ runtime·notesleep(n);
+ return;
+ }
+
+ if(m->waitsema == 0)
+ m->waitsema = runtime·semacreate();
+
+ // Register for wakeup on n->waitm.
+ if(!runtime·casp(&n->waitm, nil, m)) { // must be LOCKED (got wakeup already)
+ if(n->waitm != (void*)LOCKED)
+ runtime·throw("notetsleep - waitm out of sync");
+ return;
+ }
+
+ deadline = runtime·nanotime() + ns;
+ for(;;) {
+ // Registered. Sleep.
+ if(runtime·semasleep(ns) >= 0) {
+ // Acquired semaphore, semawakeup unregistered us.
+ // Done.
+ return;
+ }
+
+ // Interrupted or timed out. Still registered. Semaphore not acquired.
+ now = runtime·nanotime();
+ if(now >= deadline)
+ break;
+
+ // Deadline hasn't arrived. Keep sleeping.
+ ns = deadline - now;
+ }
+
+ // Deadline arrived. Still registered. Semaphore not acquired.
+ // Want to give up and return, but have to unregister first,
+ // so that any notewakeup racing with the return does not
+ // try to grant us the semaphore when we don't expect it.
+ for(;;) {
+ mp = runtime·atomicloadp(&n->waitm);
+ if(mp == m) {
+ // No wakeup yet; unregister if possible.
+ if(runtime·casp(&n->waitm, mp, nil))
+ return;
+ } else if(mp == (M*)LOCKED) {
+ // Wakeup happened so semaphore is available.
+ // Grab it to avoid getting out of sync.
+ if(runtime·semasleep(-1) < 0)
+ runtime·throw("runtime: unable to acquire - semaphore out of sync");
+ return;
+ } else {
+ runtime·throw("runtime: unexpected waitm - semaphore out of sync");
+ }
+ }
+}
=======================================
--- /src/pkg/runtime/openbsd/thread.c Mon Nov 7 08:57:34 2011
+++ /src/pkg/runtime/openbsd/thread.c Wed Nov 9 12:17:05 2011
@@ -62,21 +62,52 @@
return 1;
}

-void
-runtime·semasleep(void)
-{
-retry:
+int32
+runtime·semasleep(int64 ns)
+{
+ Timespec ts;
+
// spin-mutex lock
while(runtime·xchg(&m->waitsemalock, 1))
runtime·osyield();
- if(m->waitsemacount == 0) {
- // the function unlocks the spinlock
- runtime·thrsleep(&m->waitsemacount, 0, nil, &m->waitsemalock);
- goto retry;
- }
- m->waitsemacount--;
+
+ for(;;) {
+ // lock held
+ if(m->waitsemacount == 0) {
+ // sleep until semaphore != 0 or timeout.
+ // thrsleep unlocks m->waitsemalock.
+ if(ns < 0)
+ runtime·thrsleep(&m->waitsemacount, 0, nil, &m->waitsemalock);
+ else {
+ ts.tv_sec = ns/1000000000LL;
+ ts.tv_nsec = ns%1000000000LL;
+ runtime·thrsleep(&m->waitsemacount, CLOCK_REALTIME, &ts, &m->waitsemalock);
+ }
+ // reacquire lock
+ while(runtime·xchg(&m->waitsemalock, 1))
+ runtime·osyield();
+ }
+
+ // lock held (again)
+ if(m->waitsemacount != 0) {
+ // semaphore is available.
+ m->waitsemacount--;
+ // spin-mutex unlock
+ runtime·atomicstore(&m->waitsemalock, 0);
+ return 0; // semaphore acquired
+ }
+
+ // semaphore not available.
+ // if there is a timeout, stop now.
+ // otherwise keep trying.
+ if(ns >= 0)
+ break;
+ }
+
+ // lock held but giving up
// spin-mutex unlock
runtime·atomicstore(&m->waitsemalock, 0);
+ return -1;
}

void
=======================================
--- /src/pkg/runtime/plan9/thread.c Wed Nov 2 06:42:01 2011
+++ /src/pkg/runtime/plan9/thread.c Wed Nov 9 12:17:05 2011
@@ -78,7 +78,7 @@
uint8 tmp[16];
uint8 *p, *q;
int32 pid;
-
+
runtime·memclr(buf, sizeof buf);
runtime·memclr(tmp, sizeof tmp);
pid = _tos->pid;
@@ -94,7 +94,7 @@
for(q--; q >= tmp;)
*p++ = *q--;
runtime·memmove((void*)p, (void*)"/notepg", 7);
-
+
/* post interrupt note */
fd = runtime·open(buf, OWRITE);
runtime·write(fd, "interrupt", 9);
@@ -108,8 +108,8 @@
if(0){
runtime·printf("newosproc stk=%p m=%p g=%p fn=%p rfork=%p id=%d/%d ostk=%p\n",
stk, m, g, fn, runtime·rfork, m->id, m->tls[0], &m);
- }
-
+ }
+
if(runtime·rfork(RFPROC|RFMEM|RFNOWAIT, stk, m, g, fn) < 0)
runtime·throw("newosproc: rfork failed");
}
@@ -120,12 +120,45 @@
return 1;
}

-void
-runtime·semasleep(void)
-{
+int32
+runtime·semasleep(int64 ns)
+{
+ int32 ret;
+ int32 ms;
+
+ if(ns >= 0) {
+ // TODO: Plan 9 needs a new system call, tsemacquire.
+ // The kernel implementation is the same as semacquire
+ // except with a tsleep and check for timeout.
+ // It would be great if the implementation returned the
+ // value that was added to the semaphore, so that on
+ // timeout the return value would be 0, on success 1.
+ // Then the error string does not have to be parsed
+ // to detect timeout.
+ //
+ // If a negative time indicates no timeout, then
+ // semacquire can be implemented (in the kernel)
+ // as tsemacquire(p, v, -1).
+ runtime·throw("semasleep: timed sleep not implemented on Plan 9");
+
+ /*
+ if(ns < 0)
+ ms = -1;
+ else if(ns/1000 > 0x7fffffffll)
+ ms = 0x7fffffff;
+ else
+ ms = ns/1000;
+ ret = runtime·plan9_tsemacquire(&m->waitsemacount, 1, ms);
+ if(ret == 1)
+ return 0; // success
+ return -1; // timeout or interrupted
+ */
+ }
+
while(runtime·plan9_semacquire(&m->waitsemacount, 1) < 0) {
/* interrupted; try again */
}
+ return 0; // success
}

void
=======================================
--- /src/pkg/runtime/proc.c Tue Nov 8 18:16:25 2011
+++ /src/pkg/runtime/proc.c Wed Nov 9 12:17:05 2011
@@ -15,7 +15,6 @@
static void schedule(G*);
static void acquireproc(void);
static void releaseproc(void);
-static M *startm(void);

typedef struct Sched Sched;

@@ -72,7 +71,7 @@
volatile uint32 atomic; // atomic scheduling word (see below)

int32 profilehz; // cpu profiling rate
-
+
bool init; // running initialization
bool lockmain; // init called runtime.LockOSThread

@@ -701,7 +700,7 @@
// but m is not running a specific goroutine,
// so set the helpgc flag as a signal to m's
// first schedule(nil) to mcpu-- and grunning--.
- m = startm();
+ m = runtime·newm();
m->helpgc = 1;
runtime·sched.grunning++;
}
@@ -756,14 +755,14 @@

// Find the m that will run gp.
if((mp = mget(gp)) == nil)
- mp = startm();
+ mp = runtime·newm();
mnextg(mp, gp);
}
}

// Create a new m. It will start off with a call to runtime·mstart.
-static M*
-startm(void)
+M*
+runtime·newm(void)
{
M *m;

=======================================
--- /src/pkg/runtime/runtime.h Thu Nov 3 14:35:28 2011
+++ /src/pkg/runtime/runtime.h Wed Nov 9 12:17:05 2011
@@ -70,6 +70,8 @@
typedef struct Complex64 Complex64;
typedef struct Complex128 Complex128;
typedef struct WinCall WinCall;
+typedef struct Timers Timers;
+typedef struct Timer Timer;

/*
* per-cpu declaration.
@@ -239,7 +241,7 @@
uintptr waitsema; // semaphore for parking on locks
uint32 waitsemacount;
uint32 waitsemalock;
-
+
#ifdef __WINDOWS__
void* thread; // thread handle
#endif
@@ -315,6 +317,33 @@
};
#endif

+struct Timers
+{
+ Lock;
+ G *timerproc;
+ bool sleeping;
+ bool rescheduling;
+ Note waitnote;
+ Timer **t;
+ int32 len;
+ int32 cap;
+};
+
+// Package time knows the layout of this structure.
+// If this struct changes, adjust ../time/sleep.go:/runtimeTimer.
+struct Timer
+{
+ int32 i; // heap index
+
+ // Timer wakes up at when, and then at when+period, ... (period > 0 only)
+ // each time calling f(now, arg) in the timer goroutine, so f must be
+ // a well-behaved function and not block.
+ int64 when;
+ int64 period;
+ void (*f)(int64, Eface);
+ Eface arg;
+};
+
/*
* defined macros
* you need super-gopher-guru privilege
@@ -483,6 +512,8 @@
void runtime·exit(int32);
void runtime·breakpoint(void);
void runtime·gosched(void);
+void runtime·tsleep(int64);
+M* runtime·newm(void);
void runtime·goexit(void);
void runtime·asmcgocall(void (*fn)(void*), void*);
void runtime·entersyscall(void);
@@ -540,10 +571,28 @@
* subsequent noteclear must be called only after
* previous notesleep has returned, e.g. it's disallowed
* to call noteclear straight after notewakeup.
+ *
+ * notetsleep is like notesleep but wakes up after
+ * a given number of nanoseconds even if the event
+ * has not yet happened. if a goroutine uses notetsleep to
+ * wake up early, it must wait to call noteclear until it
+ * can be sure that no other goroutine is calling
+ * notewakeup.
*/
void runtime·noteclear(Note*);
void runtime·notesleep(Note*);
void runtime·notewakeup(Note*);
+void runtime·notetsleep(Note*, int64);
+
+/*
+ * low-level synchronization for implementing the above
+ */
+uintptr runtime·semacreate(void);
+int32 runtime·semasleep(int64);
+void runtime·semawakeup(M*);
+// or
+void runtime·futexsleep(uint32*, uint32, int64);
+void runtime·futexwakeup(uint32*, uint32);

/*
* This is consistent across Linux and BSD.
=======================================
--- /src/pkg/runtime/time.goc Thu Nov 3 14:35:28 2011
+++ /src/pkg/runtime/time.goc Wed Nov 9 12:17:05 2011
@@ -2,12 +2,242 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

-// Runtime implementations to help package time.
+// Time-related runtime and pieces of package time.

package time

#include "runtime.h"
-
+#include "defs.h"
+#include "os.h"
+#include "arch.h"
+#include "malloc.h"
+
+static Timers timers;
+static void addtimer(Timer*);
+static bool deltimer(Timer*);
+
+// Package time APIs.
+// Godoc uses the comments in package time, not these.
+
+// Nanoseconds returns the current time in nanoseconds.
func Nanoseconds() (ret int64) {
ret = runtime·nanotime();
}
+
+// Sleep puts the current goroutine to sleep for at least ns nanoseconds.
+func Sleep(ns int64) {
+ g->status = Gwaiting;
+ g->waitreason = "sleep";
+ runtime·tsleep(ns);
+}
+
+// startTimer adds t to the timer heap.
+func startTimer(t *Timer) {
+ addtimer(t);
+}
+
+// stopTimer removes t from the timer heap if it is there.
+// It returns true if t was removed, false if t wasn't even there.
+func stopTimer(t *Timer) (stopped bool) {
+ stopped = deltimer(t);
+}
+
+// C runtime.
+
+static void timerproc(void);
+static void siftup(int32);
+static void siftdown(int32);
+
+// Ready the goroutine e.data.
+static void
+ready(int64 now, Eface e)
+{
+ USED(now);
+
+ runtime·ready(e.data);
+}
+
+// Put the current goroutine to sleep for ns nanoseconds.
+// The caller must have set g->status and g->waitreason.
+void
+runtime·tsleep(int64 ns)
+{
+ Timer t;
+
+ if(ns <= 0)
+ return;
+
+ t.when = runtime·nanotime() + ns;
+ t.period = 0;
+ t.f = ready;
+ t.arg.data = g;
+ addtimer(&t);
+ runtime·gosched();
+}
+
+// Add a timer to the heap and start or kick the timer proc
+// if the new timer is earlier than any of the others.
+static void
+addtimer(Timer *t)
+{
+ int32 n;
+ Timer **nt;
+
+ runtime·lock(&timers);
+ if(timers.len >= timers.cap) {
+ // Grow slice.
+ n = 16;
+ if(n <= timers.cap)
+ n = timers.cap*3 / 2;
+ nt = runtime·malloc(n*sizeof nt[0]);
+ runtime·memmove(nt, timers.t, timers.len*sizeof nt[0]);
+ runtime·free(timers.t);
+ timers.t = nt;
+ timers.cap = n;
+ }
+ t->i = timers.len++;
+ timers.t[t->i] = t;
+ siftup(t->i);
+ if(t->i == 0) {
+ // siftup moved to top: new earliest deadline.
+ if(timers.sleeping) {
+ timers.sleeping = false;
+ runtime·notewakeup(&timers.waitnote);
+ }
+ if(timers.rescheduling) {
+ timers.rescheduling = false;
+ runtime·ready(timers.timerproc);
+ }
+ }
+ if(timers.timerproc == nil)
+ timers.timerproc = runtime·newproc1((byte*)timerproc, nil, 0, 0, addtimer);
+ runtime·unlock(&timers);
+}
+
+// Delete timer t from the heap.
+// Do not need to update the timerproc:
+// if it wakes up early, no big deal.
+static bool
+deltimer(Timer *t)
+{
+ int32 i;
+
+ runtime·lock(&timers);
+
+ // t may not be registered anymore and may have
+ // a bogus i (typically 0, if generated by Go).
+ // Verify it before proceeding.
+ i = t->i;
+ if(i < 0 || i >= timers.len || timers.t[i] != t) {
+ runtime·unlock(&timers);
+ return false;
+ }
+
+ timers.t[i] = timers.t[--timers.len];
+ siftup(i);
+ siftdown(i);
+ runtime·unlock(&timers);
+ return true;
+}
+
+// Timerproc runs the time-driven events.
+// It sleeps until the next event in the timers heap.
+// If addtimer inserts a new earlier event, addtimer
+// wakes timerproc early.
+static void
+timerproc(void)
+{
+ int64 delta, now;
+ Timer *t;
+
+ for(;;) {
+ runtime·lock(&timers);
+ now = runtime·nanotime();
+ for(;;) {
+ if(timers.len == 0) {
+ delta = -1;
+ break;
+ }
+ t = timers.t[0];
+ delta = t->when - now;
+ if(delta > 0)
+ break;
+ if(t->period > 0) {
+ // leave in heap but adjust next time to fire
+ t->when += t->period * (1 + -delta/t->period);
+ siftdown(0);
+ } else {
+ // remove from heap
+ timers.t[0] = timers.t[--timers.len];
+ timers.t[0]->i = 0;
+ siftdown(0);
+ t->i = -1; // mark as removed
+ }
+ t->f(now, t->arg);
+ }
+ if(delta < 0) {
+ // No timers left - put goroutine to sleep.
+ timers.rescheduling = true;
+ g->status = Gwaiting;
+ g->waitreason = "timer goroutine (idle)";
+ runtime·unlock(&timers);
+ runtime·gosched();
+ continue;
+ }
+ // At least one timer pending. Sleep until then.
+ timers.sleeping = true;
+ runtime·noteclear(&timers.waitnote);
+ runtime·unlock(&timers);
+ runtime·entersyscall();
+ runtime·notetsleep(&timers.waitnote, delta);
+ runtime·exitsyscall();
+ }
+}
+
+// heap maintenance algorithms.
+
+static void
+siftup(int32 i)
+{
+ int32 p;
+ Timer **t, *tmp;
+
+ t = timers.t;
+ while(i > 0) {
+ p = (i-1)/2; // parent
+ if(t[i]->when >= t[p]->when)
+ break;
+ tmp = t[i];
+ t[i] = t[p];
+ t[p] = tmp;
+ t[i]->i = i;
+ t[p]->i = p;
+ i = p;
+ }
+}
+
+static void
+siftdown(int32 i)
+{
+ int32 c, len;
+ Timer **t, *tmp;
+
+ t = timers.t;
+ len = timers.len;
+ for(;;) {
+ c = i*2 + 1; // left child
+ if(c >= len) {
+ break;
+ }
+ if(c+1 < len && t[c+1]->when < t[c]->when)
+ c++;
+ if(t[c]->when >= t[i]->when)
+ break;
+ tmp = t[i];
+ t[i] = t[c];
+ t[c] = tmp;
+ t[i]->i = i;
+ t[c]->i = c;
+ i = c;
+ }
+}
=======================================
--- /src/pkg/runtime/windows/thread.c Thu Nov 3 14:35:28 2011
+++ /src/pkg/runtime/windows/thread.c Wed Nov 9 12:17:05 2011
@@ -150,10 +150,25 @@
runtime·stdcall(runtime·Sleep, 1, (uintptr)us);
}

-void
-runtime·semasleep(void)
-{
- runtime·stdcall(runtime·WaitForSingleObject, 2, m->waitsema, (uintptr)-1);
+#define INFINITE ((uintptr)0xFFFFFFFF)
+
+int32
+runtime·semasleep(int64 ns)
+{
+ uintptr ms;
+
+ if(ns < 0)
+ ms = INFINITE;
+ else if(ns/1000000 > 0x7fffffffLL)
+ ms = 0x7fffffff;
+ else {
+ ms = ns/1000000;
+ if(ms == 0)
+ ms = 1;
+ }
+ if(runtime·stdcall(runtime·WaitForSingleObject, 2, m->waitsema, ms) != 0)
+ return -1; // timeout
+ return 0;
}

void
@@ -198,7 +213,7 @@
int64 filetime;

runtime·stdcall(runtime·GetSystemTimeAsFileTime, 1, &filetime);
-
+
// Filetime is 100s of nanoseconds since January 1, 1601.
// Convert to nanoseconds since January 1, 1970.
return (filetime - 116444736000000000LL) * 100LL;
=======================================
--- /src/pkg/time/sleep.go Mon May 23 12:38:51 2011
+++ /src/pkg/time/sleep.go Wed Nov 9 12:17:05 2011
@@ -4,54 +4,60 @@

package time

-import (
- "container/heap"
- "sync"
-)
+// Interface to timers implemented in package runtime.
+// Must be in sync with ../runtime/runtime.h:/^struct.Timer$
+type runtimeTimer struct {
+ i int32
+ when int64
+ period int64
+ f func(int64, interface{})
+ arg interface{}
+}
+
+func startTimer(*runtimeTimer)
+func stopTimer(*runtimeTimer) bool

// The Timer type represents a single event.
-// When the Timer expires, the current time will be sent on C
-// unless the Timer represents an AfterFunc event.
+// When the Timer expires, the current time will be sent on C,
+// unless the Timer was created by AfterFunc.
type Timer struct {
C <-chan int64
- t int64 // The absolute time that the event should fire.
- f func(int64) // The function to call when the event fires.
- i int // The event's index inside eventHeap.
+ r runtimeTimer
}

-type timerHeap []*Timer
-
-// forever is the absolute time (in ns) of an event that is forever away.
-const forever = 1 << 62
-
-// maxSleepTime is the maximum length of time that a sleeper
-// sleeps for before checking if it is defunct.
-const maxSleepTime = 1e9
-
-var (
- // timerMutex guards the variables inside this var group.
- timerMutex sync.Mutex
-
- // timers holds a binary heap of pending events, terminated with a sentinel.
- timers timerHeap
-
- // currentSleeper is an ever-incrementing counter which represents
- // the current sleeper. It allows older sleepers to detect that they are
- // defunct and exit.
- currentSleeper int64
-)
-
-func init() {
- timers.Push(&Timer{t: forever}) // sentinel
+// Stop prevents the Timer from firing.
+// It returns true if the call stops the timer, false if the timer has already
+// expired or stopped.
+func (t *Timer) Stop() (ok bool) {
+ return stopTimer(&t.r)
}

// NewTimer creates a new Timer that will send
// the current time on its channel after at least ns nanoseconds.
func NewTimer(ns int64) *Timer {
c := make(chan int64, 1)
- e := after(ns, func(t int64) { c <- t })
- e.C = c
- return e
+ t := &Timer{
+ C: c,
+ r: runtimeTimer{
+ when: Nanoseconds() + ns,
+ f: sendTime,
+ arg: c,
+ },
+ }
+ startTimer(&t.r)
+ return t
+}
+
+func sendTime(now int64, c interface{}) {
+ // Non-blocking send of time on c.
+ // Used in NewTimer, it cannot block anyway (buffer).
+ // Used in NewTicker, dropping sends on the floor is
+ // the desired behavior when the reader gets behind,
+ // because the sends are periodic.
+ select {
+ case c.(chan int64) <- now:
+ default:
+ }
}

// After waits at least ns nanoseconds before sending the current time
@@ -65,113 +71,17 @@
// in its own goroutine. It returns a Timer that can
// be used to cancel the call using its Stop method.
func AfterFunc(ns int64, f func()) *Timer {
- return after(ns, func(_ int64) {
- go f()
- })
-}
-
-// Stop prevents the Timer from firing.
-// It returns true if the call stops the timer, false if the timer has already
-// expired or stopped.
-func (e *Timer) Stop() (ok bool) {
- timerMutex.Lock()
- // Avoid removing the first event in the queue so that
- // we don't start a new sleeper unnecessarily.
- if e.i > 0 {
- heap.Remove(timers, e.i)
- }
- ok = e.f != nil
- e.f = nil
- timerMutex.Unlock()
- return
+ t := &Timer{
+ r: runtimeTimer{
+ when: Nanoseconds() + ns,
+ f: goFunc,
+ arg: f,
+ },
+ }
+ startTimer(&t.r)
+ return t
}

-// after is the implementation of After and AfterFunc.
-// When the current time is after ns, it calls f with the current time.
-// It assumes that f will not block.
-func after(ns int64, f func(int64)) (e *Timer) {
- now := Nanoseconds()
- t := now + ns
- if ns > 0 && t < now {
- panic("time: time overflow")
- }
- timerMutex.Lock()
- t0 := timers[0].t
- e = &Timer{nil, t, f, -1}
- heap.Push(timers, e)
- // Start a new sleeper if the new event is before
- // the first event in the queue. If the length of time
- // until the new event is at least maxSleepTime,
- // then we're guaranteed that the sleeper will wake up
- // in time to service it, so no new sleeper is needed.
- if t0 > t && (t0 == forever || ns < maxSleepTime) {
- currentSleeper++
- go sleeper(currentSleeper)
- }
- timerMutex.Unlock()
- return
-}
-
-// sleeper continually looks at the earliest event in the queue, waits until it happens,
-// then removes any events in the queue that are due. It stops when the queue
-// is empty or when another sleeper has been started.
-func sleeper(sleeperId int64) {
- timerMutex.Lock()
- e := timers[0]
- t := Nanoseconds()
- for e.t != forever {
- if dt := e.t - t; dt > 0 {
- if dt > maxSleepTime {
- dt = maxSleepTime
- }
- timerMutex.Unlock()
- sysSleep(dt)
- timerMutex.Lock()
- if currentSleeper != sleeperId {
- // Another sleeper has been started, making this one redundant.
- break
- }
- }
- e = timers[0]
- t = Nanoseconds()
- for t >= e.t {
- if e.f != nil {
- e.f(t)
- e.f = nil
- }
- heap.Pop(timers)
- e = timers[0]
- }
- }
- timerMutex.Unlock()
-}
-
-func (timerHeap) Len() int {
- return len(timers)
-}
-
-func (timerHeap) Less(i, j int) bool {
- return timers[i].t < timers[j].t
-}
-
-func (timerHeap) Swap(i, j int) {
- timers[i], timers[j] = timers[j], timers[i]
- timers[i].i = i
- timers[j].i = j
-}
-
-func (timerHeap) Push(x interface{}) {
- e := x.(*Timer)
- e.i = len(timers)
- timers = append(timers, e)
-}
-
-func (timerHeap) Pop() interface{} {
- // TODO: possibly shrink array.
- n := len(timers) - 1
- e := timers[n]
- timers[n] = nil
- timers = timers[0:n]
- e.i = -1
- return e
-}
+func goFunc(now int64, arg interface{}) {
+ go arg.(func())()
+}
=======================================
--- /src/pkg/time/sys.go Thu Nov 3 14:35:28 2011
+++ /src/pkg/time/sys.go Wed Nov 9 12:17:05 2011
@@ -17,25 +17,4 @@
func Nanoseconds() int64

// Sleep pauses the current goroutine for at least ns nanoseconds.
-// Higher resolution sleeping may be provided by syscall.Nanosleep
-// on some operating systems.
-func Sleep(ns int64) error {
- _, err := sleep(Nanoseconds(), ns)
- return err
-}
-
-// sleep takes the current time and a duration,
-// pauses for at least ns nanoseconds, and
-// returns the current time and an error.
-func sleep(t, ns int64) (int64, error) {
- // TODO(cw): use monotonic-time once it's available
- end := t + ns
- for t < end {
- err := sysSleep(end - t)
- if err != nil {
- return 0, err
- }
- t = Nanoseconds()
- }
- return t, nil
-}
+func Sleep(ns int64)
=======================================
--- /src/pkg/time/tick.go Tue Nov 1 19:05:34 2011
+++ /src/pkg/time/tick.go Wed Nov 9 12:17:05 2011
@@ -4,155 +4,14 @@

package time

-import (
- "errors"
- "sync"
-)
+import "errors"

// A Ticker holds a synchronous channel that delivers `ticks' of a clock
// at intervals.
type Ticker struct {
- C <-chan int64 // The channel on which the ticks are delivered.
- c chan<- int64 // The same channel, but the end we use.
- ns int64
- shutdown chan bool // Buffered channel used to signal shutdown.
- nextTick int64
- next *Ticker
-}
-
-// Stop turns off a ticker. After Stop, no more ticks will be sent.
-func (t *Ticker) Stop() {
- select {
- case t.shutdown <- true:
- // ok
- default:
- // Stop in progress already
- }
-}
-
-// Tick is a convenience wrapper for NewTicker providing access to the ticking
-// channel only. Useful for clients that have no need to shut down the ticker.
-func Tick(ns int64) <-chan int64 {
- if ns <= 0 {
- return nil
- }
- return NewTicker(ns).C
-}
-
-type alarmer struct {
- wakeUp chan bool // wakeup signals sent/received here
- wakeMeAt chan int64
- wakeTime int64
-}
-
-// Set alarm to go off at time ns, if not already set earlier.
-func (a *alarmer) set(ns int64) {
- switch {
- case a.wakeTime > ns:
- // Next tick we expect is too late; shut down the late runner
- // and (after fallthrough) start a new wakeLoop.
- close(a.wakeMeAt)
- fallthrough
- case a.wakeMeAt == nil:
- // There's no wakeLoop, start one.
- a.wakeMeAt = make(chan int64)
- a.wakeUp = make(chan bool, 1)
- go wakeLoop(a.wakeMeAt, a.wakeUp)
- fallthrough
- case a.wakeTime == 0:
- // Nobody else is waiting; it's just us.
- a.wakeTime = ns
- a.wakeMeAt <- ns
- default:
- // There's already someone scheduled.
- }
-}
-
-// Channel to notify tickerLoop of new Tickers being created.
-var newTicker chan *Ticker
-
-func startTickerLoop() {
- newTicker = make(chan *Ticker)
- go tickerLoop()
-}
-
-// wakeLoop delivers ticks at scheduled times, sleeping until the right moment.
-// If another, earlier Ticker is created while it sleeps, tickerLoop() will start a new
-// wakeLoop and signal that this one is done by closing the wakeMeAt channel.
-func wakeLoop(wakeMeAt chan int64, wakeUp chan bool) {
- for wakeAt := range wakeMeAt {
- Sleep(wakeAt - Nanoseconds())
- wakeUp <- true
- }
-}
-
-// A single tickerLoop serves all ticks to Tickers. It waits for two events:
-// either the creation of a new Ticker or a tick from the alarm,
-// signaling a time to wake up one or more Tickers.
-func tickerLoop() {
- // Represents the next alarm to be delivered.
- var alarm alarmer
- var now, wakeTime int64
- var tickers *Ticker
- for {
- select {
- case t := <-newTicker:
- // Add Ticker to list
- t.next = tickers
- tickers = t
- // Arrange for a new alarm if this one precedes the existing one.
- alarm.set(t.nextTick)
- case <-alarm.wakeUp:
- now = Nanoseconds()
- wakeTime = now + 1e15 // very long in the future
- var prev *Ticker = nil
- // Scan list of tickers, delivering updates to those
- // that need it and determining the next wake time.
- // TODO(r): list should be sorted in time order.
- for t := tickers; t != nil; t = t.next {
- select {
- case <-t.shutdown:
- // Ticker is done; remove it from list.
- if prev == nil {
- tickers = t.next
- } else {
- prev.next = t.next
- }
- continue
- default:
- }
- if t.nextTick <= now {
- if len(t.c) == 0 {
- // Only send if there's room. We must not block.
- // The channel is allocated with a one-element
- // buffer, which is sufficient: if he hasn't picked
- // up the last tick, no point in sending more.
- t.c <- now
- }
- t.nextTick += t.ns
- if t.nextTick <= now {
- // Still behind; advance in one big step.
- t.nextTick += (now - t.nextTick + t.ns) / t.ns * t.ns
- }
- }
- if t.nextTick < wakeTime {
- wakeTime = t.nextTick
- }
- prev = t
- }
- if tickers != nil {
- // Please send wakeup at earliest required time.
- // If there are no tickers, don't bother.
- alarm.wakeTime = wakeTime
- alarm.wakeMeAt <- wakeTime
- } else {
- alarm.wakeTime = 0
- }
- }
- }
-}
-
-var onceStartTickerLoop sync.Once
+ C <-chan int64 // The channel on which the ticks are delivered.
+ r runtimeTimer
+}

// NewTicker returns a new Ticker containing a channel that will
// send the time, in nanoseconds, every ns nanoseconds. It adjusts the
@@ -162,16 +21,33 @@
if ns <= 0 {
panic(errors.New("non-positive interval for NewTicker"))
}
- c := make(chan int64, 1) // See comment on send in tickerLoop
+ // Give the channel a 1-element time buffer.
+ // If the client falls behind while reading, we drop ticks
+ // on the floor until the client catches up.
+ c := make(chan int64, 1)
t := &Ticker{
- C: c,
- c: c,
- ns: ns,
- shutdown: make(chan bool, 1),
- nextTick: Nanoseconds() + ns,
- }
- onceStartTickerLoop.Do(startTickerLoop)
- // must be run in background so global Tickers can be created
- go func() { newTicker <- t }()
+ C: c,
+ r: runtimeTimer{
+ when: Nanoseconds() + ns,
+ period: ns,
+ f: sendTime,
+ arg: c,
+ },
+ }
+ startTimer(&t.r)
return t
}
+
+// Stop turns off a ticker. After Stop, no more ticks will be sent.
+func (t *Ticker) Stop() {
+ stopTimer(&t.r)
+}
+
+// Tick is a convenience wrapper for NewTicker providing access to the ticking
+// channel only. Useful for clients that have no need to shut down the ticker.
+func Tick(ns int64) <-chan int64 {
+ if ns <= 0 {
+ return nil
+ }
+ return NewTicker(ns).C
+}
Reply all
Reply to author
Forward
0 new messages