Set Ready For Review
To view, visit change 404515. To unsubscribe, or for help writing mail filters, visit settings.
Changkun Ou uploaded patch set #2 to this change.
x/sync/errgroup: add TryGo and SetLimit to control concurrency
DO NOT SUBMIT until the proposal is accepted.
To avoid breaking change, SetLimit only accepts non-negative values.
Fixes golang/go#27837
Change-Id: I04e864e24734e0ac2b62e7058b69d843afcc620e
---
M errgroup/errgroup.go
M errgroup/errgroup_test.go
2 files changed, 209 insertions(+), 1 deletion(-)
To view, visit change 404515. To unsubscribe, or for help writing mail filters, visit settings.
Patch set 2:Run-TryBot +1
Set Ready For Review
To view, visit change 405174. To unsubscribe, or for help writing mail filters, visit settings.
Patch set 1:Run-TryBot +1
1 comment:
File errgroup/errgroup.go:
// Otherwise, ng is set to limitNone and we should block forever.
select {}
I don't actually know what should we do about this case, and it seems if someone SetLimit(0) then calls Go, the function will block forever, and no chance to release from the blocking here.
To view, visit change 405174. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Changkun Ou.
Changkun Ou uploaded patch set #2 to this change.
x/sync/errgroup: add TryGo and SetLimit to control concurrency
DO NOT SUBMIT until the proposal is accepted.
This change implements the proposed behavior of SetLimit that
SetLimit(0) means TryGo will always fail. See CL 404515 as a comparison.
Fixes golang/go#27837
Change-Id: I60247f1a2a1cdce2b180f10b409e37de8b82341e
---
M errgroup/errgroup.go
M errgroup/errgroup_test.go
2 files changed, 236 insertions(+), 1 deletion(-)
To view, visit change 405174. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Changkun Ou.
Changkun Ou uploaded patch set #3 to this change.
x/sync/errgroup: add TryGo and SetLimit to control concurrency
DO NOT SUBMIT until the proposal is accepted.
This change implements the proposed behavior of SetLimit differently
that SetLimit(0) only accepts non-negative values.
See CL 405174 as a comparison.
Fixes golang/go#27837
Change-Id: I04e864e24734e0ac2b62e7058b69d843afcc620e
---
M errgroup/errgroup.go
M errgroup/errgroup_test.go
2 files changed, 220 insertions(+), 1 deletion(-)
To view, visit change 404515. To unsubscribe, or for help writing mail filters, visit settings.
Patch set 3:Run-TryBot +1
Changkun Ou abandoned this change.
To view, visit change 404515. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Changkun Ou.
Changkun Ou uploaded patch set #3 to this change.
The following approvals got outdated and were removed: Run-TryBot+1 by Changkun Ou, TryBot-Result+1 by Gopher Robot
x/sync/errgroup: add TryGo and SetLimit to control concurrency
Fixes golang/go#27837
Change-Id: I60247f1a2a1cdce2b180f10b409e37de8b82341e
---
M errgroup/errgroup.go
M errgroup/errgroup_test.go
2 files changed, 231 insertions(+), 1 deletion(-)
To view, visit change 405174. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Bryan Mills.
To view, visit change 405174. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Bryan Mills, Changkun Ou.
3 comments:
File errgroup/errgroup.go:
Patch Set #2, Line 18: // limitNone indicates a limit that concurrency is not allowed.
I don't quite follow this comment — it's not just that concurrency is not allowed, but that no goroutines may be started.
mu sync.Mutex
ng int64 // atomic
waiters list.List
I have a pretty strong preference for avoiding mutexes and explicit wait-lists.
Given that we don't allow changing the limit with `Go` calls in flight, I think this could be implemented fairly efficiently using two channels of integers: one that receives the waiter-count when under the limit, and one than receives it when at the limit. That might look like:
```
type token struct{}
type Group struct {
…
sem chan token
…
}func (g *Group) done() (int, bool) {
if g.sem != nil {
<-g.sem
}
g.wg.Done()
}func (g *Group) Go(f …) {
if g.sem != nil {
g.sem <- token{}
}
…
go func() {
defer g.done()
…
}()
}func (g *Group) TryGo(f …) bool {
if g.sem != nil {
select {
case g.sem <-token{}:
// Note: this allows barging iff channels in general allow barging.
default:
return false
}
}
…
}func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if len(g.sem) != 0 {
panic(…)
}
g.sem = make(chan token, n)
}
```Note that the unsynchronized writes to `g.sem` within `SetLimit` cause the race detector to report concurrent calls to `SetLimit` with any other method as either a read/read or read/write race. (That gives us the option to define the interaction between `SetLimit` and active goroutines at a later date.)
if g.ctx != nil {
select {
case <-g.ctx.Done():
return false
default:
}
}
Per https://github.com/golang/go/issues/27837#issuecomment-1098311098, `TryGo` should only care about the limit, not whether the `Context` has been canceled.
To view, visit change 405174. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Bryan Mills, Changkun Ou.
Changkun Ou uploaded patch set #4 to this change.
The following approvals got outdated and were removed: Run-TryBot+1 by Changkun Ou, TryBot-Result+1 by Gopher Robot
x/sync/errgroup: add TryGo and SetLimit to control concurrency
This benchmark shows the difference between two implementations.
Using explicit waiter with mutex (old, before PS3) or channel (new,
since PS4). There is no significant difference at a measure:
name old time/op new time/op delta
Go-8 247ns ±10% 245ns ±10% ~ (p=0.571 n=5+10)
name old alloc/op new alloc/op delta
Go-8 48.0B ± 0% 40.0B ± 0% -16.67% (p=0.000 n=5+10)
name old allocs/op new allocs/op delta
Go-8 2.00 ± 0% 2.00 ± 0% ~ (all equal)
Fixes golang/go#27837
Change-Id: I60247f1a2a1cdce2b180f10b409e37de8b82341e
---
M errgroup/errgroup.go
M errgroup/errgroup_test.go
2 files changed, 163 insertions(+), 2 deletions(-)
To view, visit change 405174. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Bryan Mills.
Patch set 4:Run-TryBot +1
4 comments:
File errgroup/errgroup.go:
// Otherwise, ng is set to limitNone and we should block forever.
select {}
I don't actually know what should we do about this case, and it seems if someone SetLimit(0) then ca […]
Done
File errgroup/errgroup.go:
Patch Set #2, Line 18: // limitNone indicates a limit that concurrency is not allowed.
I don't quite follow this comment — it's not just that concurrency is not allowed, but that no gorou […]
Ack
mu sync.Mutex
ng int64 // atomic
waiters list.List
I have a pretty strong preference for avoiding mutexes and explicit wait-lists. […]
Done
if g.ctx != nil {
select {
case <-g.ctx.Done():
return false
default:
}
}
Per https://github. […]
Ack
To view, visit change 405174. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Changkun Ou.
Patch set 4:Code-Review +2
3 comments:
File errgroup/errgroup_test.go:
Patch Set #4, Line 189: success
(nit) s/success/succeed/
(throughout these Fatalf messages)
Patch Set #4, Line 214: succeded
(typo) s/succeded/succeeded/
(here and below)
Patch Set #4, Line 234: g.Go(func() error { fn(); return nil })
You could use an atomic variable here to verify (best-effort) that the limit is not violated:
```
func TestGoLimit(t *testing.T) {
const limit = 10
g := &errgroup.Group{}
g.SetLimit(limit)
var active int32
for i := 0; i <= 1<<10; i++ {
g.Go(func() error {
n := atomic.AddInt32(&active, 1)
if n > limit {
return fmt.Errorf("saw %d active goroutines; want ≤ %d", n, limit)
}
time.Sleep(1 * time.Microsecond) // Give other goroutines a chance to increment active.
atomic.AddInt32(&active, -1)
return nil
})
}
if err := g.Wait(); err != nil {
t.Fatal(err)
}
}
```To view, visit change 405174. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Changkun Ou.
Changkun Ou uploaded patch set #5 to this change.
The following approvals got outdated and were removed: Run-TryBot+1 by Changkun Ou, TryBot-Result+1 by Gopher Robot
x/sync/errgroup: add TryGo and SetLimit to control concurrency
This benchmark shows the difference between two implementations.
Using explicit waiter with mutex (old, before PS3) or channel (new,
since PS4). There is no significant difference at a measure:
name old time/op new time/op delta
Go-8 247ns ±10% 245ns ±10% ~ (p=0.571 n=5+10)
name old alloc/op new alloc/op delta
Go-8 48.0B ± 0% 40.0B ± 0% -16.67% (p=0.000 n=5+10)
name old allocs/op new allocs/op delta
Go-8 2.00 ± 0% 2.00 ± 0% ~ (all equal)
Fixes golang/go#27837
Change-Id: I60247f1a2a1cdce2b180f10b409e37de8b82341e
---
M errgroup/errgroup.go
M errgroup/errgroup_test.go
2 files changed, 177 insertions(+), 2 deletions(-)
To view, visit change 405174. To unsubscribe, or for help writing mail filters, visit settings.
Patch set 5:Run-TryBot +1
3 comments:
File errgroup/errgroup_test.go:
Patch Set #4, Line 189: success
(nit) s/success/succeed/ […]
Done
Patch Set #4, Line 214: succeded
(typo) s/succeded/succeeded/ […]
Done
Patch Set #4, Line 234: g.Go(func() error { fn(); return nil })
You could use an atomic variable here to verify (best-effort) that the limit is not violated: […]
Done
To view, visit change 405174. To unsubscribe, or for help writing mail filters, visit settings.
Attention is currently required from: Changkun Ou.
Patch set 5:Auto-Submit +1Code-Review +2
Attention is currently required from: Changkun Ou.
Patch set 5:Code-Review +1
Gopher Robot submitted this change.
x/sync/errgroup: add TryGo and SetLimit to control concurrency
This benchmark shows the difference between two implementations.
Using explicit waiter with mutex (old, before PS3) or channel (new,
since PS4). There is no significant difference at a measure:
name old time/op new time/op delta
Go-8 247ns ±10% 245ns ±10% ~ (p=0.571 n=5+10)
name old alloc/op new alloc/op delta
Go-8 48.0B ± 0% 40.0B ± 0% -16.67% (p=0.000 n=5+10)
name old allocs/op new allocs/op delta
Go-8 2.00 ± 0% 2.00 ± 0% ~ (all equal)
Fixes golang/go#27837
Change-Id: I60247f1a2a1cdce2b180f10b409e37de8b82341e
Reviewed-on: https://go-review.googlesource.com/c/sync/+/405174
Reviewed-by: Bryan Mills <bcm...@google.com>
Reviewed-by: Heschi Kreinick <hes...@google.com>
TryBot-Result: Gopher Robot <go...@golang.org>
Run-TryBot: Changkun Ou <ma...@changkun.de>
Auto-Submit: Bryan Mills <bcm...@google.com>
---
M errgroup/errgroup.go
M errgroup/errgroup_test.go
2 files changed, 183 insertions(+), 2 deletions(-)
diff --git a/errgroup/errgroup.go b/errgroup/errgroup.go
index 9857fe5..1eab2fd 100644
--- a/errgroup/errgroup.go
+++ b/errgroup/errgroup.go
@@ -8,9 +8,12 @@
import (
"context"
+ "fmt"
"sync"
)
+type token struct{}
+
// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
//
@@ -20,10 +23,19 @@
wg sync.WaitGroup
+ sem chan token
+
errOnce sync.Once
err error
}
+func (g *Group) done() {
+ if g.sem != nil {
+ <-g.sem
+ }
+ g.wg.Done()
+}
+
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
@@ -45,14 +57,19 @@
}
// Go calls the given function in a new goroutine.
+// It blocks until the new goroutine can be added without the number of
+// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
- g.wg.Add(1)
+ if g.sem != nil {
+ g.sem <- token{}
+ }
+ g.wg.Add(1)
go func() {
- defer g.wg.Done()
+ defer g.done()
if err := f(); err != nil {
g.errOnce.Do(func() {
@@ -64,3 +81,51 @@
}
}()
}
+
+// TryGo calls the given function in a new goroutine only if the number of
+// active goroutines in the group is currently below the configured limit.
+//
+// The return value reports whether the goroutine was started.
+func (g *Group) TryGo(f func() error) bool {
+ if g.sem != nil {
+ select {
+ case g.sem <- token{}:
+ // Note: this allows barging iff channels in general allow barging.
+ default:
+ return false
+ }
+ }
+
+ g.wg.Add(1)
+ go func() {
+ defer g.done()
+
+ if err := f(); err != nil {
+ g.errOnce.Do(func() {
+ g.err = err
+ if g.cancel != nil {
+ g.cancel()
+ }
+ })
+ }
+ }()
+ return true
+}
+
+// SetLimit limits the number of active goroutines in this group to at most n.
+// A negative value indicates no limit.
+//
+// Any subsequent call to the Go method will block until it can add an active
+// goroutine without exceeding the configured limit.
+//
+// The limit must not be modified while any goroutines in the group are active.
+func (g *Group) SetLimit(n int) {
+ if n < 0 {
+ g.sem = nil
+ return
+ }
+ if len(g.sem) != 0 {
+ panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
+ }
+ g.sem = make(chan token, n)
+}
diff --git a/errgroup/errgroup_test.go b/errgroup/errgroup_test.go
index 5a0b9cb..0358842 100644
--- a/errgroup/errgroup_test.go
+++ b/errgroup/errgroup_test.go
@@ -10,7 +10,9 @@
"fmt"
"net/http"
"os"
+ "sync/atomic"
"testing"
+ "time"
"golang.org/x/sync/errgroup"
)
@@ -174,3 +176,87 @@
}
}
}
+
+func TestTryGo(t *testing.T) {
+ g := &errgroup.Group{}
+ n := 42
+ g.SetLimit(42)
+ ch := make(chan struct{})
+ fn := func() error {
+ ch <- struct{}{}
+ return nil
+ }
+ for i := 0; i < n; i++ {
+ if !g.TryGo(fn) {
+ t.Fatalf("TryGo should succeed but got fail at %d-th call.", i)
+ }
+ }
+ if g.TryGo(fn) {
+ t.Fatalf("TryGo is expected to fail but succeeded.")
+ }
+ go func() {
+ for i := 0; i < n; i++ {
+ <-ch
+ }
+ }()
+ g.Wait()
+
+ if !g.TryGo(fn) {
+ t.Fatalf("TryGo should success but got fail after all goroutines.")
+ }
+ go func() { <-ch }()
+ g.Wait()
+
+ // Switch limit.
+ g.SetLimit(1)
+ if !g.TryGo(fn) {
+ t.Fatalf("TryGo should success but got failed.")
+ }
+ if g.TryGo(fn) {
+ t.Fatalf("TryGo should fail but succeeded.")
+ }
+ go func() { <-ch }()
+ g.Wait()
+
+ // Block all calls.
+ g.SetLimit(0)
+ for i := 0; i < 1<<10; i++ {
+ if g.TryGo(fn) {
+ t.Fatalf("TryGo should fail but got succeded.")
+ }
+ }
+ g.Wait()
+}
+
+func TestGoLimit(t *testing.T) {
+ const limit = 10
+
+ g := &errgroup.Group{}
+ g.SetLimit(limit)
+ var active int32
+ for i := 0; i <= 1<<10; i++ {
+ g.Go(func() error {
+ n := atomic.AddInt32(&active, 1)
+ if n > limit {
+ return fmt.Errorf("saw %d active goroutines; want ≤ %d", n, limit)
+ }
+ time.Sleep(1 * time.Microsecond) // Give other goroutines a chance to increment active.
+ atomic.AddInt32(&active, -1)
+ return nil
+ })
+ }
+ if err := g.Wait(); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func BenchmarkGo(b *testing.B) {
+ fn := func() {}
+ g := &errgroup.Group{}
+ b.ResetTimer()
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ g.Go(func() error { fn(); return nil })
+ }
+ g.Wait()
+}
To view, visit change 405174. To unsubscribe, or for help writing mail filters, visit settings.