Mastering LangChain Expression Language (LCEL): Branching, Parallelism, and Streaming
Source: Dev.to
Introduction
Building AI applications often feels like writing “glue code”—endless if/else statements and loops to manage how data flows between your Prompt, LLM, and Output Parser.
LangChain Expression Language (LCEL) solves this by providing a declarative, composable way to build chains—think Unix pipes (|) for AI.
In this demo we use LangChain, Ollama, and the Gemma model to showcase three advanced LCEL capabilities:
- Routing (dynamic branching)
- Parallel execution (fan‑out retrieval)
- Streaming middleware (real‑time token sanitization)
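As a quick taste of the pipe syntax before diving in, here is a minimal chain. This is a sketch only; it assumes the langchain_community ChatOllama wrapper and a locally pulled gemma model, and the prompt wording is a placeholder.

from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain_community.chat_models import ChatOllama

prompt = ChatPromptTemplate.from_template("Answer briefly: {query}")
llm = ChatOllama(model="gemma")
parser = StrOutputParser()

# Prompt -> LLM -> output parser, composed with | instead of glue code
chain = prompt | llm | parser
print(chain.invoke({"query": "What is LCEL?"}))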
Routing with RunnableBranch
You have a single chatbot but want it to behave differently based on the user’s intent (e.g., code vs. data). Instead of imperative if statements, we build a router chain.
Classify Intent
# A chain that outputs "code", "data", or "general"
classifier_chain = classifier_prompt | llm | parser
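The classifier prompt does the heavy lifting here. A sketch of what it might look like (the wording is an assumption; llm and parser are the same ChatOllama model and StrOutputParser from the intro sketch):

from langchain.prompts import ChatPromptTemplate

classifier_prompt = ChatPromptTemplate.from_template(
    "Classify the user's request with exactly one word: "
    "code, data, or general.\n\nRequest: {query}"
)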
Branch to the appropriate sub‑chain
from langchain.schema.runnable import RunnableBranch
routing_chain = RunnableBranch(
    (lambda x: x["intent"] == "code", code_chain),
    (lambda x: x["intent"] == "data", data_chain),
    general_chain,  # fallback
)
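The lambdas above look up x["intent"], so the classifier output has to be injected into that key before routing. One way to wire it up, with a sketch of the code_chain persona (the sub-chain prompts are assumptions, and in the full script the sub-chains would be defined before routing_chain; llm and parser come from the earlier snippets):

from operator import itemgetter
from langchain.prompts import ChatPromptTemplate

# "Senior Engineer" persona; data_chain and general_chain follow the same pattern
code_chain = (
    ChatPromptTemplate.from_template(
        "You are a senior engineer. Write clean, well-commented code.\n\n{query}"
    )
    | llm
    | parser
)

full_chain = (
    {
        # Normalize the classifier output so the equality checks match exactly
        "intent": classifier_chain | (lambda s: s.strip().lower()),
        "query": itemgetter("query"),
    }
    | routing_chain
)

full_chain.invoke({"query": "Write a binary search in Python"})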
Example
python main.py routing --query "Write a binary search in Python"
Output
[Router] Detected 'code'
def binary_search(arr, target):
    # ... concise, professional code output ...
The system automatically detected the intent and switched to the “Senior Engineer” persona.
Parallel Retrieval with RunnableParallel
When a question requires information from multiple distinct sources (internal wiki, API docs, general notes), querying them sequentially is slow. RunnableParallel runs several retrievers concurrently.
Define Parallel Retrievers
from langchain.schema.runnable import RunnableParallel
parallel_retrievers = RunnableParallel({
    "lc_docs": retriever_langchain,
    "ollama_docs": retriever_ollama,
    "misc_docs": retriever_misc,
})
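Each value in the map just needs to be a retriever-like runnable. A minimal sketch of how the three retrievers might be backed by small vector stores (the sample texts, FAISS, and OllamaEmbeddings are assumptions for illustration; in the actual script these would be defined before the RunnableParallel):

from langchain_community.embeddings import OllamaEmbeddings
from langchain_community.vectorstores import FAISS

embeddings = OllamaEmbeddings(model="gemma")

retriever_langchain = FAISS.from_texts(
    ["LCEL composes runnables with the | operator."], embeddings
).as_retriever()
retriever_ollama = FAISS.from_texts(
    ["Ollama serves local models such as Gemma."], embeddings
).as_retriever()
retriever_misc = FAISS.from_texts(
    ["Miscellaneous project notes live here."], embeddings
).as_retriever()

# Fan-out: one query in, a dict of document lists out (one key per retriever)
docs = parallel_retrievers.invoke("What is LCEL?")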
Example
python main.py parallel_rag --query "What is LCEL?"
Output
The “Merger” step receives results from all three retrievers as soon as the slowest one finishes (the total wait is roughly the slowest lookup, not the sum of all three), combines them, and the LLM answers using the full context.
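A minimal sketch of that merger step and the full parallel-RAG chain. merge_docs and answer_prompt are illustrative names, not part of the original code; llm and parser are the components defined earlier.

from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnableLambda, RunnablePassthrough

def merge_docs(results: dict) -> str:
    # Flatten the three document lists into one context string
    docs = results["lc_docs"] + results["ollama_docs"] + results["misc_docs"]
    return "\n\n".join(doc.page_content for doc in docs)

answer_prompt = ChatPromptTemplate.from_template(
    "Answer using only this context:\n{context}\n\nQuestion: {question}"
)

parallel_rag_chain = (
    {
        "context": parallel_retrievers | RunnableLambda(merge_docs),
        "question": RunnablePassthrough(),
    }
    | answer_prompt
    | llm
    | parser
)

print(parallel_rag_chain.invoke("What is LCEL?"))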
Streaming Middleware for Real‑Time Sanitization
You may want to stream the LLM’s response token by token to the user, but you also need to intercept sensitive information (e.g., PII) before it reaches the screen. Wrapping the standard .astream() iterator with an async generator creates a middleware layer that can buffer, sanitize, or log tokens in real time.
Middleware Implementation
async def middleware_stream(iterable):
    buffer = ""
    async for chunk in iterable:
        buffer += chunk
        # Simple example: redact any buffered text containing '@'
        if "@" in buffer:
            yield "[REDACTED_EMAIL]"
        else:
            yield buffer
        buffer = ""  # reset after each yield so text isn't emitted twice
Note: A production implementation would need smarter buffering, since an email address can be split across several tokens.
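To use it, wrap the chain’s .astream() iterator before printing. A minimal sketch, assuming chain is a prompt | llm | parser pipeline (like the one in the intro) that takes a "query" input:

import asyncio

async def main():
    stream = chain.astream({"query": "My email is test@example.com"})
    # The middleware sits between the model and the terminal
    async for token in middleware_stream(stream):
        print(token, end="", flush=True)

asyncio.run(main())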
Example
python main.py stream_middleware --query "My email is test@example.com"
Output
Even though the LLM generated the real email, the middleware caught it on the fly and replaced it before the user saw it.
Takeaways
LCEL is more than syntactic sugar; it provides a powerful framework for building complex, production‑ready AI flows:
- Dynamic Logic – routing based on LLM‑determined intent
- Performance – parallel retrieval of multiple knowledge bases
- Safety – streaming middleware for real‑time content moderation
All of this can be achieved with standard, composable components running locally with Ollama.
GitHub repository: