我把会议记录变成了自更新的 Neo4j 知识图谱

发布: (2025年12月11日 GMT+8 09:11)
6 min read
原文: Dev.to

Source: Dev.to

介绍

每个团队都会生成会议记录,但当笔记只是静态文本时,回答诸如 “谁参加了所有预算会议?”“Alex 本月被分配了哪些任务?” 之类的问题会很困难。
本文演示了一个 CocoIndex 工作流,它将存储在 Google Drive 中的 Markdown 会议记录转化为实时的 Neo4j 知识图谱,并在笔记更改时自动更新。

为什么需要自更新图谱?

  • 大型组织可能拥有成千上万甚至数百万份分布在不同文件夹和工具中的会议记录。
  • 笔记会不断演变——人员变动、任务迁移、决策修订。
  • 传统的关键字搜索把笔记视为静态的,会导致要么昂贵的全量重新计算,要么图谱过时。

CocoIndex 的增量处理只检测到已更改的文档,对这些部分运行 LLM 抽取,并将结果 upsert 到 Neo4j,从而在大规模下保持计算和 LLM 成本低廉。

图模型

节点标签关键字段
Meetingnote_file, time
Personname
Taskdescription
关系类型方向
ATTENDEDPerson → Meeting
DECIDEDMeeting → Task
ASSIGNED_TOPerson → Task

该模型支持以下查询:

  • “Dana 参加了哪些会议?”
  • “这个任务是在哪次会议上决定的?”
  • “谁当前拥有在 Q4 创建的所有任务?”

流程概览

  1. Google Drive(变更追踪) – 检测新文件或已修改文件。
  2. 识别已更改的文档 – 使用服务账号凭证和 last‑modified 时间戳。
  3. 将每个文件拆分为单独的会议 – 基于 Markdown 标题。
  4. LLM 抽取(仅针对已更改的会议) – 返回符合预定义数据类的结构化数据。
  5. 收集节点和关系 – 内存表(collectors)。
  6. 使用 upsert 语义导出到 Neo4j – 防止重复的节点/边。

前置条件

