학습·실험 – Go 동시성 – 2부

발행: (2026년 6월 7일 AM 10:51 GMT+9)
9 분 소요
원문: Dev.to

출처: Dev.to

리프레셔 - 저는 Go로 분산 청크 파일 스토어를 만들고 있었고, 파트 1에 대한 글을 여기 에 올렸습니다. 파트 1에서는 파일 업로드를 다뤘고, 이번 글은 다운로드에 관한 내용입니다.

Setup

Requirements

  • 사용자가 파일명/파일 ID와 함께 우리 엔드포인트를 호출한다
  • 이 파일 ID를 이용해 청크 목록을 얻는다
  • 우리의 조회 메커니즘은 이 청크 목록에만 의존한다
  • 청크들을 병렬로 가져올 수 있어야 한다
  • 정확성이 보장되어야 한다(당연하지만, 왜 명시했는지 설명하겠다)

The Test Suite

먼저, 간단한 테스트 스위트를 살펴보자:

type fakeWriter struct {
    hdr http.Header
}

func (w *fakeWriter) Header() http.Header {
    if w.hdr == nil {
        w.hdr = make(http.Header)
    }
    return w.hdr
}

func (w *fakeWriter) WriteHeader(int) {}
func (w *fakeWriter) Write(p []byte) (int, error) {
    return len(p), nil
}

func BenchmarkRetrieveChunk(b *testing.B) {
    dir := b.TempDir()
    const numChunks = 12
    const chunkBytes = 64 << 10

    chunks := make([]Chunk, numChunks)
    for i:= range(numChunks) {
        body := bytes.Repeat([]byte{byte('A'+i)}, chunkBytes)
        path := filepath.Join(dir, fmt.Sprintf("%v.%v", "chunk", i))
        if err := os.WriteFile(path, body, 0o600); err != nil {
            b.Fatal(fmt.Errorf("Error while setting up benchmark - %v", err))
        }
        chunks[i] = Chunk{ChunkIndex: i, ChunkSize: chunkBytes, StoreLoc: path}
    }
    s := &Server{
    config: Config{
        Store: b.TempDir(), 
        Chunksize: 10,
    }, 
    concurrencyConfig: ConcurrencyConfig{
        MaxUploadConcurrency: 5, 
        MaxDownloadConcurrencyPerFile: 1,
    }}

    b.SetBytes(int64(numChunks)*chunkBytes)
    b.ResetTimer()
    for b.Loop() {
        s.retrieveChunks(&fakeWriter{}, "chunk", &chunks)
    }
}

우리 저장 메커니즘보다 조금 더 복잡하다.

fakeWriter를 정의했을까? 파일은 보통 크기 때문에 청크를 읽는 즉시 스트리밍해야 한다. 이런 경우 청크를 읽자마자 바로 상위 HTTP 라이터에 쓰면서, 다른 청크들은 백그라운드에서 병렬로 다운로드되길 원한다.

The Concurrent Chunk retrieval

동시 파일 업로드에서는 전역 채널을 재사용할 수 있었다. 청크 정보가 공급되면, 고루틴이 어떤 순서로 끝나든 상관없고 결국 저장만 하면 되기 때문이다. 하지만 동시 파일 다운로드에서는 청크를 순서대로 가져와야 한다. 예를 들어 파일이 5개의 청크로 나뉘어 있다면, 1~5 순서대로 스트리밍해야 한다. 청크 5가 청크 4보다 먼저 끝난다면, 청크 4가 끝날 때까지 기다렸다가 스트리밍해야 한다.

즉, 청크를 순서대로 가져와야 한다는 뜻이다. 전역 채널을 재사용할 수 없다는 의미일까? 가능하지만 현재 탐구하고 싶은 범위보다 복잡해진다. 전역 채널을 사용하면 여러 파일 다운로드 요청 간에 순서가 겹치지 않도록 별도의 격리가 필요하다. 나중에 방법을 찾아볼 것이지만, 지금은 넘어가자.

Implementing ordering

어떻게 순서를 구현할까? 청크는 bufpool에서 가져온다. 먼저 순차적으로 스폰하는 방법을 생각해볼 수 있다:

