증분 Zoho Desk에서 BigQuery 파이프라인 구축: 현장의 교훈
Source: Dev.to
우리 회사가 고객 지원 분석을 중앙 집중화하기로 결정했을 때, 그 작업이 내게 맡겨졌다: Zoho Desk에서 데이터를 가져와 BigQuery에 적재하고, dbt로 변환하면 끝. 계획은 종이 위에서는 깔끔해 보였다.
그 뒤에 이어진 것은 프로덕션 데이터 엔지니어링이 행복한 경로만큼 단순하지 않다는 것을 보여주는 마스터 클래스였다.
이것은 그 파이프라인을 구축한 이야기다—아키텍처 결정, 내가 마주친 장벽들, 그리고 앞으로 내가 만드는 모든 파이프라인에 적용할 교훈들.
시작점: 영원히 걸린 전체 로드
첫 번째 작동 버전의 파이프라인은 투박했지만 기능적이었습니다. 매일 다음을 수행했습니다:
- Zoho Desk API에서 생성된 모든 티켓을 가져왔습니다.
- BigQuery 테이블을 덮어썼습니다.
단순했습니다. 예측 가능했습니다. 그리고 티켓의 경우, 테이블에 백만 개가 넘는 행이 있었고 매일 증가하고 있었기 때문에 규모에 따라 완전히 감당할 수 없었습니다.
증분 로드가 필요했습니다. 하지만 그에 도달하기 전에 해결해야 할 다른 문제가 있었습니다: API 작업을 며칠 동안 실행하지 않고 처음부터 모든 과거 데이터를 BigQuery에 어떻게 가져올 수 있을까?
부트스트래핑 문제: 초기 로드 시 API가 너무 느릴 때
수백만 건의 과거 데이터를 페이지네이션된 API를 통해 로드하는 것은 파이프라인 문제라기보다 대기 문제입니다. 수십만 행을 가진 테이블이라면, 최적화된 API 호출이라 할지라도 시드 로드만 해도 몇 시간에서 며칠이 걸릴 수 있습니다.
해결책: 초기 로드에서는 API를 완전히 우회합니다. Zoho Desk에는 전체 계정 데이터를 CSV 파일로 내보내는 데이터 백업 기능이 내장되어 있습니다. 이를 이용해 티켓, 스레드, 연락처, 통화 등의 전체 스냅샷을 내보낸 뒤, 각 CSV 파일을 BigQuery 콘솔 UI를 통해 직접 로드했습니다.
UI 로드 프로세스
| 설정 | 값 |
|---|---|
| 포맷 | CSV |
| 스키마 | 수동으로 정의 (자동 감지 아님 — 왜 중요한지 나중에 설명) |
| 선행 행 건너뛰기 | 1 (헤더 행) |
| 인용된 줄바꿈 허용 | 예 (줄바꿈이 포함된 티켓 설명 같은 필드에 필수) |
| 불균형 행 허용 | 예 (API 응답이 선택적 필드를 생략할 때가 있음) |
과거 스냅샷을 BigQuery에 올린 뒤, 증분 파이프라인의 start_date를 백업 날짜로 설정했습니다. 첫 번째 예약 실행이 그 날 이후의 변경 사항을 모두 잡아오므로, 데이터 누락도 겹침도 없습니다.
교훈: 대규모 초기 로드에서는 API와 싸우지 마세요. 내보내기 기능이 있다면 이를 활용하세요. 파이프라인은 데이터를 최신 상태로 유지하기 위한 것이고, 과거 데이터를 가져오는 것은 일회성 부트스트래핑 문제이므로 별도의 해결책이 필요합니다.
The Architecture: Code Generation Over Copy‑Paste
별도의 Airflow DAG를 각 Zoho Desk 엔드포인트마다 작성하는 대신, 코드‑생성 시스템을 구축했습니다:
- Custom Airflow operator (
ZohoDeskToGCSOperator) – API 추출 로직 전체를 처리합니다 (페이지네이션, OAuth, 동시 상세 조회, 증분 검색). - Jinja template – DAG 구조를 한 번만 정의합니다.
- YAML config files – 엔드포인트당 하나씩; 각각 일정, 컬럼, 스키마, 엔드포인트 유형을 정의합니다.
- Generator script – YAML + 템플릿을 렌더링하여 DAG 파이썬 파일을 생성합니다.
flowchart TD
A[Zoho Desk API] --> B[ZohoDeskToGCSOperator]
B --> C[GCS (staging CSV)]
C --> D[GCSToBigQueryOperator]
D --> E[BigQuery (_staging table)]
E --> F[BigQueryInsertJobOperator (MERGE into main table)]
- 대용량 트랜잭션 테이블(티켓, 연락처, 스레드, 통화)의 경우 데이터가 먼저
_staging테이블에 적재된 뒤, 메인 테이블에 MERGE 되어 기존 행을 업데이트하고 새로운 행을 삽입합니다. - 소규모 레퍼런스 테이블(에이전트, 팀, 부서)의 경우 매일
WRITE_TRUNCATE만 하면 충분합니다.
Challenge 1: Not All APIs Are Created Equal
- ticket 및 contact 엔드포인트는 Zoho의
modifiedTimeRange파라미터를 지원하여 시작·종료 타임스탬프를 지정함으로써 증분 로드를 할 수 있습니다. - /calls 엔드포인트는 지원하지 않습니다.
modifiedTimeRange를 전달하면 422 오류가 발생합니다.
우회 방법: createdTime을 내림차순으로 정렬하고 현재 페이지의 가장 오래된 레코드가 윈도우보다 이전이면 페이지네이션을 중단합니다. 실제로 통화 데이터는 추가만 되므로 이는 동일한 효과를 가집니다.
for rec in records:
if rec["createdTime"] < data_interval_start:
done = True
break
교훈: 같은 벤더의 엔드포인트라 하더라도 API 기능이 동일하다고 가정하지 마세요. 파이프라인 코드를 한 줄이라도 작성하기 전에 모든 엔드포인트를 개별적으로 테스트해야 합니다.
Challenge 2: A Reserved Word Hiding in Your Column Names
threads 테이블에는 to 라는 컬럼이 있습니다(스레드의 수신자). 안타깝게도 TO는 BigQuery SQL에서 예약어입니다.
생성된 MERGE 문은 원래 다음과 같이 나왔습니다:
INSERT (`from`, `to`, `subject`, ...)
VALUES (S.`from`, S.`to`, S.`subject`, ...)
to가 따옴표 없이 사용돼서 BigQuery 파서가 다음과 같은 오류를 발생시켰습니다:
Syntax error: Unexpected keyword TO at [40:130]
수정: 컬럼 이름을 백틱(“)으로 감싸고, 실제로는 모든 식별자를 백틱으로 감싸도록 MERGE SQL을 생성합니다.
교훈: 프로그램matically SQL을 생성할 때는 항상 모든 식별자를 인용하세요. 어떤 컬럼 이름이 예약어와 충돌할지 미리 알 수 없습니다.
TL;DR 요약
| # | 핵심 요점 |
|---|---|
| 1 | 초기 대량 로드를 위해 기본 데이터 내보내기 기능을 사용하고, 이를 별도의 부트스트래핑 단계로 간주하십시오. |
| 2 | 복사‑붙여넣기를 방지하고 아키텍처를 DRY하게 유지하기 위해 코드 생성 파이프라인(YAML + Jinja + 커스텀 연산자)을 구축하십시오. |
| 3 | 각 API 엔드포인트를 개별적으로 테스트하십시오—modifiedTimeRange와 같은 기능은 전체적으로 보장되지 않을 수 있습니다. |
| 4 | 숨겨진 예약어 충돌을 방지하기 위해 생성된 SQL에서 식별자를 항상 따옴표로 감싸십시오. |
| 5 | 대형 테이블의 경우 임시 테이블에 적재한 후 MERGE를 사용하고, 작은 참조 테이블은 WRITE_TRUNCATE를 사용해도 괜찮습니다. |
Challenge 3: The MERGE That Couldn’t Match Rows
구문 오류를 수정한 후, 스레드 MERGE는 다른 문제에 부딪혔습니다:
UPDATE/MERGE must match at most one source row for each target row
스레드 엔드포인트는 다음과 같이 작동합니다:
- 시간 창에서 수정된 티켓을 검색합니다.
- 해당 티켓 각각에 대해 모든 스레드를 가져옵니다.
Zoho의 modifiedTimeRange 검색은 페이지네이션이 적용되며, 결과 집합이 요청 사이에 이동하면 동일한 티켓이 여러 페이지에 나타날 수 있습니다. 이 경우 동일한 티켓에 대해 스레드를 두 번 가져오게 되어 스테이징 테이블에 중복된 스레드 ID가 생깁니다. BigQuery의 MERGE는 여러 소스 행이 대상 행과 일치할 때 업데이트를 거부합니다.
Fixes
연산자(Python)에서 – 스레드를 가져오기 전에 티켓 ID를 중복 제거합니다:
ticket_ids = list(dict.fromkeys(ticket_ids))
dict.fromkeys는 중복을 제거하면서 삽입 순서를 유지하므로, 집합으로 변환했다가 다시 리스트로 바꾸는 것보다 깔끔합니다.
MERGE SQL(템플릿)에서 – USING 절에 중복 제거 방어 코드를 추가합니다:
USING (
SELECT *
FROM `staging_table`
QUALIFY ROW_NUMBER() OVER (PARTITION BY `id`) = 1
) S
Python 수정은 문제 발생 자체를 방지하고, SQL 방어 코드는 예상치 못한 경우에 대비한 안전망 역할을 합니다.
교훈: MERGE 파이프라인에서는 항상 스테이징 SELECT에 QUALIFY ROW_NUMBER() 중복 방어 구문을 추가하세요. 소스가 깨끗해 보여도 예기치 않은 중복으로부터 보호할 수 있습니다.
챌린지 4: 자동 감지가 거짓을 말할 수도 있음
초기 백업 CSV를 BigQuery에 로드할 때, 스키마를 명시적으로 정의하지 않고 자동 감지에 맡겼습니다. 빠르고 편리했지만, 실패를 초래했습니다.
자동 감지는 증분 파이프라인이 기대하는 것과 일치하지 않는 선택을 했습니다:
| 컬럼 | 예상 타입 | 자동 감지 결과 |
|---|---|---|
onholdTime | TIMESTAMP | STRING |
tagCount | STRING | INT64 (정확) |
isEscalated | BOOLEAN | STRING (값 "true"/"false") |
첫 번째 MERGE는 TIMESTAMP 값을 STRING 컬럼에 할당하려고 했고, MERGE 시 BigQuery의 타입 강제 적용이 엄격했습니다.
해결 방법
메인 테이블에 대해 INFORMATION_SCHEMA.COLUMNS를 쿼리하고 YAML 스키마와 각 컬럼 타입을 비교합니다:
SELECT column_name, data_type
FROM `project.dataset.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = 'zoho_desk_tickets'
ORDER BY ordinal_position;
교훈
초기 로드가 진실의 원천이며, 타입이 어떠해야 한다는 가정이 아닙니다. 수동으로 부트스트랩된 테이블에 대한 스키마 구성을 작성하기 전에 항상 INFORMATION_SCHEMA를 확인하세요.
dbt 레이어: 원시 레이어가 해결하지 못하는 문제 해결
원시 파이프라인이 안정화되면, dbt가 변환을 담당하여 깔끔하고 분석가가 바로 사용할 수 있는 테이블을 만든다. 이렇게 하면 원시 레이어는 소스의 충실한 복사본으로 유지되고, 변환 레이어가 타입 정규화를 처리한다.
내가 다르게 할 것
- 초기 로드에 네이티브 내보내기 사용 – 과거 데이터를 위해 API와 싸우지 마세요. 전체 백업을 내보내고 UI를 통해 로드한 뒤, 파이프라인이 그 시점부터 증분을 처리하도록 합니다.
- 증분 파이프라인이 MERGE할 테이블에 auto‑detect를 절대 사용하지 마세요 – 스키마를 명시적으로 정의하고, 로드 직후
INFORMATION_SCHEMA로 확인합니다. - 처음부터
QUALIFY ROW_NUMBER()중복 방지 구문을 추가하세요 – 비용이 들지 않으며 나중에 발생할 수 있는 신비한 MERGE 실패를 방지합니다. - 각 API 엔드포인트의 쿼리 파라미터 지원을 독립적으로 테스트하세요 – 한 엔드포인트의 가정을 다른 엔드포인트에 적용하지 마세요.
- 생성된 SQL에서 모든 식별자를 백틱(`)으로 감싸세요 – 예약어 충돌은 예측할 수 없으며 해결 방법은 간단합니다.
- 원시 레이어와 변환 레이어를 분리하세요 – 최소 변환으로 원시 데이터를 BigQuery에 적재하고, 별도의 dbt 레이어를 사용해 타입 지정 및 이름 변경을 하면 디버깅이 훨씬 쉬워집니다. API를 다시 호출하지 않고도 언제든 dbt를 재실행할 수 있습니다.
마무리
여기서 설명한 도전 과제들은 특이한 것이 없었습니다. 예약어, 중복 행, 타입 불일치, API 불일치 등은 데이터 엔지니어링에서 일상적인 문제입니다. 이 문제들이 어려워 보였던 이유는 분석가들이 기다리고 있던 파이프라인에서 압박을 받으며 하나씩 프로덕션 환경에서 마주했기 때문입니다.
이제 파이프라인은 프로덕션에서 안정적으로 실행되고 있습니다: 티켓, 연락처, 스레드, 에이전트, 팀, 부서, 그리고 계정이 매일 증분 로드됩니다.
Zoho, Salesforce, HubSpot 또는 기타 SaaS API와 같이 비슷한 것을 구축하고 있다면, 이 교훈들이 여러분의 머리 고민 시간을 몇 시간이라도 절약해 주길 바랍니다.
이 파이프라인은 Apache Airflow, Google Cloud Storage, BigQuery, 그리고 dbt 로 구축되었습니다. 여기서 설명한 커스텀 오퍼레이터 패턴과 코드 생성 접근 방식은 모든 REST API 통합에 적용할 수 있습니다.