Efficient Concurrent Go #

Design Pattern Questions for Tech Interviews

Questions? #

  1. What concurrency design patterns are you familiar with?
  2. Generator
  3. Timeout, Quit signal & Context
  4. Worker Pool
  5. Fan-Out/Fan-In
  6. What are the key differences between “fan-out/fan-in” and “worker pool” patterns?
  7. Producer-Consumer
  8. Pipeline

Answers: #

1. What concurrency design patterns are you familiar with? #

  • Generator: Functions that return channels
  • Timeout: Adding time limits to goroutine execution
  • Quit Signal: Gracefully stopping goroutines
  • Context: Managing cancellation and deadlines across goroutines
  • Worker Pool: Managing task execution across multiple goroutines
  • Fan-Out/Fan-In: Distributing tasks and collecting results
  • Producer-Consumer: Decoupling data production from consumption via buffer
  • Pipeline: Processing data in stages
  • Multiplexing: Combining multiple channels
  • Bounded Parallelism: Limiting concurrent execution
  • Semaphore: Controlling access to shared resources
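
Most of these patterns are covered in the answers below. Two that are not, bounded parallelism and semaphore, both reduce in Go to the same buffered-channel idiom. A minimal sketch (the worker count and task values are illustrative):

package main

import (
	"fmt"
	"sync"
)

func main() {
	sem := make(chan struct{}, 3) // counting semaphore: at most 3 tasks run at once
	var wg sync.WaitGroup

	for task := 1; task <= 10; task++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot (blocks while 3 are in flight)
			defer func() { <-sem }() // release the slot
			fmt.Println("processing task", id)
		}(task)
	}
	wg.Wait()
}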

2. Generator #

The Generator pattern in Go uses goroutines and channels to produce data streams on demand, enabling lazy, concurrent value generation. It’s ideal for efficiently iterating over large or infinite sequences without blocking or excessive memory use.

Problems Solved by the Pattern #

  • Efficient memory usage for large/infinite datasets through lazy evaluation
  • Encapsulation of complex data generation logic
  • Concurrent data production without blocking consumers
  • Simplified iteration over asynchronous data streams

Key Advantages #

  • Lazy Evaluation: Values are generated only when requested
  • Concurrency Safety: Built-in channel synchronization
  • Composability: Easy integration with pipelines and other patterns
  • Resource Control: Clean termination via channel closure

Example #

package main

import "fmt"

func oddsGenerator(max int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= max; i += 2 {
			out <- i
		}
	}()

	return out
}

func main() {
	for num := range oddsGenerator(10) {
		fmt.Println(num)
	}
}

Code Flow Explanation #

  1. Channel Creation: make(chan int) creates an unbuffered channel
  2. Goroutine Launch: An anonymous function starts producing values concurrently
  3. Channel Closure: defer close(out) signals completion from the sending goroutine
  4. Value Production: The loop sends odd numbers via out <- i
  5. Consumption: main receives values with range, which exits once the channel closes

Example Use Cases #

  • Streaming I/O operations (file processing)
  • Simulating real-time data feeds (sensor data)
  • Implementing concurrent iterators
  • Test data generation

3. Timeout, Quit signal & Context #

Timeout, quit signal, and context patterns in Go coordinate cancellation, timeouts, and graceful shutdowns for concurrent operations. The select statement enables responsive handling of these events by waiting on multiple channels simultaneously.

Useful for: #

  • Preventing indefinite blocking by enforcing timeouts
  • Enabling graceful shutdown on user or system quit signals
  • Propagating cancellation across goroutines for resource cleanup
  • Handling multiple cancellation sources (timeout, quit, manual) efficiently

Key Advantages #

  • Centralized, responsive control over goroutine lifecycles
  • Clean resource management and predictable shutdown
  • Simple, readable mechanism for handling multiple asynchronous events

Example #

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	// Create context with program timeout
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Capture OS signals for graceful shutdown
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(quit) // stop delivery; closing a channel the signal package sends on risks a panic

	// Long running task
	msgC := oddsGenerator(ctx, 6)

	// Block the main goroutine
	for {
		select {
		case message, ok := <-msgC:
			if !ok { // channel is closed
				fmt.Println("Jobs done!")
				return
			}
			fmt.Println(message)
		case <-quit:
			fmt.Printf("\nQuit by user")
			return
		// Per-message timeout: fires if no message
		// arrives within 500ms of the previous one
		case <-time.After(500 * time.Millisecond):
			fmt.Println("Can't wait that long!")
			return
		}
	}
}

func oddsGenerator(ctx context.Context, max int) <-chan string {
	msgC := make(chan string)
	go func() {
		defer close(msgC)

		for i := 1; i <= max; i += 2 {
			select {
			case <-ctx.Done(): // context deadline hit or cancelled
				fmt.Println("Execution timeout...")
				return
			case msgC <- fmt.Sprintf("Result: %v", i): // cancellable send: no goroutine leak if the consumer stops
				time.Sleep(499 * time.Millisecond) // stay just under the consumer's 500ms limit
			}
		}
	}()

	return msgC
}

Practical Use Cases #

  • Graceful shutdown of servers and background workers
  • Enforcing timeouts on I/O or network operations
  • Canceling database queries or HTTP requests if the client disconnects
  • Handling user interrupts in CLI tools
  • Coordinating cancellation across multiple goroutines

4. Worker Pool #

The worker pool pattern is a concurrency design that manages a fixed number of worker goroutines to process tasks from a shared queue. It efficiently handles large numbers of independent tasks while controlling resource usage. Workers continuously pull tasks, process them concurrently, and send results to an output queue. This pattern prevents system overload, improves performance through parallel processing, and maintains predictable resource utilization, making it ideal for scenarios like batch operations or API request handling.

Problems Solved by Worker Pool Pattern #

  1. Resource Management

    • Limits concurrent operations to prevent system overload
    • Controls memory usage by capping goroutine count
    • Avoids thread/goroutine spawning overhead
  2. Efficient Concurrency

    • Processes N one-second jobs in roughly N/workers seconds instead of N seconds sequentially
    • Reuses existing workers instead of creating per-task goroutines
    • Balances workload across available CPU cores
  3. Task Coordination

    • Ensures orderly processing of tasks
    • Provides clean shutdown mechanism
    • Enables result collection/aggregation

Key Advantages #

  • Predictable resource usage
  • Better error handling
  • Improved performance scaling
  • Easier monitoring/debugging
  • Graceful shutdown capabilities

Example: #

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs { // Automatically exits when jobs channel closes
        fmt.Printf("Worker %d processing job %d\n", id, job)
        time.Sleep(time.Second) // Simulate work
        results <- job * 2      // Send result
    }
}

func main() {
    const numJobs = 5
    const numWorkers = 3

    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    wg := sync.WaitGroup{}

    // Start worker pool
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // Send jobs to workers
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs) // Signal no more jobs will be sent

    // Wait for all workers to finish processing
    wg.Wait()
    close(results) // Safe to close results after all workers exit

    // Collect and print results
    fmt.Println("\nResults:")
    for result := range results {
        fmt.Printf("Result: %d\n", result)
    }
}

Flow #

  1. Startup: main launches 3 workers, which block waiting on the jobs channel.
  2. Each worker loops:
    • Receives a job from the jobs channel
    • Processes it (simulated with a 1s sleep)
    • Sends the doubled result to the results channel
    • Exits when the jobs channel closes
  3. Distribution: main sends 5 jobs, then close(jobs) signals that no more work is coming.
  4. Synchronization: wg.Wait() blocks until every worker returns, making close(results) safe.
  5. Collection: main ranges over results and prints each value; the closed channel ends the loop.

Example Use Cases: #

  • Batch processing large datasets
  • Handling API rate limits
  • Image/video processing pipelines
  • Database operation queues
  • Concurrent network requests

5. Fan-Out/Fan-In #

The fan-out/fan-in pattern is a concurrency design used to parallelize and coordinate tasks. In the fan-out stage, a single task is divided into smaller subtasks executed concurrently by multiple goroutines. The fan-in stage collects and combines results from all subtasks. This pattern improves performance by distributing workload across goroutines, enabling parallel processing. It’s implemented using goroutines and channels in Go, making it efficient for handling large-scale, divisible tasks.

Problems Solved by the Pattern #

  1. High-volume processing

    • Distributes workloads across multiple workers
    • Handles large datasets or tasks efficiently
    • Processes independent tasks concurrently to minimize total execution time
  2. Resource optimization

    • Limits concurrent operations to prevent system overload
    • Maximizes CPU utilization
  3. Result aggregation

    • Simplifies collecting outputs from parallel operations into a unified stream

Key Advantages #

  • Scalability: Easily adjust worker count to match workload demands
  • Decoupled components: Workers operate independently, improving fault isolation
  • Order-agnostic processing: Ideal for tasks where result order doesn’t matter
  • Cost efficiency: Reduces cloud costs via optimized resource usage (e.g., AWS Lambda parallel invocations)

Example: #

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		fmt.Printf("Worker %d processing job %d\n", id, job)
		time.Sleep(500 * time.Millisecond) // Simulate work
		results <- job * 2
	}
}

func main() {
	const (
		numJobs    = 10
		numWorkers = 3
	)

	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)
	var wg sync.WaitGroup

	// Fan-Out: Start worker pool
	for w := 1; w <= numWorkers; w++ {
		wg.Add(1)
		go worker(w, jobs, results, &wg)
	}

	// Feed jobs to workers
	go func() {
		for j := 1; j <= numJobs; j++ {
			jobs <- j
		}
		close(jobs)
	}()

	// Fan-In: Collect results
	go func() {
		wg.Wait()
		close(results)
	}()

	// Process aggregated results
	for result := range results {
		fmt.Printf("Result: %d\n", result)
	}
}

Code Flow Explanation #

  1. Initialization

    • Create buffered channels for jobs and results
    • Initialize WaitGroup for worker synchronization
  2. Fan-Out Phase

    • Launch worker goroutines that pull from jobs channel
    • Each worker processes jobs concurrently
  3. Job Distribution

    • Feed jobs to workers via channel
    • Close channel when done to trigger worker exit
  4. Fan-In Phase

    • Close results channel after all workers complete
    • Enables clean exit from results loop
  5. Result Aggregation

    • Main thread processes combined outputs

Example Use Cases: #

  • Real-time data processing (IoT sensor streams)
  • Bulk image/video transcoding
  • Distributed web scraping
  • Concurrent API request handling
  • Log aggregation from multiple sources
  • ETL (Extract-Transform-Load) pipelines

6. What are the key differences between “fan-out/fan-in” and “worker pool” patterns? #

The key differences between the “fan-out/fan-in” pattern and the “worker pool” pattern are:

  1. Task Distribution:

    • Fan-out/fan-in: Dynamically creates goroutines for each task, potentially leading to a large number of concurrent goroutines.
    • Worker pool: Uses a fixed number of worker goroutines that process tasks from a shared queue.
  2. Concurrency Control:

    • Fan-out/fan-in: Offers less control over maximum concurrency, as it can spawn many goroutines.
    • Worker pool: Provides better control over resource usage by limiting the number of concurrent workers.
  3. Flexibility:

    • Fan-out/fan-in: More flexible for handling varying workloads and task types.
    • Worker pool: Better suited for consistent workloads and similar task types.
  4. Resource Management:

    • Fan-out/fan-in: May require additional mechanisms like semaphores or rate limiters for resource control.
    • Worker pool: Inherently manages resources by limiting the number of concurrent workers.
  5. Implementation Complexity:

    • Fan-out/fan-in: Can be simpler to implement for small-scale tasks.
    • Worker pool: May require more setup but offers better long-term scalability.

Both patterns can be used for concurrent processing, and the choice depends on specific application requirements and resource constraints.
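
To make difference 1 concrete: the example in section 5 fans out to a fixed pool, but fan-out can also spawn one goroutine per task. A minimal sketch of that per-task variant (the fanOut name and the squaring work are illustrative):

package main

import (
	"fmt"
	"sync"
)

// fanOut spawns one goroutine per task (no fixed pool), then a collector
// goroutine fans all results back into a single channel.
func fanOut(tasks []int) <-chan int {
	results := make(chan int)
	var wg sync.WaitGroup
	for _, t := range tasks {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			results <- n * n // each subtask computes independently
		}(t)
	}
	go func() {
		wg.Wait()
		close(results) // fan-in endpoint: close once every subtask finishes
	}()
	return results
}

func main() {
	for r := range fanOut([]int{1, 2, 3, 4, 5}) {
		fmt.Println(r)
	}
}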


7. Producer-Consumer #

The producer-consumer pattern is a concurrency design pattern where one or more producer goroutines generate data or tasks, and one or more consumer goroutines process or execute them. This pattern uses a shared queue as an intermediary, allowing producers and consumers to work independently and at different rates. It helps decouple data production from consumption, enables efficient workload distribution, and facilitates resource management in concurrent systems.

Problems Solved #

  • Concurrent Access: Prevents race conditions when multiple producers/consumers access shared resources
  • Rate Mismatch: Buffers data when production and consumption speeds differ
  • Resource Management: Avoids overwhelming systems by limiting concurrent processing (backpressure)
  • Decoupling: Separates data generation logic from processing logic

Key Advantages #

  • Modularity: Producers and consumers operate independently
  • Scalability: Easily add more producers/consumers without redesign
  • Efficiency: Enables parallel processing and load balancing
  • Backpressure Handling: Prevents system overload via bounded buffers

Common Use Cases #

  • Real-Time Data: Stock tickers, sensor data processing
  • Task Queues: Web servers handling HTTP requests
  • Logging Systems: Aggregating logs from multiple sources
  • Distributed Systems: Asynchronous communication between microservices
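
Example: #

A minimal sketch, with illustrative buffer size and producer/consumer counts: a bounded channel is the shared buffer, two producers send into it at their own rate, and one consumer drains it. Closing the buffer after all producers finish ends the consumer's range loop.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	buffer := make(chan int, 5) // bounded shared buffer provides backpressure
	var wg sync.WaitGroup

	// Producers generate items independently of the consumption rate
	for p := 1; p <= 2; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < 5; i++ {
				buffer <- id*100 + i // blocks when the buffer is full
			}
		}(p)
	}

	// Close the buffer once every producer has finished
	go func() {
		wg.Wait()
		close(buffer)
	}()

	// A single consumer drains the buffer at its own (slower) pace
	for item := range buffer {
		fmt.Println("consumed", item)
		time.Sleep(50 * time.Millisecond)
	}
}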

8. Pipeline #

The pipeline pattern is a concurrency design pattern used to process data sequentially through multiple stages, where each stage performs a specific operation and passes the result to the next stage via channels. It enables efficient and modular data processing.

Problems Solved by the Pattern #

  • Sequential Data Processing: Handles multi-step workflows where data needs to be transformed or processed in stages
  • Concurrency: Allows multiple stages to run concurrently, improving performance
  • Decoupling: Separates logic for each stage, making the code more modular and easier to maintain
  • Scalability: Efficiently processes large datasets by leveraging parallelism

Key Advantages #

  • Modularity: Each stage is independent, making it easy to add, remove, or modify stages without affecting the rest of the pipeline
  • Concurrent Execution: Multiple stages can operate simultaneously, reducing overall processing time
  • Improved Throughput: Enables efficient use of CPU and I/O resources by processing data in parallel
  • Error Handling: Errors can be isolated and handled at specific stages without affecting others

Common Use Cases #

  • ETL (Extract, Transform, Load): Processing and transforming large datasets
  • Image or Video Processing Pipelines: Sequential operations like resizing, filtering, and saving images
  • Text Analysis: Tokenization, filtering, and sentiment analysis of text data
  • Financial Data Analysis: Sequential calculations on large streams of financial data
  • Log Processing: Filtering, transforming, and aggregating logs in real-time

Example: #

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func produce(num int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < num; i++ {
			out <- rand.Intn(100) // Generate random numbers
		}
	}()
	return out
}

func double(input <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for value := range input {
			out <- value * 2 // Double the value
		}
	}()
	return out
}

func filterGt10(input <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for value := range input {
			if value > 10 { // Filter values greater than 10
				out <- value
			}
		}
	}()
	return out
}

func main() {
	rand.Seed(time.Now().UnixNano())    // Seed random number generator
	numbersCh := produce(10)            // Stage 1: Generate random numbers
	doubledCh := double(numbersCh)      // Stage 2: Double the numbers
	filteredCh := filterGt10(doubledCh) // Stage 3: Filter numbers greater than 10
	for value := range filteredCh {     // Stage 4: Print the final output
		fmt.Printf("Value is %d\n", value)
	}
}

Code Flow Explanation #

  1. Stage Initialization:
    • Each stage is implemented as a function that takes an input channel and returns an output channel
    • Goroutines are used to execute each stage concurrently
  2. Data Flow:
    • Data flows through the pipeline via channels
    • Each stage processes its input and sends results to the next stage
  3. Termination:
    • Channels are closed when a stage finishes processing all input data (defer close(out))
    • Closing channels signals downstream stages to stop reading
  4. Final Output:
    • The final stage 4 consumes the filtered values from stage 3 and displays them

Best Practices When Coding #

  • Always close output channels when a stage finishes processing (defer close(out))
  • If stages have varying speeds, use buffered channels to prevent blocking fast producers or slow consumers
  • Use a custom result type (e.g., type Result struct { Value int; Err error }) to propagate errors through the pipeline as data instead of panicking
  • Use context.Context or a done channel to signal goroutines to exit early during errors or shutdowns (both practices are sketched after this list)
  • Identify slow stages and parallelize them using Fan-Out/Fan-In patterns if needed
  • Each stage should be designed as an isolated unit for easier testing and debugging
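
A minimal sketch combining two of the practices above, error propagation via a result struct and early exit via context (the Result type and halve stage are hypothetical names, not part of the pipeline example above):

package main

import (
	"context"
	"fmt"
)

// Result carries either a value or an error between stages,
// so failures flow through the pipeline as data instead of panics.
type Result struct {
	Value int
	Err   error
}

// halve is a context-aware stage: it forwards errors downstream and
// stops sending as soon as the pipeline is cancelled.
func halve(ctx context.Context, in <-chan int) <-chan Result {
	out := make(chan Result)
	go func() {
		defer close(out)
		for v := range in {
			r := Result{Value: v / 2}
			if v%2 != 0 {
				r = Result{Err: fmt.Errorf("cannot halve odd value %d", v)}
			}
			select {
			case out <- r:
			case <-ctx.Done(): // shutdown or downstream failure: exit early
				return
			}
		}
	}()
	return out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan int, 3)
	in <- 4
	in <- 7
	in <- 10
	close(in)

	for r := range halve(ctx, in) {
		if r.Err != nil {
			fmt.Println("stage error:", r.Err)
			continue
		}
		fmt.Println("value:", r.Value)
	}
}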