要求细节
Neo4j本地实例(UI 位于 http://localhost:7474),用户 neo4j,密码 cocoindex
OpenAI API在环境变量中设置 OPENAI_API_KEY
Google Cloud 服务账号必须对 Drive 中的会议记录文件夹拥有读取权限。
export OPENAI_API_KEY=sk-...
export GOOGLE_SERVICE_ACCOUNT_CREDENTIAL=/absolute/path/to/service_account.json
export GOOGLE_DRIVE_ROOT_FOLDER_IDS=folderId1,folderId2

如果笔记存放在多个文件夹中,GOOGLE_DRIVE_ROOT_FOLDER_IDS 变量可以使用逗号分隔的列表。

定义工作流

import os, datetime
import cocoindex
from dataclasses import dataclass

@cocoindex.flow_def(name="MeetingNotesGraph")
def meeting_notes_graph_flow(flow_builder: cocoindex.FlowBuilder,
                             data_scope: cocoindex.DataScope) -> None:
    credential_path = os.environ["GOOGLE_SERVICE_ACCOUNT_CREDENTIAL"]
    root_folder_ids = os.environ["GOOGLE_DRIVE_ROOT_FOLDER_IDS"].split(",")

    data_scope["documents"] = flow_builder.add_source(
        cocoindex.sources.GoogleDrive(
            service_account_credential_path=credential_path,
            root_folder_ids=root_folder_ids,
            recent_changes_poll_interval=datetime.timedelta(seconds=10),
        ),
        refresh_interval=datetime.timedelta(minutes=1),
    )

recent_changes_poll_interval 控制 Drive 轮询修改的频率;refresh_interval 决定整体工作流的执行间隔。

将文件拆分为会议

with data_scope["documents"].row() as document:
    document["meetings"] = document["content"].transform(
        cocoindex.functions.SplitBySeparators(
            separators_regex=[r"\n\n##?\ "],
            keep_separator="RIGHT",
        )
    )

保留标题(RIGHT)可以保留标题、日期等对 LLM 抽取有用的线索。

数据模式

@dataclass
class Person:
    name: str

@dataclass
class Task:
    description: str
    assigned_to: list[Person]

@dataclass
class Meeting:
    time: datetime.date
    note: str
    organizer: Person
    participants: list[Person]
    tasks: list[Task]

这些数据类会提供给 LLM,使其输出已经符合预期结构。

对每个会议进行抽取

with document["meetings"].row() as meeting:
    parsed = meeting["parsed"] = meeting["text"].transform(
        cocoindex.functions.ExtractByLlm(
            llm_spec=cocoindex.LlmSpec(
                api_type=cocoindex.LlmApiType.OPENAI,
                model="gpt-4o",
            ),
            output_type=Meeting,
        )
    )

CocoIndex 会缓存抽取结果;只有当输入文本、模型或模式发生变化时才会调用 LLM。

收集器(内存表)

meeting_nodes = data_scope.add_collector()
attended_rels = data_scope.add_collector()
decided_tasks_rels = data_scope.add_collector()
assigned_rels = data_scope.add_collector()

填充收集器

meeting_key = {"note_file": document["filename"], "time": parsed["time"]}

meeting_nodes.collect(**meeting_key, note=parsed["note"])
attended_rels.collect(
    id=cocoindex.GeneratedField.UUID,
    **meeting_key,
    person=parsed["organizer"]["name"],
    is_organizer=True,
)

# 类似的循环(此处省略)会添加:
# - 每个参与者的 ATTENDED 边
# - 每个任务的 DECIDED 边(从会议指向任务)
# - 每个人到其任务的 ASSIGNED_TO 边

导出到 Neo4j

节点

meeting_nodes.export(
    "meeting_nodes",
    cocoindex.targets.Neo4j(
        connection=conn_spec,
        mapping=cocoindex.targets.Nodes(label="Meeting"),
    ),
    primary_key_fields=["note_file", "time"],
)
flow_builder.declare(
    cocoindex.targets.Neo4jDeclaration(
        connection=conn_spec,
        nodes_label="Person",
        primary_key_fields=["name"],
    )
)

flow_builder.declare(
    cocoindex.targets.Neo4jDeclaration(
        connection=conn_spec,
        nodes_label="Task",
        primary_key_fields=["description"],
    )
)

关系

attended_rels.export(
    "attended_rels",
    cocoindex.targets.Neo4j(
        connection=conn_spec,
        mapping=cocoindex.targets.Relationships(
            rel_type="ATTENDED",
            source=cocoindex.targets.NodeFromFields(
                label="Person",
                fields=[cocoindex.targets.TargetFieldMapping(
                    source="person", target="name"
                )],
            ),
            target=cocoindex.targets.NodeFromFields(
                label="Meeting",
                fields=[
                    cocoindex.targets.TargetFieldMapping("note_file"),
                    cocoindex.targets.TargetFieldMapping("time"),
                ],
            ),
        ),
    ),
    primary_key_fields=["id"],
)

DECIDED(Meeting → Task)和 ASSIGNED_TO(Person → Task)的导出使用类似的定义,确保关系 ID 在重新运行时防止重复。

运行工作流

pip install -e .
cocoindex update main

更新完成后,打开 Neo4j Browser(http://localhost:7474),尝试以下查询:

MATCH (p:Person)-[:ATTENDED]->(m:Meeting)
RETURN p, m;
MATCH (m:Meeting)-[:DECIDED]->(t:Task)
RETURN m, t;
MATCH (p:Person)-[:ASSIGNED_TO]->(t:Task)
RETURN p, t;

CocoIndex 只会修改实际发生变化的节点和关系,因此图谱始终与源笔记保持同步,而不会产生不必要的重写。

Back to Blog

相关文章

阅读更多 »