Browse Source

Initial import

master
maze 3 years ago
parent
commit
e496cbf971
2 changed files with 219 additions and 0 deletions
  1. +127
    -0
      pool.go
  2. +92
    -0
      pool_test.go

+ 127
- 0
pool.go View File

@ -0,0 +1,127 @@
package pool
import (
"errors"
"fmt"
"runtime"
"sync/atomic"
"time"
)
var (
	// DefaultPoolSize is the backlog size used by NewPool when size <= 0.
	DefaultPoolSize = runtime.NumCPU()
	// DefaultPoolWorkers is the worker count used by NewPool when workers <= 0.
	DefaultPoolWorkers = runtime.NumCPU()
	// ErrPoolClosed is returned when work is scheduled on, or Close is called
	// again on, a pool that has already been closed.
	ErrPoolClosed = errors.New("broadcast: pool closed")
	// ErrScheduleTimeout is returned by ScheduleUntil when the timeout fires
	// before the work could be handed to a worker.
	ErrScheduleTimeout = errors.New("broadcast: pool schedule timeout")
)
// Pool is a bounded pool of on-demand worker goroutines: scheduled work is
// either handed to an already-running worker or run by a freshly spawned one.
type Pool struct {
	size int // maximum number of concurrent workers (capacity of sema)
	sema chan struct{} // counting semaphore: one token per live worker goroutine
	quit chan struct{} // delivers stop signals to workers (see Close)
	work chan func() // backlog of jobs awaiting an idle worker
	closed uint32 // non-zero once Close has been called; accessed atomically
}
// NewPool initialises a new Pool with a backlog of size and a maximum number
// of workers worker Go routines. The workers spawn on-demand. Non-positive
// arguments fall back to DefaultPoolSize / DefaultPoolWorkers, and workers is
// clamped to size.
func NewPool(size, workers int) *Pool {
	if size <= 0 {
		size = DefaultPoolSize
	}
	if workers <= 0 {
		// BUG FIX: the original assigned to size here, leaving workers <= 0
		// (a zero-capacity work channel) and clobbering the requested size.
		workers = DefaultPoolWorkers
	}
	if workers > size {
		workers = size
	}
	return &Pool{
		size: size,
		sema: make(chan struct{}, size),
		quit: make(chan struct{}, size),
		work: make(chan func(), workers),
	}
}
// Close stops accepting new work and interrupts the workers as soon as they
// finish the work they are currently processing. Close is safe for concurrent
// use; only the first call succeeds, subsequent calls return ErrPoolClosed.
func (pool *Pool) Close() error {
	// BUG FIX: the original did a Load followed by a separate Add, so two
	// concurrent Close calls could both pass the check. CAS makes the
	// check-and-set a single atomic step.
	if !atomic.CompareAndSwapUint32(&pool.closed, 0, 1) {
		return ErrPoolClosed
	}
	// BUG FIX: the original sent size<<1 tokens into a size-buffered quit
	// channel, which blocks forever when fewer than size workers are draining
	// it. Closing the channel wakes every current and future receiver at once
	// (workers and in-flight schedule calls alike).
	close(pool.quit)
	return nil
}
// Spawn pre-spawns n workers. Calling this function more than once when the
// total of spawned workers exceed the queue size will block until work is
// freed from the schedule.
func (pool *Pool) Spawn(n int) error {
	if atomic.LoadUint32(&pool.closed) > 0 {
		return ErrPoolClosed
	}
	if n > pool.size {
		return fmt.Errorf("broadcast: number of workers exceeds size of pool (%d > %d)", n, pool.size)
	}
	// Each worker starts with a no-op first job and then waits on the queue.
	noop := func() {}
	for spawned := 0; spawned < n; spawned++ {
		pool.sema <- struct{}{} // claim a worker slot (may block, see doc)
		go pool.worker(noop)
	}
	return nil
}
// worker executes its initial job, then keeps pulling jobs from the work
// queue until a quit signal arrives. Its semaphore slot is released on exit.
func (pool *Pool) worker(work func()) {
	defer func() { <-pool.sema }()
	work()
	for {
		select {
		case job := <-pool.work:
			job()
		case <-pool.quit:
			return
		}
	}
}
// Schedule work. If no worker is available or if there is no room in the
// worker channel, this call will block.
func (pool *Pool) Schedule(work func()) error {
	// A nil timeout channel never fires, i.e. block indefinitely.
	return pool.schedule(work, nil)
}
// ScheduleUntil schedules work with a timeout. If no worker is available or
// if there is no room within the specified timeout, an error is returned.
func (pool *Pool) ScheduleUntil(work func(), timeout time.Duration) error {
	deadline := time.After(timeout)
	return pool.schedule(work, deadline)
}
// schedule hands work to an idle worker, spawns a new worker if a slot is
// free, or fails on closure/timeout — whichever becomes possible first.
// A nil deadline channel never fires, so the call blocks until scheduled.
func (pool *Pool) schedule(work func(), deadline <-chan time.Time) error {
	if atomic.LoadUint32(&pool.closed) > 0 {
		return ErrPoolClosed
	}
	select {
	case pool.work <- work:
		// An already-running worker will pick this up from the queue.
		return nil
	case pool.sema <- struct{}{}:
		// A worker slot is free: run work as a new worker's first job.
		go pool.worker(work)
		return nil
	case <-pool.quit:
		return ErrPoolClosed
	case <-deadline:
		return ErrScheduleTimeout
	}
}

