我如何从会议记录构建一个自更新的 Neo4j Knowledge Graph(可节省 99% 的 LLM 成本)
Source: Dev.to
问题:你的会议记录被浪费了
每天,仅在美国就有 6200‑8000 万场会议。这些会议会产生决策、行动项和任务分配——但大部分情报都死在了 Google Docs 中。
想知道“所有预算会议都有谁参加?”或“本月 Alex 被分配了哪些任务?”那就得在成千上万的 Markdown 文件里苦苦搜索。
真正的杀手锏? 会议记录是活文档。人们会更正姓名、重新分配任务、更新决策。没有增量处理,你只能在以下两者之间做选择:
- 💸 由于重新处理所有内容而产生的大额 LLM 费用
- 📉 过时、陈旧的知识图谱
我通过构建一个仅处理已更改文档的自更新 Neo4j 知识图谱,将 LLM 成本降低了 99%。
我们在构建什么
一个将凌乱的会议记录转化为可查询图数据库的流水线:
Google Drive → Detect Changes → Split Meetings → LLM Extract → Neo4j
结果: 三种节点类型(Meeting、Person、Task)和三种关系(ATTENDED、DECIDED、ASSIGNED_TO),让你可以查询:
- “Sarah 参加了哪些会议?”
- “这个任务是在哪次会议上决定的?”
- “谁拥有所有 Q4 任务?”
秘密武器:增量处理
1. 只处理已更改的内容
Google Drive 源会跟踪最后修改时间戳。当你有 100 000 条会议记录且每天只有 1 % 发生变化时,你只会处理 1 000 个文件——而不是 100 000。
@cocoindex.flow_def(name="MeetingNotesGraph")
def meeting_notes_graph_flow(
flow_builder: cocoindex.FlowBuilder,
data_scope: cocoindex.DataScope
) -> None:
credential_path = os.environ["GOOGLE_SERVICE_ACCOUNT_CREDENTIAL"]
root_folder_ids = os.environ["GOOGLE_DRIVE_ROOT_FOLDER_IDS"].split(",")
data_scope["documents"] = flow_builder.add_source(
cocoindex.sources.GoogleDrive(
service_account_credential_path=credential_path,
root_folder_ids=root_folder_ids,
recent_changes_poll_interval=datetime.timedelta(seconds=10),
),
refresh_interval=datetime.timedelta(minutes=1),
)
影响: 对于典型的 1 % 日常 churn,LLM API 成本降低 99 %。
2. 智能文档拆分
会议文件通常包含多个会话。智能拆分时保留标题(例如 ## Meeting Title)在每个章节中,以便为 LLM 保持上下文。
with data_scope["documents"].row() as document:
document["meetings"] = document["content"].transform(
cocoindex.functions.SplitBySeparators(
separators_regex=[r"\n\n##?\ "],
keep_separator="RIGHT",
)
)
3. 结构化 LLM 抽取
定义具体的模式,而不是让模型返回“某种 JSON”。
@dataclass
class Person:
name: str
@dataclass
class Task:
description: str
assigned_to: list[Person]
@dataclass
class Meeting:
time: datetime.date
note: str
organizer: Person
participants: list[Person]
tasks: list[Task]
使用缓存抽取;相同的输入会复用缓存结果,消除冗余的 LLM 调用。
with document["meetings"].row() as meeting:
parsed = meeting["parsed"] = meeting["text"].transform(
cocoindex.functions.ExtractByLlm(
llm_spec=cocoindex.LlmSpec(
api_type=cocoindex.LlmApiType.OPENAI,
model="gpt-4",
),
output_type=Meeting,
)
)
构建图谱
收集节点和关系
meeting_nodes = data_scope.add_collector()
attended_rels = data_scope.add_collector()
decided_tasks_rels = data_scope.add_collector()
assigned_rels = data_scope.add_collector()
meeting_key = {"note_file": document["filename"], "time": parsed["time"]}
meeting_nodes.collect(**meeting_key, note=parsed["note"])
attended_rels.collect(
id=cocoindex.GeneratedField.UUID,
**meeting_key,
person=parsed["organizer"]["name"],
is_organizer=True,
)
with parsed["participants"].row() as participant:
attended_rels.collect(
id=cocoindex.GeneratedField.UUID,
**meeting_key,
person=participant["name"],
)
使用 Upsert 逻辑导出到 Neo4j
meeting_nodes.export(
"meeting_nodes",
cocoindex.targets.Neo4j(
connection=conn_spec,
mapping=cocoindex.targets.Nodes(label="Meeting"),
),
primary_key_fields=["note_file", "time"],
)
声明 Person 和 Task 节点:
flow_builder.declare(
cocoindex.targets.Neo4jDeclaration(
connection=conn_spec,
nodes_label="Person",
primary_key_fields=["name"],
)
)
flow_builder.declare(
cocoindex.targets.Neo4jDeclaration(
connection=conn_spec,
nodes_label="Task",
primary_key_fields=["description"],
)
)
导出关系:
attended_rels.export(
"attended_rels",
cocoindex.targets.Neo4j(
connection=conn_spec,
mapping=cocoindex.targets.Relationships(
rel_type="ATTENDED",
source=cocoindex.targets.NodeFromFields(
label="Person",
fields=[cocoindex.targets.TargetFieldMapping(
source="person", target="name"
)],
),
target=cocoindex.targets.NodeFromFields(
label="Meeting",
fields=[
cocoindex.targets.TargetFieldMapping("note_file"),
cocoindex.targets.TargetFieldMapping("time"),
],
),
),
),
primary_key_fields=["id"],
)
运行流水线
环境准备
export OPENAI_API_KEY=sk-...
export GOOGLE_SERVICE_ACCOUNT_CREDENTIAL=/path/to/service_account.json
export GOOGLE_DRIVE_ROOT_FOLDER_IDS=folderId1,folderId2
pip install cocoindex
构建图谱
cocoindex update main
在 Neo4j Browser (http://localhost:7474) 中查询
// 谁参加了哪些会议?
MATCH (p:Person)-[:ATTENDED]->(m:Meeting)
RETURN p, m
// 会议中决定的任务
MATCH (m:Meeting)-[:DECIDED]->(t:Task)
RETURN m, t
// 人员的任务分配
MATCH (p:Person)-[:ASSIGNED_TO]->(t:Task)
RETURN p, t
为什么这很重要
1. 大规模成本节约
- 传统做法: 重新处理 100 000 份文档 → 100 000 次 LLM 调用
- 增量做法: 只处理 1 000 份变更文档 → 1 000 次 LLM 调用
结果:成本降低 99 %。
2. 实时更新
切换到实时模式后,图谱会在会议记录变化时自动更新:
refresh_interval=datetime.timedelta(minutes=1)
3. 数据血缘
CocoIndex 记录每一次转换,能够将任意 Neo4j 节点追溯到 LLM 抽取过程再到源文档。
超越会议记录
该模式同样适用于任何文本密集且文档随时间演进的领域,能够以低成本交付最新的知识图谱。