Worker pool using goroutines

package pool

import (
	"errors"
	"fmt"
	"runtime"
	"sync/atomic"
	"time"
)

var (
	// DefaultPoolSize is used when New is called with a non-positive size.
	DefaultPoolSize = runtime.NumCPU()

	// DefaultPoolWorkers is used when New is called with a non-positive
	// workers value.
	DefaultPoolWorkers = runtime.NumCPU()

	// ErrPoolClosed is returned once the pool has been closed.
	ErrPoolClosed = errors.New("broadcast: pool closed")

	// ErrScheduleTimeout is returned by ScheduleUntil when the timeout
	// elapses before the work could be scheduled.
	ErrScheduleTimeout = errors.New("broadcast: pool schedule timeout")
)

// Pool is a worker pool whose workers are spawned on demand, up to a fixed
// limit, and which buffers work that cannot be handed off immediately.
type Pool struct {
	size   int           // maximum number of concurrent workers
	sema   chan struct{} // counting semaphore bounding live workers
	quit   chan struct{} // signals shutdown to workers and blocked schedulers
	work   chan func()   // buffered queue of pending jobs
	closed uint32        // set to 1 once Close has been called
}

// New initialises a new Pool that runs at most size worker goroutines and
// buffers up to workers pending jobs (the buffer is capped at size). Workers
// are spawned on demand rather than up front.
func New(size, workers int) *Pool {
	if size <= 0 {
		size = DefaultPoolSize
	}
	if workers <= 0 {
		workers = DefaultPoolWorkers
	}
	if workers > size {
		workers = size
	}
	return &Pool{
		size: size,
		sema: make(chan struct{}, size),
		quit: make(chan struct{}, size),
		work: make(chan func(), workers),
	}
}
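
// A minimal construction sketch; the capacities below are arbitrary,
// illustrative values:
//
//	p := pool.New(16, 16) // at most 16 workers, up to 16 buffered jobs
//	defer p.Close()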

// Close stops accepting new work and interrupts the workers as soon as they
// finish the work they are currently processing. Subsequent calls return
// ErrPoolClosed.
func (pool *Pool) Close() error {
	if !atomic.CompareAndSwapUint32(&pool.closed, 0, 1) {
		return ErrPoolClosed
	}
	// Closing quit wakes every running worker and every blocked scheduler,
	// however many of them there are.
	close(pool.quit)
	return nil
}

// Spawn pre-spawns n idle workers. Calling Spawn repeatedly such that the
// total number of live workers would exceed the pool size blocks until a
// worker slot is freed.
func (pool *Pool) Spawn(n int) error {
	if atomic.LoadUint32(&pool.closed) > 0 {
		return ErrPoolClosed
	}
	if n > pool.size {
		return fmt.Errorf("broadcast: number of workers exceeds size of pool (%d > %d)", n, pool.size)
	}
	for i := 0; i < n; i++ {
		// Acquire a worker slot, then start an idle worker on it.
		pool.sema <- struct{}{}
		go pool.worker(func() {})
	}
	return nil
}
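
// Pre-warming sketch; the count is an arbitrary, illustrative value:
//
//	if err := p.Spawn(4); err != nil {
//		// the pool is closed, or 4 exceeds its size
//	}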

// worker runs work, then keeps executing queued jobs until the pool is
// closed. Its worker slot is released when the goroutine returns.
func (pool *Pool) worker(work func()) {
	defer func() {
		<-pool.sema
	}()
	work()
	for {
		select {
		case <-pool.quit:
			return
		case work := <-pool.work:
			work()
		}
	}
}

// Schedule queues work. If no worker is available and there is no room in the
// work queue, the call blocks until the work can be handed off.
func (pool *Pool) Schedule(work func()) error {
	return pool.schedule(work, nil)
}

// ScheduleUntil schedules work with a timeout. If no worker becomes available
// and no room opens in the work queue within timeout, ErrScheduleTimeout is
// returned.
func (pool *Pool) ScheduleUntil(work func(), timeout time.Duration) error {
	return pool.schedule(work, time.After(timeout))
}
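
// Timeout sketch; the duration and the doWork job are illustrative
// placeholders:
//
//	err := p.ScheduleUntil(func() { doWork() }, 50*time.Millisecond)
//	if errors.Is(err, pool.ErrScheduleTimeout) {
//		// the pool stayed saturated for 50ms; drop or retry the job
//	}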

func (pool *Pool) schedule(work func(), timeout <-chan time.Time) error {
	if atomic.LoadUint32(&pool.closed) > 0 {
		return ErrPoolClosed
	}
	select {
	case <-timeout:
		// A nil timeout (Schedule) never fires; ScheduleUntil passes a
		// real deadline channel.
		return ErrScheduleTimeout
	case <-pool.quit:
		return ErrPoolClosed
	case pool.work <- work:
		// Hand the job to an already running worker via the queue.
		return nil
	case pool.sema <- struct{}{}:
		// A worker slot is free: start a new worker for this job.
		go pool.worker(work)
		return nil
	}
}
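
A minimal end-to-end sketch of using the pool from another package. The import
path, capacities, and jobs are illustrative placeholders:

package main

import (
	"fmt"
	"time"

	"example.com/broadcast/pool" // placeholder import path
)

func main() {
	// At most 4 workers, with room for up to 4 buffered jobs.
	p := pool.New(4, 4)
	defer p.Close()

	// Optionally pre-warm a couple of workers.
	if err := p.Spawn(2); err != nil {
		fmt.Println("spawn:", err)
	}

	for i := 0; i < 8; i++ {
		i := i
		if err := p.Schedule(func() { fmt.Println("job", i) }); err != nil {
			fmt.Println("schedule:", err)
		}
	}

	// Give up on this job if the pool stays saturated for 100ms.
	if err := p.ScheduleUntil(func() {}, 100*time.Millisecond); err != nil {
		fmt.Println("schedule with timeout:", err)
	}

	time.Sleep(200 * time.Millisecond) // crude wait so the demo jobs can finish
}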