Worker Pool Patterns
Advanced worker pool implementations for optimal resource utilization and performance in Go applications. This guide covers sophisticated worker pool patterns, from basic implementations to enterprise-grade solutions.
Table of Contents
- Introduction
- Basic Worker Pool
- Priority-Based Pools
- Specialized Pool Types
- Load Balancing
- Summary
Introduction
Worker pools manage a fixed number of goroutines to process work items efficiently, providing controlled resource usage and optimal throughput for concurrent applications.
Core Components
package main
import (
	"context"
	"fmt"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"
	"time"
)
// WorkerPool defines the interface for worker pool implementations.
// Implementations in this file: SimpleWorkerPool, PriorityWorkerPool,
// BatchWorkerPool.
type WorkerPool interface {
	// Submit enqueues work without blocking; returns an error if the
	// pool is not running or the queue is full.
	Submit(work Work) error
	// SubmitWithTimeout enqueues work, waiting up to timeout for space.
	SubmitWithTimeout(work Work, timeout time.Duration) error
	// Close shuts the pool down.
	Close() error
	// GetMetrics returns a snapshot of pool metrics.
	GetMetrics() PoolMetrics
	// Resize changes the worker count (not supported by the pools here).
	Resize(newSize int) error
}
// Work represents a unit of work to be processed.
type Work struct {
	ID       string
	Task     func() error // the function executed by a worker
	Priority int          // used only by PriorityWorkerPool
	Deadline time.Time
	Context  context.Context // optional cancellation context
	Result   chan error      // optional; receives the task's error (non-blocking send)
}
// PoolMetrics provides comprehensive metrics for worker pools.
// The integer counters are updated via sync/atomic by the pools in this
// file; the float/duration fields are written without synchronization
// and are best-effort.
type PoolMetrics struct {
	PoolSize          int32 // number of workers
	ActiveWorkers     int32 // workers currently executing a task
	QueueSize         int32 // items waiting in the queue(s)
	QueueCapacity     int32
	TasksSubmitted    int64
	TasksCompleted    int64
	TasksFailed       int64
	AverageLatency    time.Duration // smoothed per-task latency
	ThroughputPerSec  float64       // completions observed in the last sample window
	QueueUtilization  float64       // QueueSize / QueueCapacity
	WorkerUtilization float64
}
// WorkerPoolConfig contains configuration for worker pools.
type WorkerPoolConfig struct {
	MinWorkers      int // number of workers created at construction
	MaxWorkers      int
	QueueSize       int           // capacity of the work queue (split across priorities for priority pools)
	IdleTimeout     time.Duration
	MaxTaskDuration time.Duration // per-task timeout; 0 means the pool default
	EnableMetrics   bool          // start the background metrics collector
	EnableProfiling bool
}
Basic Worker Pool
Implementation of a fundamental worker pool with essential features.
Simple Worker Pool
// SimpleWorkerPool implements a basic fixed-size worker pool backed by a
// single bounded work queue shared by all workers.
type SimpleWorkerPool struct {
	workers   []*Worker
	workQueue chan Work     // bounded queue every worker pulls from
	quit      chan struct{} // closed to signal workers to stop
	wg        sync.WaitGroup
	config    WorkerPoolConfig
	metrics   *PoolMetrics
	state     int32 // 0: stopped, 1: running, 2: closing
}
// Worker represents a single worker goroutine in a SimpleWorkerPool.
type Worker struct {
	id          int
	workQueue   chan Work     // shared with the pool
	quit        chan struct{} // shared stop signal
	pool        *SimpleWorkerPool
	currentWork *Work // item being processed, nil when idle
	startTime   time.Time
	tasksDone   int64 // updated atomically
}
// NewSimpleWorkerPool builds a SimpleWorkerPool from config without
// starting any goroutines; call Start to begin processing.
func NewSimpleWorkerPool(config WorkerPoolConfig) *SimpleWorkerPool {
	p := &SimpleWorkerPool{
		workQueue: make(chan Work, config.QueueSize),
		quit:      make(chan struct{}),
		config:    config,
		metrics:   &PoolMetrics{QueueCapacity: int32(config.QueueSize)},
		workers:   make([]*Worker, config.MinWorkers),
	}
	for i := range p.workers {
		p.workers[i] = &Worker{
			id:        i,
			workQueue: p.workQueue,
			quit:      p.quit,
			pool:      p,
			startTime: time.Now(),
		}
	}
	atomic.StoreInt32(&p.metrics.PoolSize, int32(config.MinWorkers))
	return p
}
// Start transitions the pool to running, launches every worker
// goroutine, and starts the metrics collector when enabled.
func (swp *SimpleWorkerPool) Start() error {
	if !atomic.CompareAndSwapInt32(&swp.state, 0, 1) {
		return fmt.Errorf("pool already running")
	}
	for i := range swp.workers {
		swp.wg.Add(1)
		go swp.workers[i].run()
	}
	if swp.config.EnableMetrics {
		go swp.metricsCollector()
	}
	return nil
}
// Submit enqueues work without blocking; it fails when the pool is not
// running or the queue is at capacity.
func (swp *SimpleWorkerPool) Submit(work Work) error {
	if atomic.LoadInt32(&swp.state) != 1 {
		return fmt.Errorf("pool not running")
	}
	select {
	case swp.workQueue <- work:
	default:
		return fmt.Errorf("work queue full")
	}
	atomic.AddInt64(&swp.metrics.TasksSubmitted, 1)
	atomic.StoreInt32(&swp.metrics.QueueSize, int32(len(swp.workQueue)))
	return nil
}
// run is the worker main loop: it pulls work until either the quit
// channel is closed or the work queue is closed and drained.
func (w *Worker) run() {
	defer w.pool.wg.Done()
	for {
		select {
		case work, ok := <-w.workQueue:
			// A closed channel is always ready and yields zero-value
			// Works; without this ok check the worker would spin on
			// zero items (nil Task) forever after Close closed the
			// queue, so graceful shutdown could never complete.
			if !ok {
				return
			}
			w.processWork(work)
		case <-w.quit:
			return
		}
	}
}
// processWork runs one work item, maintains active-worker and latency
// metrics, and delivers the result non-blockingly when requested.
func (w *Worker) processWork(work Work) {
	atomic.AddInt32(&w.pool.metrics.ActiveWorkers, 1)
	defer atomic.AddInt32(&w.pool.metrics.ActiveWorkers, -1)

	w.currentWork = &work
	began := time.Now()
	err := w.executeWithTimeout(work)
	w.updateMetrics(err, time.Since(began))
	w.currentWork = nil
	atomic.AddInt64(&w.tasksDone, 1)

	if work.Result == nil {
		return
	}
	select {
	case work.Result <- err:
	default: // receiver gone or slow; drop rather than block the worker
	}
}
// executeWithTimeout runs the task in its own goroutine and returns its
// error, a timeout error, or the context's error — whichever happens
// first. Panics inside the task are converted to errors.
func (w *Worker) executeWithTimeout(work Work) error {
	done := make(chan error, 1) // buffered so the task goroutine never leaks
	go func() {
		defer func() {
			if r := recover(); r != nil {
				done <- fmt.Errorf("panic: %v", r)
			}
		}()
		done <- work.Task()
	}()
	timeout := w.pool.config.MaxTaskDuration
	if timeout == 0 {
		timeout = 30 * time.Second // pool default when unconfigured
	}
	// Work.Context is optional; the previous code selected on
	// work.Context.Done() directly and panicked with a nil pointer
	// whenever a caller left Context unset.
	ctx := work.Context
	if ctx == nil {
		ctx = context.Background()
	}
	select {
	case err := <-done:
		return err
	case <-time.After(timeout):
		return fmt.Errorf("task timeout")
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Close gracefully shuts down the worker pool: it signals workers via
// the quit channel, waits for them to exit, then closes the work queue.
func (swp *SimpleWorkerPool) Close() error {
	if !atomic.CompareAndSwapInt32(&swp.state, 1, 2) {
		return fmt.Errorf("pool not running")
	}
	// Signal first, then wait. The previous order closed workQueue while
	// workers were still selecting on it; a closed channel is always
	// ready, so workers spun on zero-value Work items and wg.Wait never
	// returned.
	close(swp.quit)
	swp.wg.Wait()
	close(swp.workQueue)
	atomic.StoreInt32(&swp.state, 0)
	return nil
}
// GetMetrics returns a point-in-time snapshot of pool metrics.
func (swp *SimpleWorkerPool) GetMetrics() PoolMetrics {
	// Load the hot counters atomically; the previous plain struct copy
	// raced with the workers' atomic updates.
	metrics := PoolMetrics{
		PoolSize:       atomic.LoadInt32(&swp.metrics.PoolSize),
		ActiveWorkers:  atomic.LoadInt32(&swp.metrics.ActiveWorkers),
		QueueCapacity:  swp.metrics.QueueCapacity,
		TasksSubmitted: atomic.LoadInt64(&swp.metrics.TasksSubmitted),
		TasksCompleted: atomic.LoadInt64(&swp.metrics.TasksCompleted),
		TasksFailed:    atomic.LoadInt64(&swp.metrics.TasksFailed),
		// NOTE(review): AverageLatency and ThroughputPerSec are written
		// without synchronization elsewhere; these reads stay best-effort.
		AverageLatency:   swp.metrics.AverageLatency,
		ThroughputPerSec: swp.metrics.ThroughputPerSec,
	}
	metrics.QueueSize = int32(len(swp.workQueue))
	// Guard the division: a zero-capacity queue produced NaN/Inf before.
	if swp.config.QueueSize > 0 {
		metrics.QueueUtilization = float64(metrics.QueueSize) / float64(swp.config.QueueSize)
	}
	return metrics
}
// Resize is unsupported: the simple pool is fixed-size by design.
func (swp *SimpleWorkerPool) Resize(newSize int) error {
	return fmt.Errorf("resize not supported in simple worker pool")
}
// SubmitWithTimeout enqueues work, waiting up to timeout for queue
// space before giving up.
func (swp *SimpleWorkerPool) SubmitWithTimeout(work Work, timeout time.Duration) error {
	if atomic.LoadInt32(&swp.state) != 1 {
		return fmt.Errorf("pool not running")
	}
	select {
	case swp.workQueue <- work:
		atomic.AddInt64(&swp.metrics.TasksSubmitted, 1)
		// Keep the queue-size gauge in step with Submit, which updated
		// it while this path previously did not.
		atomic.StoreInt32(&swp.metrics.QueueSize, int32(len(swp.workQueue)))
		return nil
	case <-time.After(timeout):
		return fmt.Errorf("submit timeout")
	}
}
// updateMetrics records the task outcome and folds the duration into
// the pool's smoothed latency estimate (new = (old + sample) / 2).
func (w *Worker) updateMetrics(err error, duration time.Duration) {
	if err != nil {
		atomic.AddInt64(&w.pool.metrics.TasksFailed, 1)
	} else {
		atomic.AddInt64(&w.pool.metrics.TasksCompleted, 1)
	}
	// Use atomic CAS on the underlying int64: the previous unguarded
	// read-modify-write of AverageLatency raced between workers.
	addr := (*int64)(&w.pool.metrics.AverageLatency)
	for {
		old := atomic.LoadInt64(addr)
		next := (old + duration.Nanoseconds()) / 2
		if atomic.CompareAndSwapInt64(addr, old, next) {
			return
		}
	}
}
// metricsCollector samples the completed-task counter once per second
// to derive throughput, until the pool shuts down.
func (swp *SimpleWorkerPool) metricsCollector() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	var prev int64
	for {
		select {
		case <-swp.quit:
			return
		case <-ticker.C:
			total := atomic.LoadInt64(&swp.metrics.TasksCompleted)
			// NOTE(review): this write is unsynchronized; concurrent
			// readers may observe a stale or torn value.
			swp.metrics.ThroughputPerSec = float64(total - prev)
			prev = total
		}
	}
}
Priority-Based Pools
Worker pools that handle tasks based on priority levels.
Priority Queue Pool
// PriorityWorkerPool implements a priority-based worker pool with one
// bounded queue per priority level and a scheduler that prevents
// starvation of low-priority work.
type PriorityWorkerPool struct {
	priorityQueues map[int]chan Work // priority level -> queue (read-only after construction)
	workers        []*PriorityWorker
	quit           chan struct{} // closed to stop workers
	wg             sync.WaitGroup
	config         WorkerPoolConfig
	metrics        *PriorityPoolMetrics
	state          int32 // 0: stopped, 1: running, 2: closing
	scheduler      *PriorityScheduler
}
// PriorityPoolMetrics extends PoolMetrics with priority-specific metrics.
//
// NOTE(review): the maps below are read and written by multiple
// goroutines (submitters, workers, the metrics collector) without a
// lock — confirm whether a mutex should guard them.
type PriorityPoolMetrics struct {
	PoolMetrics
	QueueSizesByPriority map[int]int32
	TasksByPriority      map[int]int64
	LatencyByPriority    map[int]time.Duration
	StarvationEvents     int64 // updated atomically
	PriorityInversions   int64
}
// PriorityWorker handles work items based on priority, pulling items
// through the pool's scheduler rather than directly from a queue.
type PriorityWorker struct {
	id           int
	pool         *PriorityWorkerPool
	currentWork  *Work // item being processed, nil when idle
	tasksDone    int64 // updated atomically
	lastWorkTime time.Time
}
// PriorityScheduler manages priority-based work distribution.
//
// NOTE(review): lastScheduleTime is mutated from every worker goroutine
// concurrently with no lock (see getNextWork/checkStarvation) — a data
// race that can panic on concurrent map writes; confirm whether a mutex
// or a single scheduler goroutine is intended.
type PriorityScheduler struct {
	pool                *PriorityWorkerPool
	starvationThreshold time.Duration     // max wait before a queue is force-served
	lastScheduleTime    map[int]time.Time // priority -> last time it was served
	priorityWeights     map[int]float64   // read-only after construction
}
// NewPriorityWorkerPool creates a priority-based worker pool with one
// bounded queue per priority level. Call Start to begin processing.
func NewPriorityWorkerPool(config WorkerPoolConfig, priorities []int) *PriorityWorkerPool {
	pool := &PriorityWorkerPool{
		priorityQueues: make(map[int]chan Work),
		quit:           make(chan struct{}),
		config:         config,
		metrics: &PriorityPoolMetrics{
			PoolMetrics: PoolMetrics{
				QueueCapacity: int32(config.QueueSize),
			},
			QueueSizesByPriority: make(map[int]int32),
			TasksByPriority:      make(map[int]int64),
			LatencyByPriority:    make(map[int]time.Duration),
		},
	}
	// Split the configured capacity across the priority levels. Guard
	// the division: an empty priorities slice previously panicked with a
	// divide-by-zero, and many levels could round the per-queue capacity
	// down to 0, making every Submit fail with "queue full".
	queueSizePerPriority := 1
	if len(priorities) > 0 {
		if per := config.QueueSize / len(priorities); per > 1 {
			queueSizePerPriority = per
		}
	}
	for _, priority := range priorities {
		pool.priorityQueues[priority] = make(chan Work, queueSizePerPriority)
		pool.metrics.QueueSizesByPriority[priority] = 0
		pool.metrics.TasksByPriority[priority] = 0
		pool.metrics.LatencyByPriority[priority] = 0
	}
	// Create workers (not started until Start).
	pool.workers = make([]*PriorityWorker, config.MinWorkers)
	for i := 0; i < config.MinWorkers; i++ {
		pool.workers[i] = &PriorityWorker{id: i, pool: pool}
	}
	pool.scheduler = &PriorityScheduler{
		pool:                pool,
		starvationThreshold: 5 * time.Second,
		lastScheduleTime:    make(map[int]time.Time),
		priorityWeights:     make(map[int]float64),
	}
	// Higher priority value => higher weight; +10 keeps weights nonzero.
	for _, priority := range priorities {
		pool.scheduler.priorityWeights[priority] = float64(priority + 10)
	}
	atomic.StoreInt32(&pool.metrics.PoolSize, int32(config.MinWorkers))
	return pool
}
// Start transitions the priority pool to running, launching all workers
// and the optional metrics collector.
func (pwp *PriorityWorkerPool) Start() error {
	if !atomic.CompareAndSwapInt32(&pwp.state, 0, 1) {
		return fmt.Errorf("pool already running")
	}
	for i := range pwp.workers {
		pwp.wg.Add(1)
		go pwp.workers[i].run()
	}
	if pwp.config.EnableMetrics {
		go pwp.metricsCollector()
	}
	return nil
}
// Submit routes work to the queue for its priority level without
// blocking; unknown priorities and full queues are errors.
func (pwp *PriorityWorkerPool) Submit(work Work) error {
	if atomic.LoadInt32(&pwp.state) != 1 {
		return fmt.Errorf("pool not running")
	}
	queue, exists := pwp.priorityQueues[work.Priority]
	if !exists {
		return fmt.Errorf("priority %d not supported", work.Priority)
	}
	select {
	case queue <- work:
		atomic.AddInt64(&pwp.metrics.TasksSubmitted, 1)
		// Map values are not addressable, so the original
		// atomic.AddInt64(&pwp.metrics.TasksByPriority[...], 1) did not
		// compile. NOTE(review): this plain increment still races
		// between concurrent submitters; guard the map with a mutex if
		// exact per-priority counts matter.
		pwp.metrics.TasksByPriority[work.Priority]++
		pwp.updateQueueMetrics()
		return nil
	default:
		return fmt.Errorf("priority queue %d full", work.Priority)
	}
}
// SubmitWithTimeout routes work to its priority queue, waiting up to
// timeout for space before giving up.
func (pwp *PriorityWorkerPool) SubmitWithTimeout(work Work, timeout time.Duration) error {
	if atomic.LoadInt32(&pwp.state) != 1 {
		return fmt.Errorf("pool not running")
	}
	queue, exists := pwp.priorityQueues[work.Priority]
	if !exists {
		return fmt.Errorf("priority %d not supported", work.Priority)
	}
	select {
	case queue <- work:
		atomic.AddInt64(&pwp.metrics.TasksSubmitted, 1)
		// Map values are not addressable, so the original atomic.AddInt64
		// on the map entry did not compile. NOTE(review): plain increment
		// races between concurrent submitters; use a mutex if exact
		// counts matter.
		pwp.metrics.TasksByPriority[work.Priority]++
		pwp.updateQueueMetrics()
		return nil
	case <-time.After(timeout):
		return fmt.Errorf("submit timeout for priority %d", work.Priority)
	}
}
// run is the priority-worker main loop: it asks the scheduler for the
// next item until the pool's quit channel is closed.
func (pw *PriorityWorker) run() {
	defer pw.pool.wg.Done()
	for {
		select {
		case <-pw.pool.quit:
			return
		default:
			if work := pw.pool.scheduler.getNextWork(); work != nil {
				pw.processWork(*work)
				continue
			}
			// Idle: yield and then back off briefly. The previous pure
			// runtime.Gosched() loop busy-spun a full core per worker
			// whenever every queue was empty.
			runtime.Gosched()
			time.Sleep(time.Millisecond)
		}
	}
}
// getNextWork selects the next work item: starved queues are served
// first, then non-empty queues in descending weight order. Returns nil
// when every queue is empty.
//
// NOTE(review): lastScheduleTime is a plain map written here and read in
// checkStarvation from every worker goroutine concurrently — a data race
// that can panic on concurrent map writes. Confirm whether a mutex or a
// single scheduler goroutine is the intended design.
func (ps *PriorityScheduler) getNextWork() *Work {
	// Check for starvation first
	if work := ps.checkStarvation(); work != nil {
		return work
	}
	// Normal priority-based selection
	priorities := ps.getSortedPriorities()
	for _, priority := range priorities {
		queue := ps.pool.priorityQueues[priority]
		select {
		case work := <-queue:
			ps.lastScheduleTime[priority] = time.Now()
			return &work
		default:
			continue
		}
	}
	return nil
}
// checkStarvation serves a queue that has waited longer than the
// starvation threshold, bypassing normal weight ordering. Returns nil
// when no queue is starved.
//
// NOTE(review): reads and writes ps.lastScheduleTime without a lock
// while other workers do the same — see the race note on getNextWork.
func (ps *PriorityScheduler) checkStarvation() *Work {
	now := time.Now()
	for priority, queue := range ps.pool.priorityQueues {
		lastScheduled, exists := ps.lastScheduleTime[priority]
		if !exists {
			// First sighting: treat as just scheduled.
			lastScheduled = now
			ps.lastScheduleTime[priority] = now
		}
		// Check if this priority has been starved
		if now.Sub(lastScheduled) > ps.starvationThreshold && len(queue) > 0 {
			select {
			case work := <-queue:
				atomic.AddInt64(&ps.pool.metrics.StarvationEvents, 1)
				ps.lastScheduleTime[priority] = now
				return &work
			default:
				// Raced with another worker that drained the queue.
			}
		}
	}
	return nil
}
// getSortedPriorities returns the priorities of all currently non-empty
// queues, ordered by configured weight, highest first.
func (ps *PriorityScheduler) getSortedPriorities() []int {
	type entry struct {
		priority int
		weight   float64
	}
	entries := make([]entry, 0, len(ps.priorityWeights))
	for priority, weight := range ps.priorityWeights {
		// Only consider non-empty queues.
		if len(ps.pool.priorityQueues[priority]) > 0 {
			entries = append(entries, entry{priority: priority, weight: weight})
		}
	}
	// sort.Slice replaces the hand-rolled O(n^2) bubble sort.
	sort.Slice(entries, func(i, j int) bool {
		return entries[i].weight > entries[j].weight
	})
	priorities := make([]int, len(entries))
	for i, e := range entries {
		priorities[i] = e.priority
	}
	return priorities
}
// processWork executes one item, updates per-priority metrics, and
// delivers the result without blocking.
func (pw *PriorityWorker) processWork(work Work) {
	atomic.AddInt32(&pw.pool.metrics.ActiveWorkers, 1)
	defer atomic.AddInt32(&pw.pool.metrics.ActiveWorkers, -1)

	pw.currentWork = &work
	began := time.Now()
	err := pw.executeWork(work)
	pw.updatePriorityMetrics(work.Priority, err, time.Since(began))
	pw.currentWork = nil
	pw.lastWorkTime = time.Now()
	atomic.AddInt64(&pw.tasksDone, 1)

	if work.Result == nil {
		return
	}
	select {
	case work.Result <- err:
	default: // never block the worker on a slow result consumer
	}
}
// executeWork runs the task, converting a panic into a returned error.
func (pw *PriorityWorker) executeWork(work Work) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// The previous recover discarded the panic and let the
			// function return nil, silently counting a crashed task as
			// a success. A named return lets us surface it.
			err = fmt.Errorf("panic: %v", r)
		}
	}()
	return work.Task()
}
// updatePriorityMetrics records the task outcome and folds the duration
// into the per-priority latency average (new = (old + sample) / 2).
//
// NOTE(review): LatencyByPriority is a plain map mutated by every worker
// goroutine without a lock — a data race; confirm whether the priority
// metric maps should be mutex-guarded.
func (pw *PriorityWorker) updatePriorityMetrics(priority int, err error, duration time.Duration) {
	if err != nil {
		atomic.AddInt64(&pw.pool.metrics.TasksFailed, 1)
	} else {
		atomic.AddInt64(&pw.pool.metrics.TasksCompleted, 1)
	}
	// Update priority-specific latency
	currentLatency := pw.pool.metrics.LatencyByPriority[priority]
	newLatency := time.Duration((currentLatency.Nanoseconds() + duration.Nanoseconds()) / 2)
	pw.pool.metrics.LatencyByPriority[priority] = newLatency
}
// updateQueueMetrics refreshes per-priority and total queue-size gauges
// from the current channel lengths.
//
// NOTE(review): QueueSizesByPriority is written here from submitters,
// workers, and the metrics collector concurrently without a lock — a
// data race on the map; confirm whether it needs a mutex.
func (pwp *PriorityWorkerPool) updateQueueMetrics() {
	totalQueueSize := int32(0)
	for priority, queue := range pwp.priorityQueues {
		queueSize := int32(len(queue))
		pwp.metrics.QueueSizesByPriority[priority] = queueSize
		totalQueueSize += queueSize
	}
	atomic.StoreInt32(&pwp.metrics.QueueSize, totalQueueSize)
}
// Close gracefully shuts down the priority pool: it stops the workers,
// waits for them, then closes the per-priority queues.
func (pwp *PriorityWorkerPool) Close() error {
	if !atomic.CompareAndSwapInt32(&pwp.state, 1, 2) {
		return fmt.Errorf("pool not running")
	}
	// Stop workers first, then close the queues. The previous order
	// closed the queues while late Submit calls could still be sending,
	// risking a send-on-closed-channel panic; after wg.Wait the state
	// check in Submit keeps new senders out.
	close(pwp.quit)
	pwp.wg.Wait()
	for _, queue := range pwp.priorityQueues {
		close(queue)
	}
	atomic.StoreInt32(&pwp.state, 0)
	return nil
}
// GetMetrics refreshes the queue gauges and returns a snapshot of the
// embedded PoolMetrics.
func (pwp *PriorityWorkerPool) GetMetrics() PoolMetrics {
	pwp.updateQueueMetrics()
	snapshot := pwp.metrics.PoolMetrics
	snapshot.PoolSize = int32(len(pwp.workers))
	if snapshot.QueueCapacity > 0 {
		snapshot.QueueUtilization = float64(snapshot.QueueSize) / float64(snapshot.QueueCapacity)
	}
	return snapshot
}
// Resize is unsupported: the priority pool's worker count is fixed.
func (pwp *PriorityWorkerPool) Resize(newSize int) error {
	return fmt.Errorf("resize not supported in priority worker pool")
}
// metricsCollector samples throughput and refreshes the queue gauges
// once per second until shutdown.
func (pwp *PriorityWorkerPool) metricsCollector() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	var prev int64
	for {
		select {
		case <-pwp.quit:
			return
		case <-ticker.C:
			total := atomic.LoadInt64(&pwp.metrics.TasksCompleted)
			// NOTE(review): unsynchronized write; readers are best-effort.
			pwp.metrics.ThroughputPerSec = float64(total - prev)
			prev = total
			pwp.updateQueueMetrics()
		}
	}
}
Specialized Pool Types
Specialized worker pool implementations for specific use cases.
Batch Processing Pool
// BatchWorkerPool processes work items in batches for efficiency: each
// worker accumulates items and flushes when the batch fills or the
// flush interval elapses.
type BatchWorkerPool struct {
	batchSize     int           // flush when a worker's batch reaches this size
	flushInterval time.Duration // flush partial batches at least this often
	workers       []*BatchWorker
	workQueue     chan Work     // shared bounded queue
	quit          chan struct{} // closed to stop workers
	wg            sync.WaitGroup
	config        WorkerPoolConfig
	metrics       *PoolMetrics
	state         int32 // 0: stopped, 1: running, 2: closing
}
// BatchWorker accumulates work items into a batch and processes them
// together.
type BatchWorker struct {
	id             int
	pool           *BatchWorkerPool
	batch          []Work    // current in-progress batch (capacity batchSize)
	lastFlush      time.Time // time of the most recent flush
	batchProcessor func([]Work) error // optional custom processor; nil uses the default
}
// NewBatchWorkerPool creates a batch pool that flushes a worker's batch
// when it reaches batchSize or when flushInterval elapses. Call Start
// to begin processing.
func NewBatchWorkerPool(config WorkerPoolConfig, batchSize int, flushInterval time.Duration) *BatchWorkerPool {
	p := &BatchWorkerPool{
		batchSize:     batchSize,
		flushInterval: flushInterval,
		workQueue:     make(chan Work, config.QueueSize),
		quit:          make(chan struct{}),
		config:        config,
		metrics:       &PoolMetrics{QueueCapacity: int32(config.QueueSize)},
		workers:       make([]*BatchWorker, config.MinWorkers),
	}
	for i := range p.workers {
		p.workers[i] = &BatchWorker{
			id:        i,
			pool:      p,
			batch:     make([]Work, 0, batchSize),
			lastFlush: time.Now(),
		}
	}
	return p
}
// Start transitions the batch pool to running and launches its workers.
func (bwp *BatchWorkerPool) Start() error {
	if !atomic.CompareAndSwapInt32(&bwp.state, 0, 1) {
		return fmt.Errorf("pool already running")
	}
	for i := range bwp.workers {
		bwp.wg.Add(1)
		go bwp.workers[i].run()
	}
	return nil
}
// run is the batch-worker main loop: it accumulates items, flushing on
// a full batch, on the flush ticker, and on shutdown.
func (bw *BatchWorker) run() {
	defer bw.pool.wg.Done()
	flushTicker := time.NewTicker(bw.pool.flushInterval)
	defer flushTicker.Stop()
	for {
		select {
		case work, ok := <-bw.pool.workQueue:
			// A closed queue is always ready and yields zero-value
			// Works (nil Task); without this ok check the worker would
			// batch nil tasks forever after shutdown closed the queue.
			if !ok {
				bw.flushBatch() // flushBatch no-ops on an empty batch
				return
			}
			bw.addToBatch(work)
			if len(bw.batch) >= bw.pool.batchSize {
				bw.flushBatch()
			}
		case <-flushTicker.C:
			if len(bw.batch) > 0 {
				bw.flushBatch()
			}
		case <-bw.pool.quit:
			// Flush remaining work before stopping.
			if len(bw.batch) > 0 {
				bw.flushBatch()
			}
			return
		}
	}
}
// addToBatch appends a work item to the worker's current batch; the
// caller decides when the batch is flushed.
func (bw *BatchWorker) addToBatch(work Work) {
	bw.batch = append(bw.batch, work)
}
// flushBatch processes the current batch (if any), updates metrics, and
// resets the batch buffer for reuse.
func (bw *BatchWorker) flushBatch() {
	if len(bw.batch) == 0 {
		return
	}
	atomic.AddInt32(&bw.pool.metrics.ActiveWorkers, 1)
	defer atomic.AddInt32(&bw.pool.metrics.ActiveWorkers, -1)
	startTime := time.Now()
	err := bw.runProcessor()
	duration := time.Since(startTime)
	bw.updateBatchMetrics(len(bw.batch), err, duration)
	// Clear the batch but keep its backing array.
	bw.batch = bw.batch[:0]
	bw.lastFlush = time.Now()
}

