나는 회의 메모를 자동 업데이트되는 Neo4j Knowledge Graph로 변환했다
Source: Dev.to
소개
모든 팀은 회의록을 생성하지만, “예산 회의에 모두 누가 참석했나요?” 혹은 “이번 달에 Alex에게 할당된 작업은 무엇인가요?” 와 같은 질문에 답하기는 정적인 텍스트일 때 어렵습니다.
이 글에서는 Google Drive에 저장된 Markdown 회의록을 자동으로 업데이트되는 Neo4j 지식 그래프로 변환하는 CocoIndex 워크플로우를 보여줍니다.
왜 자체 업데이트 그래프가 필요할까?
- 대규모 조직에서는 수만 개에서 수백만 개에 이르는 회의록이 폴더와 도구에 흩어져 있습니다.
- 회의록은 끊임없이 변합니다—사람이 바뀌고, 작업이 이동하고, 결정이 수정됩니다.
- 기존의 키워드 검색은 회의록을 정적인 것으로 취급해 전체 재계산이 비싸거나 그래프가 오래된 상태가 됩니다.
CocoIndex의 증분 처리 방식은 변경된 문서만 감지하고, 해당 섹션에 대해 LLM 추출을 수행한 뒤 결과를 Neo4j에 upsert하여 규모가 커져도 연산 및 LLM 비용을 낮게 유지합니다.
그래프 모델
| 노드 라벨 | 주요 필드 |
|---|---|
| Meeting | note_file, time |
| Person | name |
| Task | description |
| 관계 타입 | 방향 |
|---|---|
| ATTENDED | Person → Meeting |
| DECIDED | Meeting → Task |
| ASSIGNED_TO | Person → Task |
이 모델은 다음과 같은 쿼리를 지원합니다:
- “Dana는 어떤 회의에 참석했나요?”
- “이 작업은 어디서 결정되었나요?”
- “Q4에 생성된 모든 작업을 현재 누가 소유하고 있나요?”
파이프라인 개요
- Google Drive (변경 추적) – 새 파일 또는 수정된 파일을 감지합니다.
- 변경된 문서 식별 – 서비스 계정 자격 증명과
last‑modified타임스탬프를 사용합니다. - 각 파일을 개별 회의로 분할 – Markdown 헤딩을 기준으로 합니다.
- LLM 추출 (변경된 회의에만) – 미리 정의된 데이터 클래스와 일치하는 구조화된 데이터를 반환합니다.
- 노드와 관계 수집 – 메모리 내 테이블(collector)로 저장합니다.
- Neo4j에 upsert 방식으로 내보내기 – 중복 노드/엣지를 방지합니다.
사전 요구 사항
| 요구 사항 | 세부 내용 |
|---|---|
| Neo4j | 로컬 인스턴스 (http://localhost:7474 UI), 사용자 neo4j, 비밀번호 cocoindex. |
| OpenAI API | 환경 변수 OPENAI_API_KEY 설정. |
| Google Cloud 서비스 계정 | Drive의 회의록 폴더에 대한 읽기 권한이 있어야 합니다. |
export OPENAI_API_KEY=sk-...
export GOOGLE_SERVICE_ACCOUNT_CREDENTIAL=/absolute/path/to/service_account.json
export GOOGLE_DRIVE_ROOT_FOLDER_IDS=folderId1,folderId2
GOOGLE_DRIVE_ROOT_FOLDER_IDS 변수는 메모가 여러 폴더에 저장된 경우 쉼표로 구분된 목록을 포함할 수 있습니다.
흐름 정의
import os, datetime
import cocoindex
from dataclasses import dataclass
@cocoindex.flow_def(name="MeetingNotesGraph")
def meeting_notes_graph_flow(flow_builder: cocoindex.FlowBuilder,
data_scope: cocoindex.DataScope) -> None:
credential_path = os.environ["GOOGLE_SERVICE_ACCOUNT_CREDENTIAL"]
root_folder_ids = os.environ["GOOGLE_DRIVE_ROOT_FOLDER_IDS"].split(",")
data_scope["documents"] = flow_builder.add_source(
cocoindex.sources.GoogleDrive(
service_account_credential_path=credential_path,
root_folder_ids=root_folder_ids,
recent_changes_poll_interval=datetime.timedelta(seconds=10),
),
refresh_interval=datetime.timedelta(minutes=1),
)
recent_changes_poll_interval 은 Drive를 얼마나 자주 폴링할지 제어하고, refresh_interval 은 전체 흐름 실행 빈도를 결정합니다.
파일을 회의 단위로 분할
with data_scope["documents"].row() as document:
document["meetings"] = document["content"].transform(
cocoindex.functions.SplitBySeparators(
separators_regex=[r"\n\n##?\ "],
keep_separator="RIGHT",
)
)
헤딩(RIGHT)을 유지하면 제목, 날짜 및 LLM 추출에 유용한 다른 단서들을 보존할 수 있습니다.
데이터 스키마
@dataclass
class Person:
name: str
@dataclass
class Task:
description: str
assigned_to: list[Person]
@dataclass
class Meeting:
time: datetime.date
note: str
organizer: Person
participants: list[Person]
tasks: list[Task]
이 데이터 클래스들은 LLM에 제공되어 출력이 이미 기대 구조에 맞게 됩니다.
회의별 추출
with document["meetings"].row() as meeting:
parsed = meeting["parsed"] = meeting["text"].transform(
cocoindex.functions.ExtractByLlm(
llm_spec=cocoindex.LlmSpec(
api_type=cocoindex.LlmApiType.OPENAI,
model="gpt-4o",
),
output_type=Meeting,
)
)
CocoIndex는 추출 결과를 캐시하므로 입력 텍스트, 모델, 스키마가 변경될 때만 LLM이 호출됩니다.
컬렉터(메모리 내 테이블)
meeting_nodes = data_scope.add_collector()
attended_rels = data_scope.add_collector()
decided_tasks_rels = data_scope.add_collector()
assigned_rels = data_scope.add_collector()
컬렉터에 데이터 채우기
meeting_key = {"note_file": document["filename"], "time": parsed["time"]}
meeting_nodes.collect(**meeting_key, note=parsed["note"])
attended_rels.collect(
id=cocoindex.GeneratedField.UUID,
**meeting_key,
person=parsed["organizer"]["name"],
is_organizer=True,
)
# 유사한 루프(생략)에서는:
# - 각 참가자에 대한 ATTENDED 엣지
# - 각 작업에 대한 DECIDED 엣지 (Meeting → Task)
# - 각 사람에 대한 ASSIGNED_TO 엣지 (Person → Task)
Neo4j로 내보내기
노드
meeting_nodes.export(
"meeting_nodes",
cocoindex.targets.Neo4j(
connection=conn_spec,
mapping=cocoindex.targets.Nodes(label="Meeting"),
),
primary_key_fields=["note_file", "time"],
)
flow_builder.declare(
cocoindex.targets.Neo4jDeclaration(
connection=conn_spec,
nodes_label="Person",
primary_key_fields=["name"],
)
)
flow_builder.declare(
cocoindex.targets.Neo4jDeclaration(
connection=conn_spec,
nodes_label="Task",
primary_key_fields=["description"],
)
)
관계
attended_rels.export(
"attended_rels",
cocoindex.targets.Neo4j(
connection=conn_spec,
mapping=cocoindex.targets.Relationships(
rel_type="ATTENDED",
source=cocoindex.targets.NodeFromFields(
label="Person",
fields=[cocoindex.targets.TargetFieldMapping(
source="person", target="name"
)],
),
target=cocoindex.targets.NodeFromFields(
label="Meeting",
fields=[
cocoindex.targets.TargetFieldMapping("note_file"),
cocoindex.targets.TargetFieldMapping("time"),
],
),
),
),
primary_key_fields=["id"],
)
DECIDED(Meeting → Task)와 ASSIGNED_TO(Person → Task) 관계도 유사한 정의를 사용하며, 관계 ID가 재실행 시 중복을 방지하도록 합니다.
워크플로우 실행
pip install -e .
cocoindex update main
업데이트가 끝나면 http://localhost:7474 에서 Neo4j Browser를 열고 다음 쿼리를 시도해 보세요:
MATCH (p:Person)-[:ATTENDED]->(m:Meeting)
RETURN p, m;
MATCH (m:Meeting)-[:DECIDED]->(t:Task)
RETURN m, t;
MATCH (p:Person)-[:ASSIGNED_TO]->(t:Task)
RETURN p, t;
CocoIndex는 실제로 변경된 노드와 관계만 수정하므로, 그래프가 원본 회의록과 동기화된 상태를 유지하면서 불필요한 재작성은 발생하지 않습니다.