Concurrency patterns on Golang: Fan-out / Fan-in
Source: Dev.to
Problems this pattern can solve
- You have 8 cores, but a data‑processing stage runs sequentially in a single goroutine. The CPU sits idle while the task queue grows. By creating N workers (one per core) that pull tasks from a shared channel in parallel, all cores are utilized.
- A pipeline stage calls an external API or resizes images, which is slow. If left in a single thread, the entire pipeline bottlenecks at this stage.
- You have launched 100 goroutines for web scraping. You need to collect all results in one place for final database writing without using global variables and mutexes.
- Creating a new goroutine for each incoming request can crash the service under peak load (panic due to memory exhaustion).
Essence
A pattern consisting of two phases that work in tandem to parallelize tasks.
- Fan‑out – launching multiple goroutines (workers) to read tasks from a single input channel, distributing the load.
- Fan‑in – merging results from multiple goroutines into a single output channel, consolidating data for final processing.
Idea: Parallelize the execution of similar tasks by distributing them among a fixed pool of workers and subsequently aggregating results through multiplexing of output channels.
Difference from Pipeline
- Fan‑out/Fan‑in: Horizontal scaling of a single stage. One step is multiplied across many executors.
- Pipeline: Vertical decomposition of a process into distinct steps (read → process → write). Fan‑out/Fan‑in often lives inside a specific stage of a pipeline.
Difference from Worker Pool
Practically the same concept.
- Fan‑out = task dispatching to the pool.
- Fan‑in = result collection from the pool.
Difference from Pub/Sub (Publish‑Subscribe)
- Fan‑out/Fan‑in: Each task from the input channel is received by only one worker (work distribution, competition for tasks).
- Pub/Sub: Each message is received by all subscribers (broadcast distribution).
Example
package main
import (
"context"
"fmt"
"sync"
)
// Task generation
func generateJobs(n int) <-chan int {
ch := make(chan int)
go func() {
for i := 1; i <= n; i++ {
ch <- i
}
close(ch)
}()
return ch
}
// Fan‑out: Distributing tasks among workers
func fanOut(ctx context.Context, jobs <-chan int, numWorkers int) []<-chan int {
workerChannels := make([]<-chan int, 0, numWorkers)
for i := 0; i < numWorkers; i++ {
resultCh := make(chan int)
go func() {
defer close(resultCh)
for {
select {
case job, ok := <-jobs:
if !ok {
return // Task channel closed
}
// Task processing (example: squaring)
resultCh <- job * job
case <-ctx.Done():
return // Cancellation via context
}
}
}()
workerChannels = append(workerChannels, resultCh)
}
return workerChannels
}
// Fan‑in: Merging results
func fanIn(channels []<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan int) {
defer wg.Done()
for res := range c {
merged <- res
}
}(ch)
}
// Close the final channel when all workers are done
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func main() {
// 1. Generate tasks
jobs := generateJobs(5)
// 2. Fan‑out: distribute tasks among 3 workers
resultChannels := fanOut(context.Background(), jobs, 3)
// 3. Fan‑in: merge results from all channels
mergedResults := fanIn(resultChannels)
// 4. Results
for res := range mergedResults {
fmt.Printf("Получен результат: %d\n", res)
}
}