// runProcessor invokes the configured (or default) batch processor,
// converting a panic into an error so one bad batch cannot kill the
// worker goroutine (previously a panicking processor crashed it).
func (bw *BatchWorker) runProcessor() (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("batch panic: %v", r)
		}
	}()
	if bw.batchProcessor != nil {
		return bw.batchProcessor(bw.batch)
	}
	return bw.defaultBatchProcessor(bw.batch)
}
// defaultBatchProcessor runs each item individually, delivering a
// per-item result when a Result channel was provided. It returns the
// last error seen, or nil when every item succeeded.
func (bw *BatchWorker) defaultBatchProcessor(batch []Work) error {
	var lastError error
	for _, work := range batch {
		err := bw.runTask(work)
		if err != nil {
			lastError = err
		}
		// Single result-delivery path replaces the duplicated
		// success/failure branches of the original.
		if work.Result != nil {
			select {
			case work.Result <- err:
			default: // never block the batch on a slow consumer
			}
		}
	}
	return lastError
}

// runTask executes one task, turning a panic into an error so the rest
// of the batch still runs (previously a single panicking task aborted
// the entire batch and killed the worker).
func (bw *BatchWorker) runTask(work Work) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic: %v", r)
		}
	}()
	return work.Task()
}
// updateBatchMetrics records the batch outcome and folds the per-item
// latency into the pool's running average.
func (bw *BatchWorker) updateBatchMetrics(batchSize int, err error, duration time.Duration) {
	if batchSize == 0 {
		return // guard the division below
	}
	if err != nil {
		atomic.AddInt64(&bw.pool.metrics.TasksFailed, int64(batchSize))
	} else {
		atomic.AddInt64(&bw.pool.metrics.TasksCompleted, int64(batchSize))
	}
	// Atomic CAS on the underlying int64: the previous unguarded
	// read-modify-write of AverageLatency raced between workers.
	perItem := duration.Nanoseconds() / int64(batchSize)
	addr := (*int64)(&bw.pool.metrics.AverageLatency)
	for {
		old := atomic.LoadInt64(addr)
		next := (old + perItem) / 2
		if atomic.CompareAndSwapInt64(addr, old, next) {
			return
		}
	}
}
// Submit enqueues work for batch processing without blocking.
func (bwp *BatchWorkerPool) Submit(work Work) error {
	if atomic.LoadInt32(&bwp.state) != 1 {
		return fmt.Errorf("pool not running")
	}
	select {
	case bwp.workQueue <- work:
	default:
		return fmt.Errorf("work queue full")
	}
	atomic.AddInt64(&bwp.metrics.TasksSubmitted, 1)
	return nil
}
// SubmitWithTimeout enqueues work for batch processing, waiting up to
// timeout for queue space.
func (bwp *BatchWorkerPool) SubmitWithTimeout(work Work, timeout time.Duration) error {
	if atomic.LoadInt32(&bwp.state) != 1 {
		return fmt.Errorf("pool not running")
	}
	deadline := time.After(timeout)
	select {
	case bwp.workQueue <- work:
		atomic.AddInt64(&bwp.metrics.TasksSubmitted, 1)
		return nil
	case <-deadline:
		return fmt.Errorf("submit timeout")
	}
}
// Close gracefully shuts down the batch pool: it signals workers (which
// flush their partial batches), waits for them, then closes the queue.
func (bwp *BatchWorkerPool) Close() error {
	if !atomic.CompareAndSwapInt32(&bwp.state, 1, 2) {
		return fmt.Errorf("pool not running")
	}
	// Signal, wait, then close. The previous order closed workQueue
	// first, which left workers receiving zero-value Works (nil Task)
	// from the always-ready closed channel and crashing or hanging.
	close(bwp.quit)
	bwp.wg.Wait()
	close(bwp.workQueue)
	atomic.StoreInt32(&bwp.state, 0)
	return nil
}
// GetMetrics returns a snapshot of the batch pool's metrics with the
// queue and pool-size gauges refreshed.
func (bwp *BatchWorkerPool) GetMetrics() PoolMetrics {
	snap := *bwp.metrics
	snap.QueueSize = int32(len(bwp.workQueue))
	snap.PoolSize = int32(len(bwp.workers))
	snap.QueueUtilization = float64(snap.QueueSize) / float64(bwp.config.QueueSize)
	return snap
}
// Resize is unsupported: the batch pool's worker count is fixed.
func (bwp *BatchWorkerPool) Resize(newSize int) error {
	return fmt.Errorf("resize not supported in batch worker pool")
}
Load Balancing
Load balancing strategies for distributing work across pools.
Load Balancer Implementation
// LoadBalancer defines the interface for load balancing strategies that
// pick which pool should receive the next work item.
type LoadBalancer interface {
	// SelectPool picks one pool from the list; nil if the list is empty.
	SelectPool(pools []WorkerPool) WorkerPool
	// UpdateMetrics feeds a pool's latest metrics to the strategy.
	UpdateMetrics(pool WorkerPool, metrics PoolMetrics)
}
// RoundRobinBalancer implements round-robin load balancing.
type RoundRobinBalancer struct {
	current int64 // monotonically increasing turn counter (atomic)
}
// NewRoundRobinBalancer creates a new round-robin balancer; the zero
// value is also usable.
func NewRoundRobinBalancer() *RoundRobinBalancer {
	return &RoundRobinBalancer{}
}
// SelectPool picks the next pool in rotation using an atomic counter,
// making it safe for concurrent callers. Returns nil for an empty list.
func (rrb *RoundRobinBalancer) SelectPool(pools []WorkerPool) WorkerPool {
	n := len(pools)
	if n == 0 {
		return nil
	}
	turn := atomic.AddInt64(&rrb.current, 1)
	return pools[turn%int64(n)]
}
// UpdateMetrics is a no-op: round-robin selection ignores pool metrics.
func (rrb *RoundRobinBalancer) UpdateMetrics(pool WorkerPool, metrics PoolMetrics) {
	// No-op for round-robin
}
// WeightedBalancer implements metrics-driven load balancing: pools are
// scored from their most recent metrics snapshot.
type WeightedBalancer struct {
	poolMetrics map[WorkerPool]PoolMetrics // latest snapshot per pool, guarded by mu
	weights     map[WorkerPool]float64
	mu          sync.RWMutex
}
// NewWeightedBalancer creates an empty weighted balancer.
func NewWeightedBalancer() *WeightedBalancer {
	wb := &WeightedBalancer{}
	wb.poolMetrics = make(map[WorkerPool]PoolMetrics)
	wb.weights = make(map[WorkerPool]float64)
	return wb
}
// SelectPool returns the pool with the highest computed score, holding
// a read lock over the metrics map. Returns nil for an empty list.
func (wb *WeightedBalancer) SelectPool(pools []WorkerPool) WorkerPool {
	if len(pools) == 0 {
		return nil
	}
	wb.mu.RLock()
	defer wb.mu.RUnlock()
	best, bestScore := pools[0], wb.calculateScore(pools[0])
	for _, candidate := range pools[1:] {
		if s := wb.calculateScore(candidate); s > bestScore {
			best, bestScore = candidate, s
		}
	}
	return best
}
// calculateScore rates a pool for selection: emptier queues, worker
// utilization near the 70% target, and higher throughput all raise the
// score. Pools with no recorded metrics get a neutral 0.5.
func (wb *WeightedBalancer) calculateScore(pool WorkerPool) float64 {
	m, known := wb.poolMetrics[pool]
	if !known {
		return 0.5
	}
	queueScore := 1.0 - m.QueueUtilization                 // lower queue use is better
	utilizationScore := 1.0 - abs(m.WorkerUtilization-0.7) // closest to 70% is best
	throughputScore := min(m.ThroughputPerSec/1000.0, 1.0) // capped at 1.0
	return 0.4*queueScore + 0.3*utilizationScore + 0.3*throughputScore
}
// UpdateMetrics stores the latest metrics snapshot used for scoring.
func (wb *WeightedBalancer) UpdateMetrics(pool WorkerPool, metrics PoolMetrics) {
	wb.mu.Lock()
	wb.poolMetrics[pool] = metrics
	wb.mu.Unlock()
}
// Helper functions

// abs returns the absolute value of x.
func abs(x float64) float64 {
	if x >= 0 {
		return x
	}
	return -x
}
// min returns the smaller of a and b.
func min(a, b float64) float64 {
	switch {
	case a < b:
		return a
	default:
		return b
	}
}
// PoolHealthChecker monitors the health of registered pools via
// periodic metric checks.
type PoolHealthChecker struct {
	pools      map[string]WorkerPool   // registered pools by name, guarded by mu
	health     map[string]HealthStatus // latest health per pool, guarded by mu
	thresholds HealthThresholds
	mu         sync.RWMutex
}
// HealthStatus represents a pool's health as of the last check.
type HealthStatus struct {
	IsHealthy       bool
	LastHealthCheck time.Time
	FailureCount    int // consecutive-ish threshold violations; decays on good checks
	ErrorRate       float64
	ResponseTime    time.Duration // pool's AverageLatency at last check
}
// HealthThresholds defines the limits used by health checks.
type HealthThresholds struct {
	MaxErrorRate    float64       // failed/submitted ratio above which a check fails
	MaxResponseTime time.Duration // AverageLatency above which a check fails
	MaxFailureCount int           // failures at which a pool is marked unhealthy
	CheckInterval   time.Duration // how often each pool is checked
}
// NewPoolHealthChecker creates a health checker with default
// thresholds: 10% error rate, 5s response time, 5 failures, 30s
// check interval.
func NewPoolHealthChecker() *PoolHealthChecker {
	defaults := HealthThresholds{
		MaxErrorRate:    0.1,
		MaxResponseTime: 5 * time.Second,
		MaxFailureCount: 5,
		CheckInterval:   30 * time.Second,
	}
	return &PoolHealthChecker{
		pools:      make(map[string]WorkerPool),
		health:     make(map[string]HealthStatus),
		thresholds: defaults,
	}
}
// AddPool registers a pool under name, marks it healthy, and starts a
// goroutine that monitors it periodically.
//
// NOTE(review): the monitor goroutine has no stop signal and there is
// no corresponding RemovePool — every AddPool leaks one goroutine for
// the life of the process; confirm whether a shutdown path is needed.
func (phc *PoolHealthChecker) AddPool(name string, pool WorkerPool) {
	phc.mu.Lock()
	defer phc.mu.Unlock()
	phc.pools[name] = pool
	phc.health[name] = HealthStatus{
		IsHealthy:       true,
		LastHealthCheck: time.Now(),
	}
	go phc.monitorPool(name)
}
// monitorPool checks the named pool's health once per CheckInterval.
//
// NOTE(review): this loop never terminates (no quit channel or context)
// and keeps its ticker alive forever — see the leak note on AddPool.
func (phc *PoolHealthChecker) monitorPool(name string) {
	ticker := time.NewTicker(phc.thresholds.CheckInterval)
	defer ticker.Stop()
	for range ticker.C {
		phc.checkPoolHealth(name)
	}
}
// checkPoolHealth evaluates one pool against the thresholds, raising or
// decaying its failure count and recording the latest measurements.
func (phc *PoolHealthChecker) checkPoolHealth(name string) {
	phc.mu.Lock()
	defer phc.mu.Unlock()
	pool, exists := phc.pools[name]
	if !exists {
		return
	}
	metrics := pool.GetMetrics()
	status := phc.health[name]
	// Guard against 0/0: before any task was submitted the old code
	// computed NaN and stored it as the error rate (NaN also made both
	// threshold comparisons silently false).
	var errorRate float64
	if metrics.TasksSubmitted > 0 {
		errorRate = float64(metrics.TasksFailed) / float64(metrics.TasksSubmitted)
	}
	// Check error rate.
	if errorRate > phc.thresholds.MaxErrorRate {
		status.FailureCount++
		status.IsHealthy = false
	}
	// Check response time.
	if metrics.AverageLatency > phc.thresholds.MaxResponseTime {
		status.FailureCount++
		status.IsHealthy = false
	}
	// Decay the failure count while under both thresholds; fully healthy
	// again once it reaches zero.
	if errorRate <= phc.thresholds.MaxErrorRate &&
		metrics.AverageLatency <= phc.thresholds.MaxResponseTime {
		if status.FailureCount > 0 {
			status.FailureCount--
		}
		if status.FailureCount == 0 {
			status.IsHealthy = true
		}
	}
	// Too many accumulated failures forces unhealthy regardless.
	if status.FailureCount >= phc.thresholds.MaxFailureCount {
		status.IsHealthy = false
	}
	status.LastHealthCheck = time.Now()
	status.ErrorRate = errorRate
	status.ResponseTime = metrics.AverageLatency
	phc.health[name] = status
}
// IsHealthy reports whether the named pool is currently healthy;
// unknown pools are treated as unhealthy.
func (phc *PoolHealthChecker) IsHealthy(name string) bool {
	phc.mu.RLock()
	defer phc.mu.RUnlock()
	status, known := phc.health[name]
	return known && status.IsHealthy
}
// GetHealthStatus returns the stored health status for name and whether
// the pool is known to the checker.
func (phc *PoolHealthChecker) GetHealthStatus(name string) (HealthStatus, bool) {
	phc.mu.RLock()
	defer phc.mu.RUnlock()
	s, known := phc.health[name]
	return s, known
}
// AggregatedMetrics provides aggregated metrics across pools.
type AggregatedMetrics struct {
	totalPools    int32 // NOTE(review): never written by the code visible here
	totalTasks    int64 // sum of reported TasksCompleted, guarded by mu
	totalFailures int64 // sum of reported TasksFailed, guarded by mu
	avgThroughput float64
	avgLatency    time.Duration
	mu            sync.RWMutex
}
// NewAggregatedMetrics creates new aggregated metrics; the zero value
// is also usable.
func NewAggregatedMetrics() *AggregatedMetrics {
	return &AggregatedMetrics{}
}
// UpdateMetrics folds one pool's snapshot into the running aggregates.
//
// NOTE(review): TasksCompleted/TasksFailed are cumulative counters in
// PoolMetrics; adding the full value on every call double-counts unless
// callers pass deltas — confirm the intended call contract. The
// "averages" are simple halving blends, not true means.
func (am *AggregatedMetrics) UpdateMetrics(metrics PoolMetrics) {
	am.mu.Lock()
	defer am.mu.Unlock()
	am.totalTasks += metrics.TasksCompleted
	am.totalFailures += metrics.TasksFailed
	// Update averages (simplified)
	am.avgThroughput = (am.avgThroughput + metrics.ThroughputPerSec) / 2
	am.avgLatency = time.Duration((am.avgLatency.Nanoseconds() + metrics.AverageLatency.Nanoseconds()) / 2)
}
// GetAggregatedMetrics returns (totalTasks, totalFailures,
// avgThroughput, avgLatency) under a read lock.
func (am *AggregatedMetrics) GetAggregatedMetrics() (int64, int64, float64, time.Duration) {
	am.mu.RLock()
	defer am.mu.RUnlock()
	tasks, failures := am.totalTasks, am.totalFailures
	return tasks, failures, am.avgThroughput, am.avgLatency
}
Summary
Worker pool patterns provide essential building blocks for concurrent Go applications:
- Simple Pools: Fixed-size pools for predictable workloads
- Dynamic Pools: Auto-scaling pools for variable workloads
- Priority Pools: Priority-based task processing with starvation prevention
- Batch Pools: Efficient batch processing for related tasks
- Load Balancing: Intelligent work distribution across pools
- Health Monitoring: Continuous pool health assessment
Choose the appropriate pattern based on your specific requirements for performance, scalability, and resource utilization.