queue := make(chan []byte, s.concurrencyConfig.MaxDownloadConcurrencyPerFile)
g.Go(func() error {
    defer close(queue)
    for i := 0; i < numChunks; i++ {
        g.Go(func() error {
            return s.readChunk(ctx, queue, (*chunkInfo)[i]) 
        })
    }
    return nil
})
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%v", filename))
for res := range(queue) {
    if _, err := w.Write(res); err != nil {
        return
    }
}
if err := g.Wait(); err != nil{
    http.Error(w, "Stream disconnected", http.StatusInternalServerError)
    return
}

이 코드는 청크 i에 대한 고루틴을 i+1보다 먼저 스폰한다. 하지만 이것은 스폰 순서일 뿐, 완료 순서를 보장하지 않는다. 채널은 “다음에 전송된 값을 주세요”라는 의미이기 때문이다. 만약 청크 i+1이 청크 i보다 먼저 끝난다면, 채널을 순회하면서 i+1을 먼저 받아 클라이언트에 쓰게 되고, 이는 정확성을 위배한다.

우리는 청크를 병렬로 가져오되, 순차적으로 쓰고 싶다. 가장 단순한 방법은 청크 개수를 미리 알고 있으므로 var readMask []bool(크기 numChunks)을 두고, i번째 청크가 완료될 때까지 쓰기 루프를 블록시키는 것이다:

queue := make(chan []byte, s.concurrencyConfig.MaxDownloadConcurrencyPerFile)
readMask := make([]bool, numChunks)
g.Go(func() error {
    defer close(queue)
    for i := 0; i < numChunks; i++ {
        g.Go(func() error {
                        defer func() { readMask[i] = true }
            return s.readChunk(ctx, queue, (*chunkInfo)[i]) 
        })
    }
    return nil
})
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%v", filename))
for res := range(queue) {
        while !readMask[i] {}
    if _, err := w.Write(res); err != nil {
        return
    }
}
if err := g.Wait(); err != nil{
    http.Error(w, "Stream disconnected", http.StatusInternalServerError)
    return
}

readMask가 i번째 청크가 읽히길 기다리며 무한 루프를 도는 것처럼 보인다. 하지만 이제 새로운 문제가 생긴다: 여러 고루틴이 동시에 readMask를 수정하므로 뮤텍스를 사용해 데이터 레이스를 방지해야 한다.

이 문제를 해결하기 위해 채널 배열을 도입할 수 있다:

queue := make([]chan []byte, numChunks)
g.Go(func() error {
...
    for i := 0; i < numChunks; i++ {
        g.Go(func() error {
            return s.readChunk(ctx, queue[i], (*chunkInfo)[i]) 
...
})
for i := range queue {
        data, ok := <- queue[i]
        // handle data and ok
    if _, err := w.Write(data); err != nil {
        return
    }
}

readChunk에서 queue[i]를 닫기 때문에 <-queue[i]는 닫힐 때까지 블록된다. 여기서 뮤텍스 문제가 사라지는 이유는? 슬라이스 자체를 설정 이후에 변형하지 않기 때문이다. 이후 모든 연산은 읽기 전용이며, queue[i]는 이미 초기화된 채널을 가리키는 포인터일 뿐이다. readChunk는 같은 기본 채널에 쓰기만 할 뿐, queue[i] 객체 자체를 바꾸지는 않는다. 따라서 슬라이스 자체에 뮤텍스가 필요하지 않다.

이렇게 하면 동시성 문제는 해결된다. 하지만 boundedness(제한) 문제는 남는다. 첫 번째 청크가 스트리밍되는 동안 100개의 청크가 모두 메모리에 적재될 수 있다—왜냐하면 고루틴 수를 제한하지 않기 때문이다. 이를 방지하려면 용량이 있는 세마포어를 사용해 고루틴 생성을 제어할 수 있다. 나는 다른 접근법을 선택했다—채널 안에 채널을 넣는 것이다!

queue := make(chan chan []byte, s.concurrencyConfig.MaxDownloadConcurrencyPerFile)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
    defer close(queue)
    for i := 0; i < numChunks; i++ {
        res := make(chan []byte, 1)
        select {
            case queue <- res:
            case <- ctx.Done():
                return ctx.Err()
        }
        g.Go(func() error { 
            defer close(res)
            return s.readChunk(ctx, res, (*chunkInfo)[i]) 
        })
    }
    return nil
})
w.Header().Set("Content-Disposition", fmt.S
0 조회
Back to Blog

관련 글

더 보기 »