Congratulations on opening your first change. Thank you for your contribution!
Next steps:
A maintainer will review your change and provide feedback. See
https://go.dev/doc/contribute#review for more info and tips to get your
patch through code review.
Most changes in the Go project go through a few rounds of revision. This can be
surprising to people new to the project. The careful, iterative review process
is our way of helping mentor contributors and ensuring that their contributions
have a lasting impact.
During May-July and Nov-Jan the Go project is in a code freeze, during which
little code gets reviewed or merged. If a reviewer responds with a comment like
R=go1.11 or adds a tag like "wait-release", it means that this CL will be
reviewed as part of the next development cycle. See https://go.dev/s/release
for more details.
To view, visit change 525435. To unsubscribe, or for help writing mail filters, visit settings.
jerry zhuang has uploaded this change for review.
runtime: reduce contention in gcDrain
There is heavy contention on the lfstack head when running on
machines with a large number of cores.
Try using an elimination-backoff stack to reduce CAS contention.
Signed-off-by: jerryzhuang <zhua...@gmail.com>
Change-Id: I9537b69a239dfc0b6c14d32a0b7ea8c5cd3b9326
---
A src/runtime/ebstack.go
M src/runtime/export_test.go
M src/runtime/lfstack_test.go
M src/runtime/mgc.go
M src/runtime/mgcmark.go
M src/runtime/mgcwork.go
6 files changed, 260 insertions(+), 9 deletions(-)
diff --git a/src/runtime/ebstack.go b/src/runtime/ebstack.go
new file mode 100644
index 0000000..ff91197
--- /dev/null
+++ b/src/runtime/ebstack.go
@@ -0,0 +1,137 @@
+package runtime
+
+import (
+ "internal/cpu"
+ "internal/goarch"
+ "runtime/internal/atomic"
+ "unsafe"
+)
+
+// ebstack is a lock-free stack that falls back to an array of elimination-backoff exchanger slots when a CAS on head fails, to reduce CAS contention.
+// See https://people.csail.mit.edu/shanir/publications/Lock_Free.pdf for the algorithm.
+type ebstack struct {
+ head uint64 // packed top-of-stack word (lfstackPack of node and pushcnt); 0 means empty
+ _ cpu.CacheLinePad // padding between head and the exchanger slots (presumably to avoid false sharing)
+ exchanger [ebStackArrayLen]struct {
+ e uint64 // tagged pointer: the offered *lfnode (nil for a pop) plus an eliminationBackoff* status tag
+ _ [cpu.CacheLinePadSize - goarch.PtrSize]byte // NOTE(review): e is 8 bytes, so with PtrSize 4 the slot is CacheLinePadSize+4 bytes and later slots may not be 8-byte aligned for Load64/Cas64 -- confirm on 32-bit targets
+ }
+}
+
+const (
+ eliminationBackoffEmpty uint64 = 0 // slot is free; a thread may publish an offer
+ eliminationBackoffWaiting uint64 = 1 // a thread has published an offer and is spinning for a partner
+ eliminationBackoffBusy uint64 = 2 // a partner answered the offer; the waiter collects it and resets the slot
+
+ ebStackPinTimeout int64 = 10000 // how long exchange spins, added to nanotime() (presumably nanoseconds -- confirm)
+
+ ebStackArrayLen = 32 // number of exchanger slots; exchange picks one with fastrandn
+)
+
+// push adds node to the stack.
+//
+// It first tries the usual lock-free CAS on head. If the CAS loses a race,
+// it offers node on an exchanger slot instead of retrying immediately: a
+// concurrent pop that takes the offer removes the need to touch head at all.
+func (ebs *ebstack) push(node *lfnode) {
+	node.pushcnt++
+	packed := lfstackPack(node, node.pushcnt)
+	if unpacked := lfstackUnpack(packed); unpacked != node {
+		print("runtime: ebstack.push invalid packing: node=", node, " cnt=", hex(node.pushcnt), " packed=", hex(packed), " -> node=", unpacked, "\n")
+		throw("ebstack.push")
+	}
+	for {
+		old := atomic.Load64(&ebs.head)
+		node.next = old
+		if atomic.Cas64(&ebs.head, old, packed) {
+			return
+		}
+		// Lost the race for head: back off through the exchanger.
+		partner, ok := ebs.exchange(node)
+		if ok && partner == nil {
+			// The partner offered no node, i.e. it was a pop, and it
+			// now owns node. Nothing left to do.
+			return
+		}
+		// Either nobody showed up in time, or the partner was another
+		// push (it offered a node of its own). Retry on head.
+	}
+}
+
+// pop removes and returns the node on top of the stack, or nil if head
+// reads as empty.
+//
+// When the CAS on head loses a race, pop offers nil on the exchanger; a
+// concurrent push that answers the offer hands its node over directly.
+func (ebs *ebstack) pop() unsafe.Pointer {
+	for {
+		top := atomic.Load64(&ebs.head)
+		if top == 0 {
+			return nil
+		}
+		n := lfstackUnpack(top)
+		if atomic.Cas64(&ebs.head, top, atomic.Load64(&n.next)) {
+			return unsafe.Pointer(n)
+		}
+		// Contended: see whether a pusher will hand us a node. A timeout
+		// or a pairing with another pop (nil offer) means retry on head.
+		if got, ok := ebs.exchange(nil); ok && got != nil {
+			return unsafe.Pointer(got)
+		}
+	}
+}
+
+// empty reports whether head currently reads as zero, i.e. no node is
+// linked on the stack. Only head is inspected; the exchanger slots are not.
+func (ebs *ebstack) empty() bool {
+	head := atomic.Load64(&ebs.head)
+	return head == 0
+}
+
+// exchange tries to pair the caller with a concurrent operation on a randomly
+// chosen exchanger slot, spinning until nanotime() passes a deadline of
+// ebStackPinTimeout from entry.
+//
+// node is the caller's offer: the node being pushed, or nil for a pop.
+// On a successful pairing it returns the partner's offer and true; on
+// timeout it returns nil and false. The caller decides whether the pairing
+// was useful (a push meeting a pop) or whether it must retry on head.
+func (ebs *ebstack) exchange(node *lfnode) (*lfnode, bool) {
+	i := fastrandn(ebStackArrayLen)
+
+	deadline := nanotime() + ebStackPinTimeout
+
+	var old, status uint64
+	var oldNode *lfnode
+
+	for nanotime() < deadline {
+		old = atomic.Load64(&ebs.exchanger[i].e)
+		oldNode, status = ebstackUnpack(old)
+
+		switch status {
+		case eliminationBackoffEmpty:
+			// Slot is free: publish our offer and wait for a partner.
+			offer := ebstackPack(node, uintptr(eliminationBackoffWaiting))
+			if atomic.Cas64(&ebs.exchanger[i].e, old, offer) {
+				for nanotime() < deadline {
+					old = atomic.Load64(&ebs.exchanger[i].e)
+					oldNode, status = ebstackUnpack(old)
+					if status != eliminationBackoffBusy {
+						procyield(10)
+						continue
+					}
+					// A partner answered: collect its offer and
+					// reset the slot to empty.
+					atomic.Store64(&ebs.exchanger[i].e, 0)
+					return oldNode, true
+				}
+				// Timed out: try to withdraw the offer.
+				if atomic.Cas64(&ebs.exchanger[i].e, offer, 0) {
+					return nil, false
+				}
+				// The withdrawal failed, so a partner must have
+				// answered after the last check; the exchange is
+				// complete. Collect its offer and reset the slot.
+				old = atomic.Load64(&ebs.exchanger[i].e)
+				oldNode, _ = ebstackUnpack(old)
+				atomic.Store64(&ebs.exchanger[i].e, 0)
+				return oldNode, true
+			}
+		case eliminationBackoffWaiting:
+			// Someone is waiting: answer with our offer and take theirs.
+			answer := ebstackPack(node, uintptr(eliminationBackoffBusy))
+			if atomic.Cas64(&ebs.exchanger[i].e, old, answer) {
+				return oldNode, true
+			}
+		}
+		// Slot is busy or a CAS lost a race; pause briefly and retry.
+		procyield(10)
+	}
+
+	return nil, false
+}
+
+// ebstackPack combines node and the tag cnt into a single 64-bit exchanger
+// word using taggedPointerPack.
+func ebstackPack(node *lfnode, cnt uintptr) uint64 {
+	tp := taggedPointerPack(unsafe.Pointer(node), cnt)
+	return uint64(tp)
+}
+
+// ebstackUnpack splits an exchanger word into its node pointer and tag.
+func ebstackUnpack(val uint64) (*lfnode, uint64) {
+	tp := taggedPointer(val)
+	node := (*lfnode)(tp.pointer())
+	tag := uint64(tp.tag())
+	return node, tag
+}
diff --git a/src/runtime/export_test.go b/src/runtime/export_test.go
index c43c5d05..3029e2d 100644
--- a/src/runtime/export_test.go
+++ b/src/runtime/export_test.go
@@ -79,6 +79,18 @@
lfnodeValidate((*lfnode)(unsafe.Pointer(node)))
}
+type EBStack struct {
+ s ebstack // the runtime ebstack that EBStackPush and EBStackPop operate on
+}
+
+// EBStackPush pushes node onto ebs, reinterpreting the exported LFNode as
+// the runtime's lfnode.
+func EBStackPush(ebs *EBStack, node *LFNode) {
+	n := (*lfnode)(unsafe.Pointer(node))
+	ebs.s.push(n)
+}
+
+// EBStackPop pops from ebs and returns the result as an *LFNode; the result
+// is nil when the underlying pop returns nil.
+func EBStackPop(ebs *EBStack) *LFNode {
+	p := ebs.s.pop()
+	return (*LFNode)(p)
+}
+
func Netpoll(delta int64) {
systemstack(func() {
netpoll(delta)
diff --git a/src/runtime/lfstack_test.go b/src/runtime/lfstack_test.go
index e36297e..0f58657 100644
--- a/src/runtime/lfstack_test.go
+++ b/src/runtime/lfstack_test.go
@@ -135,3 +135,105 @@
t.Fatalf("Wrong sum %d/%d", sum2, sum)
}
}
+
+// TestEBStackStress moves nodes at random between two EBStacks from
+// 4*GOMAXPROCS goroutines, then drains both stacks and checks that the
+// node count and the sum of node data are unchanged.
+func TestEBStackStress(t *testing.T) {
+	const K = 100
+	P := 4 * GOMAXPROCS(-1)
+	N := 100000
+	if testing.Short() {
+		N /= 10
+	}
+	// Two stacks, seeded alternately with K nodes carrying 0..K-1.
+	stacks := [2]*EBStack{new(EBStack), new(EBStack)}
+	sum := 0
+	for v := 0; v < K; v++ {
+		sum += v
+		EBStackPush(stacks[v%2], fromMyNode(allocMyNode(v)))
+	}
+	done := make(chan bool, P)
+	for p := 0; p < P; p++ {
+		go func() {
+			rng := rand.New(rand.NewSource(rand.Int63()))
+			// Repeatedly pop from a random stack and, if a node came
+			// back, push it onto a random stack.
+			for iter := 0; iter < N; iter++ {
+				n := toMyNode(EBStackPop(stacks[rng.Intn(2)]))
+				if n == nil {
+					continue
+				}
+				EBStackPush(stacks[rng.Intn(2)], fromMyNode(n))
+			}
+			done <- true
+		}()
+	}
+	for p := 0; p < P; p++ {
+		<-done
+	}
+	// Drain both stacks and verify that nothing was lost.
+	sum2, cnt := 0, 0
+	for _, s := range stacks {
+		for n := toMyNode(EBStackPop(s)); n != nil; n = toMyNode(EBStackPop(s)) {
+			cnt++
+			sum2 += n.data
+			n.Next = 0
+		}
+	}
+	if cnt != K {
+		t.Fatalf("Wrong number of nodes %d/%d", cnt, K)
+	}
+	if sum2 != sum {
+		t.Fatalf("Wrong sum %d/%d", sum2, sum)
+	}
+}
+
+// TestBackoffStack2 checks single-goroutine behavior of EBStack: a fresh
+// stack pops nil, two pushed nodes come back in LIFO order, and the stack
+// pops nil again afterwards.
+func TestBackoffStack2(t *testing.T) {
+	stack := new(EBStack)
+
+	// Check the stack is initially empty.
+	if EBStackPop(stack) != nil {
+		t.Fatalf("stack is not empty")
+	}
+
+	// Push one element.
+	node := allocMyNode(42)
+	EBStackPush(stack, fromMyNode(node))
+
+	// Push another.
+	node = allocMyNode(43)
+	EBStackPush(stack, fromMyNode(node))
+
+	// Pop one element.
+	node = toMyNode(EBStackPop(stack))
+	if node == nil {
+		t.Fatalf("stack is empty")
+	}
+	if node.data != 43 {
+		t.Fatalf("no lifo")
+	}
+
+	// Pop another.
+	node = toMyNode(EBStackPop(stack))
+	if node == nil {
+		t.Fatalf("stack is empty")
+	}
+	if node.data != 42 {
+		t.Fatalf("no lifo")
+	}
+
+	// Check the stack is empty again.
+	if EBStackPop(stack) != nil {
+		t.Fatalf("stack is not empty")
+	}
+}
diff --git a/src/runtime/mgc.go b/src/runtime/mgc.go
index 44ff5fb..ac4a524 100644
--- a/src/runtime/mgc.go
+++ b/src/runtime/mgc.go
@@ -306,9 +306,9 @@
var work workType
type workType struct {
- full lfstack // lock-free list of full blocks workbuf
+ full ebstack // lock-free list of full blocks workbuf
_ cpu.CacheLinePad // prevents false-sharing between full and empty
- empty lfstack // lock-free list of empty blocks workbuf
+ empty ebstack // lock-free list of empty blocks workbuf
_ cpu.CacheLinePad // prevents false-sharing between empty and nproc/nwait
wbufSpans struct {
@@ -1477,8 +1477,8 @@
work.tstart = startTime
// Check that there's no marking work remaining.
- if work.full != 0 || work.markrootNext < work.markrootJobs {
- print("runtime: full=", hex(work.full), " next=", work.markrootNext, " jobs=", work.markrootJobs, " nDataRoots=", work.nDataRoots, " nBSSRoots=", work.nBSSRoots, " nSpanRoots=", work.nSpanRoots, " nStackRoots=", work.nStackRoots, "\n")
+ if !work.full.empty() || work.markrootNext < work.markrootJobs {
+ print("runtime next=", work.markrootNext, " jobs=", work.markrootJobs, " nDataRoots=", work.nDataRoots, " nBSSRoots=", work.nBSSRoots, " nSpanRoots=", work.nSpanRoots, " nStackRoots=", work.nStackRoots, "\n")
panic("non-empty mark queue after concurrent mark")
}
diff --git a/src/runtime/mgcmark.go b/src/runtime/mgcmark.go
index 9ab3b48..5032db8 100644
--- a/src/runtime/mgcmark.go
+++ b/src/runtime/mgcmark.go
@@ -1111,7 +1111,7 @@
// just keep work available than to make workers wait. In the
// worst case, we'll do O(log(_WorkbufSize)) unnecessary
// balances.
- if work.full == 0 {
+ if work.full.empty() {
gcw.balance()
}
@@ -1191,7 +1191,7 @@
gp := getg().m.curg
for !gp.preempt && !gcCPULimiter.limiting() && workFlushed+gcw.heapScanWork < scanWork {
// See gcDrain comment.
- if work.full == 0 {
+ if work.full.empty() {
gcw.balance()
}
diff --git a/src/runtime/mgcwork.go b/src/runtime/mgcwork.go
index 7ab8975..c18c72c 100644
--- a/src/runtime/mgcwork.go
+++ b/src/runtime/mgcwork.go
@@ -351,7 +351,7 @@
//go:nowritebarrier
func getempty() *workbuf {
var b *workbuf
- if work.empty != 0 {
+ if !work.empty.empty() {
b = (*workbuf)(work.empty.pop())
if b != nil {
b.checkempty()
@@ -452,13 +452,13 @@
// workbufs are on the empty list.
func prepareFreeWorkbufs() {
lock(&work.wbufSpans.lock)
- if work.full != 0 {
+ if !work.full.empty() {
throw("cannot free workbufs when work.full != 0")
}
// Since all workbufs are on the empty list, we don't care
// which ones are in which spans. We can wipe the entire empty
// list and move all workbuf spans to the free list.
- work.empty = 0
+ work.empty = ebstack{}
work.wbufSpans.free.takeAll(&work.wbufSpans.busy)
unlock(&work.wbufSpans.lock)
}
To view, visit change 525435. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Austin Clements, Keith Randall, Martin Möhrmann, jerry zhuang.
Patch set 1:Commit-Queue +1