我如何从会议记录构建一个自更新的 Neo4j Knowledge Graph(可节省 99% 的 LLM 成本)

发布: (2025年12月11日 GMT+8 10:15)
5 min read
原文: Dev.to

Source: Dev.to

问题:你的会议记录被浪费了

每天,仅在美国就有 6200‑8000 万场会议。这些会议会产生决策、行动项和任务分配——但大部分情报都死在了 Google Docs 中。

想知道“所有预算会议都有谁参加?”或“本月 Alex 被分配了哪些任务?”那就得在成千上万的 Markdown 文件里苦苦搜索。

真正的杀手锏? 会议记录是活文档。人们会更正姓名、重新分配任务、更新决策。没有增量处理,你只能在以下两者之间做选择:

  • 💸 由于重新处理所有内容而产生的大额 LLM 费用
  • 📉 过时、陈旧的知识图谱

我通过构建一个仅处理已更改文档的自更新 Neo4j 知识图谱,将 LLM 成本降低了 99%。

我们在构建什么

一个将凌乱的会议记录转化为可查询图数据库的流水线:

Google Drive → Detect Changes → Split Meetings → LLM Extract → Neo4j

结果: 三种节点类型(Meeting、Person、Task)和三种关系(ATTENDED、DECIDED、ASSIGNED_TO),让你可以查询:

  • “Sarah 参加了哪些会议?”
  • “这个任务是在哪次会议上决定的?”
  • “谁拥有所有 Q4 任务?”

秘密武器:增量处理

1. 只处理已更改的内容

Google Drive 源会跟踪最后修改时间戳。当你有 100 000 条会议记录且每天只有 1 % 发生变化时,你只会处理 1 000 个文件——而不是 100 000。

@cocoindex.flow_def(name="MeetingNotesGraph")
def meeting_notes_graph_flow(
    flow_builder: cocoindex.FlowBuilder,
    data_scope: cocoindex.DataScope
) -> None:
    credential_path = os.environ["GOOGLE_SERVICE_ACCOUNT_CREDENTIAL"]
    root_folder_ids = os.environ["GOOGLE_DRIVE_ROOT_FOLDER_IDS"].split(",")

    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.GoogleDrive(
            service_account_credential_path=credential_path,
            root_folder_ids=root_folder_ids,
            recent_changes_poll_interval=datetime.timedelta(seconds=10),
        ),
        refresh_interval=datetime.timedelta(minutes=1),
    )

影响: 对于典型的 1 % 日常 churn,LLM API 成本降低 99 %。

2. 智能文档拆分

会议文件通常包含多个会话。智能拆分时保留标题(例如 ## Meeting Title)在每个章节中,以便为 LLM 保持上下文。

with data_scope["documents"].row() as document:
    document["meetings"] = document["content"].transform(
        cocoindex.functions.SplitBySeparators(
            separators_regex=[r"\n\n##?\ "],
            keep_separator="RIGHT",
        )
    )

3. 结构化 LLM 抽取

定义具体的模式,而不是让模型返回“某种 JSON”。

@dataclass
class Person:
    name: str

@dataclass
class Task:
    description: str
    assigned_to: list[Person]

@dataclass
class Meeting:
    time: datetime.date
    note: str
    organizer: Person
    participants: list[Person]
    tasks: list[Task]

使用缓存抽取;相同的输入会复用缓存结果,消除冗余的 LLM 调用。

with document["meetings"].row() as meeting:
    parsed = meeting["parsed"] = meeting["text"].transform(
        cocoindex.functions.ExtractByLlm(
            llm_spec=cocoindex.LlmSpec(
                api_type=cocoindex.LlmApiType.OPENAI,
                model="gpt-4",
            ),
            output_type=Meeting,
        )
    )

构建图谱

收集节点和关系

meeting_nodes = data_scope.add_collector()
attended_rels = data_scope.add_collector()
decided_tasks_rels = data_scope.add_collector()
assigned_rels = data_scope.add_collector()

meeting_key = {"note_file": document["filename"], "time": parsed["time"]}

meeting_nodes.collect(**meeting_key, note=parsed["note"])
attended_rels.collect(
    id=cocoindex.GeneratedField.UUID,
    **meeting_key,
    person=parsed["organizer"]["name"],
    is_organizer=True,
)

with parsed["participants"].row() as participant:
    attended_rels.collect(
        id=cocoindex.GeneratedField.UUID,
        **meeting_key,
        person=participant["name"],
    )

使用 Upsert 逻辑导出到 Neo4j

meeting_nodes.export(
    "meeting_nodes",
    cocoindex.targets.Neo4j(
        connection=conn_spec,
        mapping=cocoindex.targets.Nodes(label="Meeting"),
    ),
    primary_key_fields=["note_file", "time"],
)

声明 Person 和 Task 节点:

flow_builder.declare(
    cocoindex.targets.Neo4jDeclaration(
        connection=conn_spec,
        nodes_label="Person",
        primary_key_fields=["name"],
    )
)

flow_builder.declare(
    cocoindex.targets.Neo4jDeclaration(
        connection=conn_spec,
        nodes_label="Task",
        primary_key_fields=["description"],
    )
)

导出关系:

attended_rels.export(
    "attended_rels",
    cocoindex.targets.Neo4j(
        connection=conn_spec,
        mapping=cocoindex.targets.Relationships(
            rel_type="ATTENDED",
            source=cocoindex.targets.NodeFromFields(
                label="Person",
                fields=[cocoindex.targets.TargetFieldMapping(
                    source="person", target="name"
                )],
            ),
            target=cocoindex.targets.NodeFromFields(
                label="Meeting",
                fields=[
                    cocoindex.targets.TargetFieldMapping("note_file"),
                    cocoindex.targets.TargetFieldMapping("time"),
                ],
            ),
        ),
    ),
    primary_key_fields=["id"],
)

运行流水线

环境准备

export OPENAI_API_KEY=sk-...
export GOOGLE_SERVICE_ACCOUNT_CREDENTIAL=/path/to/service_account.json
export GOOGLE_DRIVE_ROOT_FOLDER_IDS=folderId1,folderId2

pip install cocoindex

构建图谱

cocoindex update main

在 Neo4j Browser (http://localhost:7474) 中查询

// 谁参加了哪些会议?
MATCH (p:Person)-[:ATTENDED]->(m:Meeting)
RETURN p, m

// 会议中决定的任务
MATCH (m:Meeting)-[:DECIDED]->(t:Task)
RETURN m, t

// 人员的任务分配
MATCH (p:Person)-[:ASSIGNED_TO]->(t:Task)
RETURN p, t

为什么这很重要

1. 大规模成本节约

  • 传统做法: 重新处理 100 000 份文档 → 100 000 次 LLM 调用
  • 增量做法: 只处理 1 000 份变更文档 → 1 000 次 LLM 调用

结果:成本降低 99 %。

2. 实时更新

切换到实时模式后,图谱会在会议记录变化时自动更新:

refresh_interval=datetime.timedelta(minutes=1)

3. 数据血缘

CocoIndex 记录每一次转换,能够将任意 Neo4j 节点追溯到 LLM 抽取过程再到源文档。

超越会议记录

该模式同样适用于任何文本密集且文档随时间演进的领域,能够以低成本交付最新的知识图谱。

Back to Blog

相关文章

阅读更多 »

哎呦!2025

我的 YOW! 体验 我已经关注 YOW! 会议超过十年了。它们在澳大利亚的三个城市举办——墨尔本、布里斯班和悉尼——并且 f...