+ 92
- 0
pool_test.go View File

@ -0,0 +1,92 @@
package pool
import (
"sync"
"sync/atomic"
"testing"
"time"
)
// TestPool exercises the happy path: 4 pre-spawned workers, 16 blocking and
// 16 timed schedules, all of which must run exactly once.
func TestPool(t *testing.T) {
	var (
		a    uint64
		wait sync.WaitGroup
	)
	pool := NewPool(4, 4)
	if err := pool.Spawn(4); err != nil {
		t.Fatal(err)
	}
	for i := 0; i < 16; i++ {
		wait.Add(1)
		// FIX: the original discarded Schedule's error, so a failed schedule
		// would hang wait.Wait() instead of failing the test.
		if err := pool.Schedule(func() {
			atomic.AddUint64(&a, 1)
			wait.Done()
		}); err != nil {
			t.Fatal(err)
		}
	}
	for i := 0; i < 16; i++ {
		wait.Add(1)
		if err := pool.ScheduleUntil(func() {
			atomic.AddUint64(&a, 1)
			wait.Done()
		}, time.Second); err != nil {
			t.Fatal(err)
		}
	}
	wait.Wait()
	// BUG FIX: t.Fatal does not take a format string; %d was printed
	// verbatim. t.Fatalf is the formatting variant.
	if got := atomic.LoadUint64(&a); got != 32 {
		t.Fatalf("expected a=32, got %d", got)
	}
	if err := pool.Close(); err != nil {
		t.Fatal(err)
	}
}
// TestPoolError exercises the failure paths: over-spawning, schedule
// timeouts on a saturated pool, and scheduling on a closed pool.
func TestPoolError(t *testing.T) {
	var a uint64
	pool := NewPool(1, 1)
	if err := pool.Spawn(2); err == nil {
		t.Fatal("expected pool.Spawn(2) to error")
	} else {
		t.Logf("spawn error (expected): %v", err)
	}
	// One job gets picked up by first worker slot, other is queued in work channel
	for i := 0; i < 2; i++ {
		// FIX: the original discarded Schedule's error; check it so a failure
		// here is reported rather than corrupting the assumptions below.
		if err := pool.Schedule(func() {
			time.Sleep(time.Second)
		}); err != nil {
			t.Fatal(err)
		}
	}
	// Now all following scheduling should fail (unless your box is really slow...)
	for i := 0; i < 2; i++ {
		err := pool.ScheduleUntil(func() {
			time.Sleep(time.Second)
			atomic.AddUint64(&a, 1)
		}, time.Nanosecond)
		if err == nil {
			t.Logf("%d: expected timeout (is your box that fast!?)", i)
		} else {
			// FIX: the original `else if err != nil` was redundant — err is
			// always non-nil in this branch.
			t.Logf("schedule error (expected): %v", err)
		}
	}
	if err := pool.Close(); err != nil {
		t.Fatal(err)
	}
	for i := 0; i < 2; i++ {
		if err := pool.Schedule(func() {}); err == nil {
			t.Fatal("expected error scheduling work on closed Pool")
		} else {
			t.Logf("schedule error (expected): %v", err)
		}
	}
}

Loading…
Cancel
Save