精通 LangChain Expression Language (LCEL):分支、并行和流式
Source: Dev.to
介绍
构建 AI 应用时常常感觉像在写“胶水代码”——无尽的 if/else 语句和循环,用来管理 Prompt、LLM 与 Output Parser 之间的数据流动。
LangChain Expression Language(LCEL)通过提供一种声明式、可组合的方式来构建链路,解决了这个问题——可以把它想象成 AI 版的 Unix 管道 (|)。
在本示例中,我们使用 LangChain、Ollama 和 Gemma 模型,展示 LCEL 的三项高级功能:
- 路由(动态分支)
- 并行执行(多路检索)
- 流式中间件(实时 token 消毒)
使用 RunnableBranch 实现路由
你只有一个聊天机器人,但希望它根据用户意图(例如代码查询 vs. 数据查询)表现不同。我们不使用命令式的 if 语句,而是构建一个路由链。
意图分类
# A chain that outputs "code", "data", or "general"
classifier_chain = classifier_prompt | llm | parser
根据意图分支到相应子链
from langchain.schema.runnable import RunnableBranch
routing_chain = RunnableBranch(
(lambda x: x["intent"] == "code", code_chain),
(lambda x: x["intent"] == "data", data_chain),
general_chain, # fallback
)
示例
python main.py routing --query "Write a binary search in Python"
输出
[Router] Detected 'code'
def binary_search(arr, target):
# ... concise, professional code output ...
系统自动检测到意图,并切换到 “Senior Engineer” 人格。
使用 RunnableParallel 实现并行检索
当一个问题需要从多个不同来源(内部 wiki、API 文档、通用笔记)获取信息时,顺序查询会很慢。RunnableParallel 可以并发运行多个检索器。
定义并行检索器
from langchain.schema.runnable import RunnableParallel
parallel_retrievers = RunnableParallel({
"lc_docs": retriever_langchain,
"ollama_docs": retriever_ollama,
"misc_docs": retriever_misc,
})
示例
python main.py parallel_rag --query "What is LCEL?"
输出
“Merger” 步骤会立即收到来自三个数据库的结果,将它们合并后,LLM 使用完整上下文进行回答。
实时消毒的流式中间件
你可能会把 LLM 的响应逐 token 流式传输给用户,但需要在信息到达屏幕前拦截敏感内容(例如 PII)。将标准的 .astream() 迭代器包装在异步生成器中,即可创建一个中间件层,实时缓冲、消毒或记录 token。
中间件实现
async def middleware_stream(iterable):
buffer = ""
async for chunk in iterable:
buffer += chunk
# Simple example: redact any token containing '@'
if "@" in buffer:
yield "[REDACTED_EMAIL]"
buffer = ""
else:
yield buffer
注意:在生产环境中应使用更智能的缓冲策略来处理被拆分的 token。
示例
python main.py stream_middleware --query "My email is test@example.com"
输出
即使 LLM 生成了真实的电子邮件地址,中间件也能在飞行中捕获并在用户看到之前将其替换。
关键要点
LCEL 不仅是语法糖;它提供了一个强大的框架,用于构建复杂、可投入生产的 AI 流程:
- 动态逻辑 – 基于 LLM 判断的意图进行路由
- 性能提升 – 并行检索多个知识库
- 安全保障 – 实时内容审查的流式中间件
所有这些都可以使用标准、可组合的组件,在本地通过 Ollama 运行实现。
GitHub 仓库: