Master Go's powerful concurrency features and patterns to build efficient, scalable applications. Learn how to effectively use goroutines, channels, and synchronization primitives. 🚀
Basic Concurrency Patterns 🎯
// Goroutines and WaitGroups
func main() {
var wg sync.WaitGroup
// Launch multiple goroutines
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
process(id)
}(i)
}
// Wait for all goroutines to complete
wg.Wait()
}
func process(id int) {
fmt.Printf("Processing %d\n", id)
time.Sleep(time.Second)
}
// Channel Basics
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
// Pipeline
nums := generator(2, 3, 4, 5)
squares := square(nums)
// Consume results
for n := range squares {
fmt.Println(n)
}
}
Advanced Channel Patterns 🔄
// Fan-out Fan-in Pattern
func fanOut(in <-chan int, n int) []<-chan int {
channels := make([]<-chan int, n)
for i := 0; i < n; i++ {
channels[i] = worker(in)
}
return channels
}
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
multiplexed := make(chan int)
wg.Add(len(channels))
for _, c := range channels {
go func(ch <-chan int) {
defer wg.Done()
for n := range ch {
multiplexed <- n
}
}(c)
}
go func() {
wg.Wait()
close(multiplexed)
}()
return multiplexed
}
func worker(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- process(n)
}
}()
return out
}
// Timeout Pattern
func timeoutPattern(timeout time.Duration) (string, error) {
ch := make(chan string, 1)
go func() {
// Simulate work
time.Sleep(2 * time.Second)
ch <- "result"
}()
select {
case result := <-ch:
return result, nil
case <-time.After(timeout):
return "", fmt.Errorf("operation timed out")
}
}
// Rate Limiting
type RateLimiter struct {
ticker *time.Ticker
done chan bool
}
func NewRateLimiter(rate time.Duration) *RateLimiter {
return &RateLimiter{
ticker: time.NewTicker(rate),
done: make(chan bool),
}
}
func (r *RateLimiter) Stop() {
r.ticker.Stop()
close(r.done)
}
func (r *RateLimiter) Process(work <-chan int) {
for {
select {
case <-r.done:
return
case <-r.ticker.C:
select {
case item := <-work:
process(item)
default:
// No work available
}
}
}
}
Synchronization Patterns 🔒
// Mutex Pattern
type Counter struct {
mu sync.Mutex
count int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
// RWMutex Pattern
type Cache struct {
sync.RWMutex
items map[string]string
}
func NewCache() *Cache {
return &Cache{
items: make(map[string]string),
}
}
func (c *Cache) Get(key string) (string, bool) {
c.RLock()
defer c.RUnlock()
item, exists := c.items[key]
return item, exists
}
func (c *Cache) Set(key, value string) {
c.Lock()
defer c.Unlock()
c.items[key] = value
}
// Once Pattern
type Config struct {
once sync.Once
settings map[string]string
}
func (c *Config) Load() error {
var err error
c.once.Do(func() {
c.settings, err = loadSettings()
})
return err
}
// Pool Pattern
type Worker struct {
id int
}
func NewWorkerPool() *sync.Pool {
var id int
return &sync.Pool{
New: func() interface{} {
id++
return &Worker{id: id}
},
}
}
Context Patterns 📝
// Context with Timeout
func processWithTimeout(
ctx context.Context,
data string,
) (string, error) {
ch := make(chan string, 1)
go func() {
result := processData(data)
ch <- result
}()
select {
case result := <-ch:
return result, nil
case <-ctx.Done():
return "", ctx.Err()
}
}
// Context with Cancellation
func processBatch(
ctx context.Context,
items []string,
) error {
for _, item := range items {
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := processItem(item); err != nil {
return err
}
}
}
return nil
}
// Context with Value
type key int
const userKey key = 0
func WithUser(
ctx context.Context,
user string,
) context.Context {
return context.WithValue(ctx, userKey, user)
}
func GetUser(ctx context.Context) string {
user, _ := ctx.Value(userKey).(string)
return user
}
Error Handling Patterns ⚠️
// Error Group Pattern
func processFiles(files []string) error {
g, ctx := errgroup.WithContext(context.Background())
for _, file := range files {
file := file // Create new variable for closure
g.Go(func() error {
return processFile(ctx, file)
})
}
return g.Wait()
}
// Circuit Breaker Pattern
type CircuitBreaker struct {
mu sync.RWMutex
failureCount int
threshold int
timeout time.Duration
lastFailure time.Time
}
func (cb *CircuitBreaker) Execute(
fn func() error,
) error {
if !cb.canExecute() {
return fmt.Errorf("circuit breaker is open")
}
err := fn()
cb.recordResult(err)
return err
}
func (cb *CircuitBreaker) canExecute() bool {
cb.mu.RLock()
defer cb.mu.RUnlock()
if cb.failureCount >= cb.threshold {
if time.Since(cb.lastFailure) > cb.timeout {
return true
}
return false
}
return true
}
func (cb *CircuitBreaker) recordResult(err error) {
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failureCount++
cb.lastFailure = time.Now()
} else {
cb.failureCount = 0
}
}
Worker Pool Pattern 👥
// Worker Pool Implementation
type Job struct {
ID int
Data string
}
type Result struct {
JobID int
Output string
Error error
}
type WorkerPool struct {
numWorkers int
jobs chan Job
results chan Result
done chan struct{}
}
func NewWorkerPool(
numWorkers int,
jobQueueSize int,
) *WorkerPool {
return &WorkerPool{
numWorkers: numWorkers,
jobs: make(chan Job, jobQueueSize),
results: make(chan Result, jobQueueSize),
done: make(chan struct{}),
}
}
func (p *WorkerPool) Start() {
for i := 0; i < p.numWorkers; i++ {
go p.worker(i)
}
}
func (p *WorkerPool) Stop() {
close(p.done)
}
func (p *WorkerPool) Submit(job Job) {
p.jobs <- job
}
func (p *WorkerPool) Results() <-chan Result {
return p.results
}
func (p *WorkerPool) worker(id int) {
for {
select {
case <-p.done:
return
case job := <-p.jobs:
result := Result{JobID: job.ID}
// Process job
output, err := processJob(job)
result.Output = output
result.Error = err
// Send result
p.results <- result
}
}
}
// Usage
func main() {
pool := NewWorkerPool(5, 100)
pool.Start()
defer pool.Stop()
// Submit jobs
go func() {
for i := 0; i < 20; i++ {
pool.Submit(Job{
ID: i,
Data: fmt.Sprintf("Job %d", i),
})
}
}()
// Collect results
for i := 0; i < 20; i++ {
result := <-pool.Results()
if result.Error != nil {
log.Printf(
"Job %d failed: %v",
result.JobID,
result.Error,
)
continue
}
log.Printf(
"Job %d completed: %s",
result.JobID,
result.Output,
)
}
}
Best Practices 📝
- Use channels for communication
- Use mutexes for state
- Avoid goroutine leaks
- Handle timeouts properly
- Use context for cancellation
- Implement proper error handling
- Use worker pools
- Implement rate limiting
- Use proper synchronization
- Follow Go concurrency patterns
Additional Resources
- Effective Go - Concurrency
- Go Concurrency Patterns: Pipelines and cancellation
- sync Package Documentation
Go's concurrency features provide powerful tools for building efficient, scalable applications. This guide covers essential patterns and best practices for concurrent programming in Go.