Golang의 동시성 패턴: Fan-out / Fan-in
발행: (2026년 2월 23일 오전 05:40 GMT+9)
5 분 소요
원문: Dev.to
Source: Dev.to
위에 제공된 Source 링크에 있는 글의 전체 내용을 알려주시면, 해당 텍스트를 한국어로 번역해 드리겠습니다.
(코드 블록, URL, 마크다운 형식 등은 그대로 유지됩니다.)
이 패턴이 해결할 수 있는 문제들
- 8개의 코어가 있지만 데이터‑처리 단계가 단일 goroutine에서 순차적으로 실행됩니다. 작업 큐가 늘어나는 동안 CPU는 유휴 상태입니다. 공유 채널에서 작업을 병렬로 가져오는 N개의 워커(코어당 하나)를 생성하면 모든 코어가 활용됩니다.
- 파이프라인 단계가 외부 API를 호출하거나 이미지를 리사이즈하는데, 이는 느립니다. 이를 단일 스레드에서 수행하면 전체 파이프라인이 이 단계에서 병목이 됩니다.
- 웹 스크래핑을 위해 100개의 goroutine을 시작했습니다. 전역 변수와 뮤텍스를 사용하지 않고 최종 데이터베이스 쓰기를 위해 모든 결과를 한 곳에 모아야 합니다.
- 들어오는 각 요청마다 새로운 goroutine을 생성하면 피크 부하 시 서비스가 충돌할 수 있습니다(메모리 부족으로 패닉).
핵심
두 단계가 협력하여 작업을 병렬화하는 패턴입니다.
- Fan‑out – 여러 개의 goroutine(워커)을 시작하여 단일 입력 채널에서 작업을 읽고 부하를 분산합니다.
- Fan‑in – 여러 goroutine의 결과를 하나의 출력 채널로 병합하여 최종 처리용 데이터를 통합합니다.
Idea: 고정된 워커 풀에 작업을 분배해 유사한 작업들의 실행을 병렬화하고, 이후 출력 채널을 다중화하여 결과를 집계합니다.
파이프라인과의 차이
- Fan‑out/Fan‑in: 단일 단계의 수평 확장. 하나의 단계가 여러 실행자에 걸쳐 복제됩니다.
- Pipeline: 프로세스를 개별 단계(읽기 → 처리 → 쓰기)로 수직 분해합니다. Fan‑out/Fan‑in은 종종 파이프라인의 특정 단계 내부에 존재합니다.
워커 풀과의 차이점
실제로는 동일한 개념입니다.
- Fan‑out = 풀에 작업을 디스패치하는 것.
- Fan‑in = 풀에서 결과를 수집하는 것.
Pub/Sub과의 차이점 (Publish‑Subscribe)
- Fan‑out/Fan‑in: 입력 채널의 각 작업은 하나의 워커에게만 전달됩니다 (작업 분배, 작업 경쟁).
- Pub/Sub: 각 메시지는 모든 구독자에게 전달됩니다 (브로드캐스트 분배).
package main
import (
"context"
"fmt"
"sync"
)
// Task generation
func generateJobs(n int) <-chan int {
ch := make(chan int)
go func() {
for i := 1; i <= n; i++ {
ch <- i
}
close(ch)
}()
return ch
}
// Fan‑out: Distributing tasks among workers
func fanOut(ctx context.Context, jobs <-chan int, numWorkers int) []<-chan int {
workerChannels := make([]<-chan int, 0, numWorkers)
for i := 0; i < numWorkers; i++ {
resultCh := make(chan int)
go func() {
defer close(resultCh)
for {
select {
case job, ok := <-jobs:
if !ok {
return // Task channel closed
}
// Task processing (example: squaring)
resultCh <- job * job
case <-ctx.Done():
return // Cancellation via context
}
}
}()
workerChannels = append(workerChannels, resultCh)
}
return workerChannels
}
// Fan‑in: Merging results
func fanIn(channels []<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
wg.Add(len(channels))
for _, ch := range channels {
go func(c <-chan int) {
defer wg.Done()
for res := range c {
merged <- res
}
}(ch)
}
// Close the final channel when all workers are done
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func main() {
// 1. Generate tasks
jobs := generateJobs(5)
// 2. Fan‑out: distribute tasks among 3 workers
resultChannels := fanOut(context.Background(), jobs, 3)
// 3. Fan‑in: merge results from all channels
mergedResults := fanIn(resultChannels)
// 4. Results
for res := range mergedResults {
fmt.Printf("Получен результат: %d\n", res)
}
}