Congratulations on opening your first change. Thank you for your contribution!
Next steps:
Within the next week or so, a maintainer will review your change and provide
feedback. See https://golang.org/doc/contribute.html#review for more info and
tips to get your patch through code review.
Most changes in the Go project go through a few rounds of revision. This can be
surprising to people new to the project. The careful, iterative review process
is our way of helping mentor contributors and ensuring that their contributions
have a lasting impact.
During May-July and Nov-Jan the Go project is in a code freeze, during which
little code gets reviewed or merged. If a reviewer responds with a comment like
R=go1.11, it means that this CL will be reviewed as part of the next development
cycle. See https://golang.org/s/release for more details.
To view, visit change 157718. To unsubscribe, or for help writing mail filters, visit settings.
Gerrit Bot would like Sherif Abdel-Naby to review this change.
semaphore: make semaphore resizable
Semaphores are often used to bound concurrency (e.g worker-pool) and it is sometimes preferred to be able to resize the semaphore/workerpool to bound concurrency dynamically in respond to load.
This implementation is more performant than channel-based semaphore (channel based semaphore can't be resized as channel buffer is immutable),There is another implementations of a non-channel-based semaphores that supports resizing, but I found many bugs/deadlocks using them and they're all less performant than x/sync/semaphore.
The current implementation of x/sync/semaphore can be easily extended to be resizable, without affecting performance by any means.
Github Issue: https://github.com/golang/go/issues/29721
Gerrit: https://go-review.googlesource.com/c/sync/+/157718
Change-Id: I8c58b2ecfa98a39d91a3c61118a342913890a72f
GitHub-Last-Rev: b0fff298b718af3950647c2fff43b23a81f52d3a
GitHub-Pull-Request: golang/sync#3
---
M semaphore/semaphore.go
M semaphore/semaphore_bench_test.go
M semaphore/semaphore_example_test.go
M semaphore/semaphore_test.go
4 files changed, 222 insertions(+), 12 deletions(-)
diff --git a/semaphore/semaphore.go b/semaphore/semaphore.go
index ac53e73..8732332 100644
--- a/semaphore/semaphore.go
+++ b/semaphore/semaphore.go
@@ -26,10 +26,11 @@
// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
- size int64
- cur int64
- mu sync.Mutex
- waiters list.List
+ size int64
+ cur int64
+ mu sync.Mutex
+ waiters list.List
+ impossibleWaiters list.List
}
// Acquire acquires the semaphore with a weight of n, blocking until resources
@@ -45,16 +46,16 @@
return nil
}
+ var waiterList = &s.waiters
+
if n > s.size {
- // Don't make other Acquire calls block on one that's doomed to fail.
- s.mu.Unlock()
- <-ctx.Done()
- return ctx.Err()
+ // Add doomed Acquire call to the Impossible waiters list.
+ waiterList = &s.impossibleWaiters
}
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
- elem := s.waiters.PushBack(w)
+ elem := waiterList.PushBack(w)
s.mu.Unlock()
select {
@@ -67,7 +68,7 @@
// fix up the queue, just pretend we didn't notice the cancelation.
err = nil
default:
- s.waiters.Remove(elem)
+ waiterList.Remove(elem)
}
s.mu.Unlock()
return err
@@ -125,3 +126,76 @@
}
s.mu.Unlock()
}
+
+// Resize semaphore.
+func (s *Weighted) Resize(n int64) {
+ s.mu.Lock()
+ s.size = n
+ if s.size < 0 {
+ s.mu.Unlock()
+ panic("semaphore: bad resize")
+ }
+
+ // Add the now possible waiters to waiters list.
+ element := s.impossibleWaiters.Front()
+ for {
+ if element == nil {
+ break // No more impossible waiters blocked.
+ }
+
+ w := element.Value.(waiter)
+ if s.size < w.n {
+ // Still Impossible. next.
+ element = element.Next()
+ continue
+ }
+
+ s.waiters.PushBack(w)
+ toRemove := element
+ element = element.Next()
+ s.impossibleWaiters.Remove(toRemove)
+
+ }
+
+ // Add the now impossible-waiters to impossible waiters list.
+ element = s.waiters.Front()
+ for {
+ if element == nil {
+ break // No more waiters.
+ }
+
+ w := element.Value.(waiter)
+ if s.size >= w.n {
+ // Still Possible. next.
+ element = element.Next()
+ continue
+ }
+
+ s.impossibleWaiters.PushBack(w)
+ toRemove := element
+ element = element.Next()
+ s.waiters.Remove(toRemove)
+ }
+
+ // Release Possible Waiters
+ for {
+ next := s.waiters.Front()
+ if next == nil {
+ break // No more waiters blocked.
+ }
+
+ w := next.Value.(waiter)
+ if s.size-s.cur < w.n {
+ // Not enough tokens for the element waiter. We could keep going (to try to
+ // find a waiter with a smaller request), but under load that could cause
+ // starvation for large requests; instead, we leave all remaining waiters
+ // blocked.
+ break
+ }
+
+ s.cur += w.n
+ s.waiters.Remove(next)
+ close(w.ready)
+ }
+ s.mu.Unlock()
+}
diff --git a/semaphore/semaphore_bench_test.go b/semaphore/semaphore_bench_test.go
index f96d349..56aed26 100644
--- a/semaphore/semaphore_bench_test.go
+++ b/semaphore/semaphore_bench_test.go
@@ -20,6 +20,7 @@
Acquire(context.Context, int64) error
TryAcquire(int64) bool
Release(int64)
+ Resize(int64)
}
// semChan implements Weighted using a channel for
@@ -54,6 +55,10 @@
}
}
+func (s semChan) Resize(n int64) {
+ panic("not implemented mock method")
+}
+
// acquireN calls Acquire(size) on sem N times and then calls Release(size) N times.
func acquireN(b *testing.B, sem weighted, size int64, N int) {
b.ResetTimer()
diff --git a/semaphore/semaphore_example_test.go b/semaphore/semaphore_example_test.go
index e75cd79..717de15 100644
--- a/semaphore/semaphore_example_test.go
+++ b/semaphore/semaphore_example_test.go
@@ -7,10 +7,9 @@
import (
"context"
"fmt"
+ "golang.org/x/sync/semaphore"
"log"
"runtime"
-
- "golang.org/x/sync/semaphore"
)
// Example_workerPool demonstrates how to use a semaphore to limit the number of
diff --git a/semaphore/semaphore_test.go b/semaphore/semaphore_test.go
index b5f8f13..2c82cc2 100644
--- a/semaphore/semaphore_test.go
+++ b/semaphore/semaphore_test.go
@@ -169,3 +169,135 @@
sem.Release(n)
wg.Wait()
}
+
+func TestWeightedResizePanic(t *testing.T) {
+ t.Parallel()
+ defer func() {
+ if recover() == nil {
+ t.Fatal("release of an unacquired weighted semaphore did not panic")
+ }
+ }()
+ w := semaphore.NewWeighted(1)
+ w.Resize(-1)
+}
+
+func TestWeightedResize(t *testing.T) {
+ t.Parallel()
+ ctx := context.Background()
+ sem := semaphore.NewWeighted(3)
+ tryAcquire := func(n int64) bool {
+ ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+ defer cancel()
+ return sem.Acquire(ctx, n) == nil
+ }
+
+ tries := []bool{}
+
+ tries = append(tries, tryAcquire(1)) // true; cur/size = 1/3
+ tries = append(tries, tryAcquire(1)) // true; cur/size = 2/3
+ tries = append(tries, tryAcquire(1)) // true; cur/size = 3/3
+ tries = append(tries, tryAcquire(1)) // false; cur/size = 3/3 - full!
+
+ sem.Resize(2) // cur/size = 3/2
+
+ tries = append(tries, tryAcquire(1)) // false; cur/size = 3/3 - full!
+
+ sem.Release(1) // cur/size = 2/2
+
+ tries = append(tries, tryAcquire(1)) // false; cur/size = 2/2 - full!
+
+ sem.Release(1) // cur/size = 1/2
+
+ tries = append(tries, tryAcquire(1)) // true; cur/size = 2/2
+
+ tries = append(tries, tryAcquire(1)) // false; cur/size = 2/2 - full!
+
+ sem.Resize(3) // cur/size = 2/3
+
+ tries = append(tries, tryAcquire(1)) // true; cur/size = 3/3
+
+ tries = append(tries, tryAcquire(1)) // false; cur/size = 3/3 - full!
+
+ want := []bool{true, true, true, false, false, false, true, false, true, false}
+ for i := range tries {
+ if tries[i] != want[i] {
+ t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i])
+ }
+ }
+}
+
+// TestWeightedResizeUnblockImpossible will fail if an acquire that was blocking because of an impossible weight was
+// still blocking after resize to bigger size OR if an acquire that was possible but isn't possible after resize is
+// blocking because it is still in waiters list.
+// Merely returning from the test function indicates success.
+func TestWeightedResizeUnblockImpossible(t *testing.T) {
+ t.Parallel()
+ ctx := context.Background()
+ sem := semaphore.NewWeighted(3)
+
+ acquire := func(n int64, syncChan chan struct{}) chan struct{} {
+ signal := make(chan struct{})
+ go func() {
+ syncChan <- struct{}{}
+ sem.Acquire(ctx, n)
+ defer sem.Release(n)
+ signal <- struct{}{}
+ close(signal)
+ }()
+ return signal
+ }
+
+ // Use this syncChan to make sure acquire() get blocked in sequence (to have internal linked-list in desired state)
+ syncChan := make(chan struct{})
+
+ doneAcquire5 := acquire(5, syncChan)
+ <-syncChan
+ doneAcquire4 := acquire(4, syncChan)
+ <-syncChan
+
+ select {
+ case <-doneAcquire5:
+ t.Errorf("An Impossible acquire was aqcuired")
+ case <-doneAcquire4:
+ t.Errorf("An Impossible acquire was aqcuired")
+ default:
+
+ }
+
+ sem.Resize(4)
+
+ select {
+ case <-doneAcquire5:
+ t.Errorf("An Impossible acquire was aqcuired")
+ case <-doneAcquire4:
+
+ }
+
+ sem.Resize(5)
+
+ select {
+ case <-doneAcquire5:
+
+ }
+
+ sem.Acquire(ctx, 1)
+
+ // Test down-sizing the semaphore with to-be-impossible waiters in waiting.
+ syncChan = make(chan struct{})
+ doneAcquire5 = acquire(5, syncChan)
+ <-syncChan
+ doneAcquire4 = acquire(4, syncChan)
+ <-syncChan
+
+ sem.Resize(4)
+
+ sem.Release(1)
+
+ select {
+ case <-doneAcquire5:
+ t.Errorf("An Impossible acquire was aqcuired")
+ case <-doneAcquire4:
+
+ }
+
+}
To view, visit change 157680. To unsubscribe, or for help writing mail filters, visit settings.
Gerrit Bot uploaded patch set #2 to this change.
semaphore: make semaphore resizable
Semaphores are often used to bound concurrency (e.g worker-pool) and it is sometimes preferred to be able to resize the semaphore/workerpool to bound concurrency dynamically in respond to load.
This implementation is more performant than channel-based semaphore (channel based semaphore can't be resized as channel buffer is immutable),There is another implementations of a non-channel-based semaphores that supports resizing, but I found many bugs/deadlocks using them and they're all less performant than x/sync/semaphore.
The current implementation of x/sync/semaphore can be easily extended to be resizable, without affecting performance by any means.
Github Issue: https://github.com/golang/go/issues/29721
#### Edit:
I think I made a small mistake, I already posted the CR on Gerrit, I submitted this PR just for visibility and assumed from the repo's other PR(s) that the bot won't import it to Gerrit.
The code is identical, so I think abandoning any of the duplicate changes is fine.
Original Gerrit Review: https://go-review.googlesource.com/c/sync/+/157718
Change-Id: I8c58b2ecfa98a39d91a3c61118a342913890a72f
GitHub-Last-Rev: b0fff298b718af3950647c2fff43b23a81f52d3a
GitHub-Pull-Request: golang/sync#3
---
M semaphore/semaphore.go
M semaphore/semaphore_bench_test.go
M semaphore/semaphore_example_test.go
M semaphore/semaphore_test.go
4 files changed, 222 insertions(+), 12 deletions(-)
To view, visit change 157680. To unsubscribe, or for help writing mail filters, visit settings.
Sherif Abdel-Naby uploaded patch set #2 to this change.
semaphore: make semaphore resizable
Semaphores are often used to bound concurrency (e.g workerpool) and it is sometimes preferred to be able to resize the semaphore/workerpool to bound concurrency dynamically in respond to load.
This implementation is more performant than channel-based semaphore (channel based semaphore can't be resized as channel buffer is immutable),There is another implementations of a non-channel-based semaphores that supports resizing, but I found many bugs/deadlocks using them and they're all less performant than x/sync/semaphore.
The current implementation of x/sync/semaphore can be easily extended to be resizable, without affecting performance by any means.
Github Issue: https://github.com/golang/go/issues/29721
Change-Id: Ifc096bee36a36f4c09f9c072de534f5de759d1df
---
M semaphore/semaphore.go
M semaphore/semaphore_bench_test.go
M semaphore/semaphore_example_test.go
M semaphore/semaphore_test.go
4 files changed, 222 insertions(+), 12 deletions(-)
To view, visit change 157718. To unsubscribe, or for help writing mail filters, visit settings.
Ops, I think I made a mistake (excuse me first time to use Gerrit)
I submitted a PR on Github just for visibility, I thought it won't be imported here by the bot as the previous PR(s) history on the repo said otherwise (The bot said that it won't import PRs from this repo), now there are two duplicates of the same change. the code is exactly the same so I think we can simply abandon any of them. Excuse my rookie mistake, Thanks.
Sherif Abdel-Naby has uploaded this change for review.
semaphore: make semaphore resizable
Semaphores are often used to bound concurrency (e.g workerpool) and it is sometimes preferred to be able to resize the semaphore/workerpool to bound concurrency dynamically in respond to load.
This implementation is more performant than channel-based semaphore (channel based semaphore can't be resized as channel buffer is immutable),There is another implementations of a non-channel-based semaphores that supports resizing, but I found many bugs/deadlocks using them and they're all less performant than x/sync/semaphore.
The current implementation of x/sync/semaphore can be easily extended to be resizable, without affecting performance by any means.
Change-Id: Ifc096bee36a36f4c09f9c072de534f5de759d1df
---
M semaphore/semaphore.go
M semaphore/semaphore_bench_test.go
M semaphore/semaphore_example_test.go
M semaphore/semaphore_test.go
4 files changed, 222 insertions(+), 12 deletions(-)
To view, visit change 157718. To unsubscribe, or for help writing mail filters, visit settings.
This needs to be discussed on the linked issue.
There are a lot of subtleties in the safe use of semaphores: it's not obvious to me that resizing is actually safe in general, and even if it is, we should come to agreement on the high-level API before reviewing the low-level details.
Patch set 2:Code-Review -2
Gerrit Bot uploaded patch set #3 to this change.
semaphore: make semaphore resizable
Semaphores are often used to bound concurrency (e.g worker-pool) and it is sometimes preferred to be able to resize the semaphore/workerpool to bound concurrency dynamically in respond to load.
This implementation is more performant than channel-based semaphore (channel based semaphore can't be resized as channel buffer is immutable),There is another implementations of a non-channel-based semaphores that supports resizing, but I found many bugs/deadlocks using them and they're all less performant than x/sync/semaphore.
The current implementation of x/sync/semaphore can be easily extended to be resizable, without affecting performance by any means.
Github Issue: https://github.com/golang/go/issues/29721
#### Edit:
I think I made a small mistake, I already posted the CR on Gerrit, I submitted this PR just for visibility and assumed from the repo's other PR(s) that the bot won't import it to Gerrit.
The code is identical, so I think abandoning any of the duplicate changes is fine.
Original Gerrit Review: https://go-review.googlesource.com/c/sync/+/157718
Change-Id: I8c58b2ecfa98a39d91a3c61118a342913890a72f
GitHub-Last-Rev: d5e48378160bf49067979cb4d8f840f39df6f033
GitHub-Pull-Request: golang/sync#3
---
M README.md
M errgroup/errgroup_test.go
A go.mod
M semaphore/semaphore.go
M semaphore/semaphore_bench_test.go
M semaphore/semaphore_example_test.go
M semaphore/semaphore_test.go
M singleflight/singleflight.go
M singleflight/singleflight_test.go
A syncmap/go19.go
M syncmap/map.go
A syncmap/pre_go19.go
12 files changed, 1,071 insertions(+), 384 deletions(-)
To view, visit change 157680. To unsubscribe, or for help writing mail filters, visit settings.
Gerrit Bot uploaded patch set #4 to this change.
semaphore: make semaphore resizable
Semaphores are often used to bound concurrency (e.g worker-pool) and it is sometimes preferred to be able to resize the semaphore/workerpool to bound concurrency dynamically in respond to load.
This implementation is more performant than channel-based semaphore (channel based semaphore can't be resized as channel buffer is immutable),There is another implementations of a non-channel-based semaphores that supports resizing, but I found many bugs/deadlocks using them and they're all less performant than x/sync/semaphore.
The current implementation of x/sync/semaphore can be easily extended to be resizable, without affecting performance by any means.
Github Issue: https://github.com/golang/go/issues/29721
#### Edit:
I think I made a small mistake, I already posted the CR on Gerrit, I submitted this PR just for visibility and assumed from the repo's other PR(s) that the bot won't import it to Gerrit.
The code is identical, so I think abandoning any of the duplicate changes is fine.
Original Gerrit Review: https://go-review.googlesource.com/c/sync/+/157718
Change-Id: I8c58b2ecfa98a39d91a3c61118a342913890a72f
GitHub-Last-Rev: 7c9f24f79d4fc66180ba1fd165c299cba13f547f
GitHub-Pull-Request: golang/sync#3
---
M semaphore/semaphore.go
M semaphore/semaphore_bench_test.go
M semaphore/semaphore_example_test.go
M semaphore/semaphore_test.go
4 files changed, 226 insertions(+), 11 deletions(-)
To view, visit change 157680. To unsubscribe, or for help writing mail filters, visit settings.
Ian Lance Taylor abandoned this change.
To view, visit change 157680. To unsubscribe, or for help writing mail filters, visit settings.