第一篇:基础认知#


📋 前置准备#

环境配置#

在开始学习之前,请确保完成以下环境配置:

1. Python 版本#

python --version  # 需要 Python 3.10 或更高版本

2. 安装依赖#

# 使用 pip 安装最新版本
pip install langchain langchain-openai langgraph langchain-community

# 或使用 uv (推荐)
uv pip install langchain langchain-openai langgraph langchain-community

# 如需指定版本(推荐使用1.0.7或更高版本)
pip install langchain>=1.0.7 langchain-openai>=1.0.3 langgraph>=1.0.3

3. 环境变量配置#

# 创建 .env 文件
OPENAI_API_KEY=sk-your-api-key-here
LANGSMITH_API_KEY=your-langsmith-key  # 可选,用于监控
LANGSMITH_TRACING=true  # 可选

# 在代码中加载
from dotenv import load_dotenv
import os

load_dotenv()

# 验证环境变量
required_vars = ["OPENAI_API_KEY"]
for var in required_vars:
    if not os.getenv(var):
        raise EnvironmentError(f"缺少必需的环境变量: {var}")

4. 依赖版本清单#

# pyproject.toml 推荐配置
[tool.poetry.dependencies]
python = "^3.10"
langchain = "^1.0.7"
langchain-openai = "^1.0.3"
langgraph = "^1.0.3"
langchain-community = "^0.3.0"
langchain-core = "^1.0.7"
langsmith = "^0.4.43"
python-dotenv = "^1.0.0"

# requirements.txt 格式
# langchain>=1.0.7
# langchain-openai>=1.0.3
# langgraph>=1.0.3
# langchain-community>=0.3.0
# langchain-core>=1.0.7
# langsmith>=0.4.43
# python-dotenv>=1.0.0

前置知识#

建议具备以下基础知识:

  • ✅ Python 基础 (async/await、类型注解、装饰器)
  • ✅ LLM 基本概念 (Prompt、Token、Temperature等)
  • ✅ API 调用基础
  • ✅ JSON 数据格式

第1章:LangChain 生态全景#


1.1 架构层次关系#

graph TD
    A[应用层<br>Deep Agents / LangGraph Projects] --> B[编排层<br>LangGraph]
    B --> C[链路层<br>LangChain / LCEL]
    C --> D[监控层<br>LangSmith]
    D --> E[外部资源<br>Models / APIs / Tools]
    style A fill:#C7E8CA,stroke:#6CBF84,stroke-width:1px
    style B fill:#E4F3FF,stroke:#74B3E1,stroke-width:1px
    style C fill:#FFF5D7,stroke:#F0C94E,stroke-width:1px
    style D fill:#FFE4E1,stroke:#E87461,stroke-width:1px

LangChain 生态系统目前已形成“多层协同”的架构体系,既可支持快速原型开发,也可支撑生产级 LLM 应用。整体结构如下:

层级核心组件职责定位典型场景
应用层Deep Agents / LangGraph Projects复杂自治 Agent、长期运行、多 Agent 协作智能助手、自动化任务系统
编排层LangGraph状态化流程控制、节点执行、分支循环多 Agent 编排、可视化状态流
链路层LangChain / LCEL模型调用、提示管理、工具集成RAG、问答、对话
监控层LangSmith调试、观测、评估、成本追踪DevOps、Evals、质量监控

1.1.1 LangChain 与 LangGraph 的关系#

LangChain 专注于 链式逻辑与 Agent 封装;LangGraph 专注于 流程编排与状态管理

  • LangChain: 用于构建单条或线性 chain(Prompt→Model→Tool→Output)。
  • LangGraph: 用于管理含分支、循环、并发的复杂流程(可视化、持久化状态)。
  • 二者可并用:LangGraph 中的节点可运行 LangChain 或 LCEL 构造的 chain。

图 1-2 LangChain 与 LangGraph 协作关系图

graph LR
    A["LangChain Chain\n(Prompt→Model→Tool→Output)"]:::chain --> B["LangGraph Node"]:::node
    B --> C["LangGraph Flow\n(多节点编排 / 状态持久化)"]:::flow
    C --> D["LangGraph Studio\n(可视化与监控)"]:::studio

    classDef chain fill:#FFF5D7,stroke:#F0C94E,stroke-width:1px;
    classDef node fill:#E4F3FF,stroke:#74B3E1,stroke-width:1px;
    classDef flow fill:#C7E8CA,stroke:#6CBF84,stroke-width:1px;
    classDef studio fill:#FFE4E1,stroke:#E87461,stroke-width:1px;

1.1.2 如何构建 Agent#

LangChain 1.0+ 提供统一的 Agent 构建接口:create_agent

快速开始:创建你的第一个 Agent

from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool

# 步骤1: 定义工具
@tool
def get_weather(city: str) -> str:
    """获取指定城市的天气"""
    return f"{city}今天天气晴朗,温度25°C"

@tool
def calculate(expression: str) -> str:
    """计算数学表达式"""
    try:
        result = eval(expression)
        return f"计算结果: {result}"
    except Exception as e:
        return f"计算错误: {str(e)}"

# 步骤2: 创建 Agent
agent = create_agent(
    model=ChatOpenAI(model="gpt-4"),
    tools=[get_weather, calculate],
    system_prompt="你是一个有帮助的助手,可以查询天气和进行计算。"
)

# 步骤3: 运行 Agent
result = agent.invoke({
    "messages": [("user", "北京天气如何?另外帮我算一下 25 * 4")]
})

# 查看结果
print(result["messages"][-1].content)

输出示例

北京今天天气晴朗,温度25°C。
25 * 4 的计算结果是 100。

核心概念

create_agent 的工作原理

graph LR
    A[用户输入] --> B[Agent 接收]
    B --> C[LLM 分析]
    C --> D{需要工具?}
    D -- 是 --> E[调用工具]
    E --> F[获取结果]
    F --> C
    D -- 否 --> G[生成回复]
    G --> H[返回用户]

关键参数说明

参数类型必需说明
modelChatModel | str使用的语言模型
toolsList[Tool]可用的工具列表
system_promptstr系统提示词,定义 Agent 行为
checkpointerCheckpointer状态持久化(用于多轮对话)
interrupt_beforeList[str]在指定节点前暂停(需要人工确认)
interrupt_afterList[str]在指定节点后暂停

完整工作流程

  1. 模型绑定:指定使用的 LLM(如 GPT-4、Claude 等)
  2. 工具注册:提供 Agent 可调用的工具集合
  3. 提示配置:通过 system_prompt 定义 Agent 的角色和行为
  4. 决策执行:LLM 基于 ReAct 模式自动决定是否调用工具
  5. 结果返回:自动组合工具输出和 LLM 回复
  6. 监控追踪:集成 LangSmith 实现全链路追踪

关键特性

  • 官方推荐:LangChain 1.0+ 标准 API
  • 简洁易用:统一的接口,3步即可创建 Agent
  • 完整功能:支持 middleware、cache、checkpointer
  • 自动工具调用:LLM 自动判断何时使用哪个工具
  • 多轮对话:支持状态持久化,实现上下文记忆
  • 长期支持:官方维护,持续更新
graph LR
    A[Prompt Template] --> B[LLM / ChatModel]
    B --> C[Tool Selection]
    C --> D[Tool Execution]
    D --> E[Parser / Output Formatter]
    E --> F[返回结果]
    F --> G[LangSmith Callback / Tracing]

1.1.3 LCEL 的定位与作用#

LCEL(LangChain Expression Language)是 LangChain 的“声明式组合语法”,用于 构建可并行、可流式、可追踪的 Runnable 链

  • 核心概念:
    • RunnableSequence 顺序执行;
    • RunnableParallel 并行执行;
    • 支持 async / stream / batch 统一调用;
    • 可直接嵌入 LangGraph 节点。
  • 价值: 在代码层面构建“数据流管线”,如同 Node-RED 或 Airflow 的轻量化实现。
graph TD
    A["输入数据"]:::input --> B["RunnableSequence(顺序执行)"]:::seq
    B --> C["RunnableParallel(并行执行)"]:::par
    C --> D["模型推理 / 工具调用"]:::model
    D --> E["流式输出 / 结构化解析"]:::output

    classDef input fill:#E3F2FD,stroke:#64B5F6,stroke-width:1px;
    classDef seq fill:#FFF9C4,stroke:#FDD835,stroke-width:1px;
    classDef par fill:#DCEDC8,stroke:#81C784,stroke-width:1px;
    classDef model fill:#F0F4C3,stroke:#C0CA33,stroke-width:1px;
    classDef output fill:#FFE0B2,stroke:#FB8C00,stroke-width:1px;

1.1.4 LangSmith 的监控职责#

LangSmith 是 LangChain 官方推出的可观测性与质量评估平台。

主要职责:

  • 🔍 Tracing :追踪 Chain/Graph/Agent 每个调用节点。
  • 📈 Metrics :监控延迟、Token 用量、错误率、成本。
  • 🧪 Evaluation :对模型或 Agent 输出进行打分与对比。
  • ⚙️ Integration :与 LangChain 、LangGraph 、Deep Agents 原生集成。
graph LR
    subgraph LangSmith
    A[Tracing<br>链路追踪] --> B[Metrics<br>性能&成本]
    B --> C[Evals<br>模型评估]
    C --> D[Dashboard / Report]
    end
    D --> E[开发者 / 团队协作]

1.2 核心设计理念#

mindmap
  root((LangChain Design))
    Provider-Agnostic
      多模型兼容
      快速切换
    Runnable Protocol
      统一接口
      可组合执行
    Middleware Driven
      Hook/Callback
      Metrics/Retry
    Production First
      稳定性
      可观测性
      成本控制

1.2.1 Provider-Agnostic 设计#

LangChain 通过统一接口屏蔽 LLM 提供商差异(OpenAI、Anthropic、Cohere、Azure 等),
以 “Provider 无关” 的方式构建应用。

  • 模型切换无需修改上层逻辑。
  • 支持跨平台成本追踪与性能比较。

1.2.2 Runnable Protocol 统一抽象#

Runnable 是 LangChain 的核心执行协议:

一切皆 Runnable。

包括 Chain、Agent、Tool、Prompt 均实现该接口。

  • 统一执行入口:invoke()ainvoke()stream()
  • 支持异步、批量、流式、可追踪调用。
  • 所有 Runnable 可嵌套、组合、装饰。
graph TD
    A[Runnable] --> B[Chain]
    A --> C[Agent]
    A --> D[Tool]
    A --> E[Prompt]
    A --> F[LCEL 组合结构]

1.2.3 Middleware-Driven 架构#

LangChain 支持 Callback / Hook / Tracing 机制,可在执行流中插入中间件。

常见中间件用途:

  • Token 计数与成本监控
  • 日志与错误追踪
  • 安全审查与访问控制
  • 重试与超时控制
sequenceDiagram
    participant U as User
    participant C as Chain/Agent
    participant M as Middleware
    participant L as LangSmith

    U->>C: 调用执行
    C->>M: 进入中间件 (token计数/日志)
    M->>L: 上报监控数据
    L-->>M: 返回监控结果
    M-->>C: 执行主流程
    C-->>U: 返回输出结果

1.2.4 Production-First 理念#

LangChain 1.0 及 LangGraph 1.0 发布后,生态全面转向 生产级稳定性与可观测性
核心目标包括:

  • 长期兼容(向 2.0 平滑过渡)
  • 成本可控(LangSmith 监控 + 自动计费)
  • 模型热替换(Provider-agnostic)
  • 完整 CI/CD 与 Evals 集成
flowchart LR
    A[开发阶段<br>LangChain Prototype] --> B[测试阶段<br>LangSmith 调试]
    B --> C[部署阶段<br>LangGraph 编排]
    C --> D[监控阶段<br>Metrics / Evals]
    D --> E[持续优化<br>模型&提示调整]

1.3 技术选型决策树#

graph TD
    A[应用需求评估] --> B{流程是否复杂?}
    B -- 否 --> C[使用 create_agent<br>LangChain 快速原型]
    B -- 是 --> D{是否需要状态管理?}
    D -- 否 --> E[使用 LCEL 构建 chain]
    D -- 是 --> F{是否为长期运行/自治?}
    F -- 否 --> G[使用 LangGraph 编排]
    F -- 是 --> H[使用 Deep Agents<br>结合 LangSmith 监控]

1.3.1 何时使用 create_agent#

适用场景:

  • 单 Agent 执行,流程线性;
  • 需要快速实现 Tool 调用;
  • 用于 RAG 、问答、助手类场景。

1.3.2 何时深入 LangGraph#

适用场景:

  • 多 Agent 协作;
  • 存在分支 / 循环 / 状态管理;
  • 需可视化、可调试、持久化运行。

1.3.3 何时使用 Deep Agents#

适用场景:

  • 长期运行、自主决策 Agent;
  • 复杂任务拆解、子 Agent 管理;
  • 持续任务执行与周期性触发。

1.3.4 何时需要 Middleware#

适用场景:

  • 生产环境运行;
  • 需要日志、指标、安全控制、回调。
    推荐:所有 Chain/Agent 均启用 LangSmith Tracing + 自定义 Callback。

1.3.5 典型应用场景分析#

场景推荐技术理由
A. 企业文档问答create_agent + LCEL快速构建 RAG 问答
B. 智能客服系统LangChain Agent + Middleware需多轮对话与监控
C. 自动化任务管理LangGraph + Deep Agents + LangSmith复杂 workflow + 自治 agent
D. 内容摘要或转换LCEL轻量、高并行、可流式

本章小结#

LangChain 生态体系可概括为:

链式逻辑(LangChain) → 图式编排(LangGraph) → 监控评估(LangSmith) → 自治进化(Deep Agents)

核心理念:

  • Provider-Agnostic
  • Runnable 统一抽象
  • Middleware 可插架构
  • Production-First 部署思维

设计哲学上,从“玩具原型”走向“生产可观测”的工程系统。

思考与练习#

  1. 练习 1: 选择一个业务场景,画出其 LangChain 技术选型决策路径。

  2. 练习 2: 编写一个 LCEL 例程(Prompt → Model → Parser → Tool),并标注你会插入哪些 Middleware。

  3. 练习 3: 设计一个长期运行 Agent (如 市场监控或自动报告),说明如何用 LangGraph + LangSmith 实现监控与 Evals。

  4. 思考题: LCEL 在 LangGraph 节点中嵌套使用会带来哪些优势与代价?


第2章:核心抽象:Runnable 与 LCEL#

2.1 Runnable Protocol#

2.1.1 为什么需要统一抽象#

在 LangChain 1.0 之前,不同组件(Prompt、Model、Tool、Chain)的调用方式各不相同,导致:

  • 接口不一致:学习成本高,难以组合
  • 缺乏标准化:无法统一追踪、监控
  • 组合困难:不同组件难以嵌套使用

Runnable Protocol 解决方案

LangChain 1.0 引入 Runnable 作为统一执行协议,所有组件均实现该接口:

from langchain_core.runnables import Runnable

# 所有组件均实现 Runnable 接口
class Runnable:
    def invoke(self, input, config=None): ...       # 同步调用
    def ainvoke(self, input, config=None): ...      # 异步调用
    def stream(self, input, config=None): ...       # 流式输出
    def astream(self, input, config=None): ...      # 异步流式
    def batch(self, inputs, config=None): ...       # 批量处理

核心优势

graph LR
    A[Runnable 统一抽象] --> B[一致的调用方式]
    A --> C[可组合性]
    A --> D[可追踪性]
    A --> E[自动优化]

    B --> F[降低学习成本]
    C --> G[LCEL 管道]
    D --> H[LangSmith 集成]
    E --> I[批处理/并行]

实际应用示例

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# 所有组件都是 Runnable
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
model = ChatOpenAI()
parser = StrOutputParser()

# 统一的调用方式
result = prompt.invoke({"topic": "AI"})
result = model.invoke("Tell me a joke")
result = parser.invoke("some text")

2.1.2 核心方法:invoke、stream、batch#

invoke() - 同步调用

最基础的调用方式,适用于单次请求:

from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

# 同步调用
response = model.invoke("What is LangChain?")
print(response.content)

执行流程

sequenceDiagram
    participant User
    participant Runnable
    participant LLM
    participant LangSmith

    User->>Runnable: invoke(input)
    Runnable->>LangSmith: 开始追踪
    Runnable->>LLM: 发送请求
    LLM-->>Runnable: 返回完整结果
    Runnable->>LangSmith: 记录结果
    Runnable-->>User: 返回输出

stream() - 流式输出

适用于需要实时反馈的场景(如聊天界面):

# 流式输出
for chunk in model.stream("Tell me a long story"):
    print(chunk.content, end="", flush=True)

流式输出的优势

  • ✅ 降低首字延迟(TTFT - Time To First Token)
  • ✅ 提升用户体验(实时显示)
  • ✅ 减少超时风险
graph LR
    A[User Request] --> B[Stream Token 1]
    B --> C[Stream Token 2]
    C --> D[Stream Token 3]
    D --> E[...]
    E --> F[Stream Complete]

    style B fill:#E8F5E9
    style C fill:#E8F5E9
    style D fill:#E8F5E9

batch() - 批量处理

适用于批量请求场景,自动优化并发:

# 批量处理(自动并发优化)
inputs = [
    "What is AI?",
    "What is ML?",
    "What is LLM?"
]

results = model.batch(inputs)
for result in results:
    print(result.content)

批量处理的优势

  • ✅ 自动并发控制
  • ✅ 成本追踪聚合
  • ✅ 错误处理优化
graph TD
    A[Batch Inputs] --> B[并发控制器]
    B --> C[Request 1]
    B --> D[Request 2]
    B --> E[Request 3]
    C --> F[结果聚合]
    D --> F
    E --> F
    F --> G[Batch Results]

abatch() - 异步批量处理

在需要高并发处理大量请求时,abatch() 比同步 batch() 性能更好:

import asyncio
from langchain_openai import ChatOpenAI

model = ChatOpenAI()

async def async_batch_example():
    inputs = [
        "What is AI?",
        "What is ML?",
        "What is LLM?",
        "What is NLP?",
        "What is DL?"
    ]

    # 异步批量处理
    results = await model.abatch(inputs)

    for i, result in enumerate(results):
        print(f"Result {i+1}: {result.content}")

# 运行异步任务
asyncio.run(async_batch_example())

abatch 与 batch 的对比

方法适用场景优势
batch()中小批量(<50)实现简单,无需async/await
abatch()大批量(50+)、I/O密集更高并发性能,资源利用率高

2.1.3 异步方法:ainvoke、astream#

在高并发场景下,异步方法可显著提升性能:

ainvoke() - 异步调用

import asyncio
from langchain_openai import ChatOpenAI

model = ChatOpenAI()

async def main():
    # 异步调用
    response = await model.ainvoke("What is async programming?")
    print(response.content)

asyncio.run(main())

astream() - 异步流式

async def stream_example():
    async for chunk in model.astream("Tell me a story"):
        print(chunk.content, end="", flush=True)

asyncio.run(stream_example())

并发性能对比

# ❌ 同步方式(串行执行,慢)
def sync_batch():
    results = []
    for query in queries:
        results.append(model.invoke(query))
    return results

# ✅ 异步方式(并发执行,快)
async def async_batch():
    tasks = [model.ainvoke(query) for query in queries]
    return await asyncio.gather(*tasks)

性能对比

请求数同步耗时异步耗时性能提升
1030s5s6x
50150s15s10x
100300s25s12x

2.1.4 Runnable 类型:Lambda、Parallel、Branch、Fallbacks#

RunnableLambda - 自定义函数包装

将普通 Python 函数包装为 Runnable:

from langchain_core.runnables import RunnableLambda

def uppercase(text: str) -> str:
    return text.upper()

# 包装为 Runnable
runnable_upper = RunnableLambda(uppercase)

# 统一调用方式
result = runnable_upper.invoke("hello")  # "HELLO"

RunnableParallel - 并行执行

同时执行多个 Runnable,结果以字典形式返回:

from langchain_core.runnables import RunnableParallel

parallel = RunnableParallel(
    joke=ChatPromptTemplate.from_template("Tell a joke about {topic}") | model,
    poem=ChatPromptTemplate.from_template("Write a poem about {topic}") | model
)

# 并行执行
result = parallel.invoke({"topic": "AI"})
print(result["joke"])
print(result["poem"])
graph TD
    A[Input: topic='AI'] --> B[RunnableParallel]
    B --> C[Joke Generator]
    B --> D[Poem Generator]
    C --> E[Result Dict]
    D --> E
    E --> F[Output]

RunnableBranch - 条件分支

根据条件选择不同的执行路径:

from langchain_core.runnables import RunnableBranch

branch = RunnableBranch(
    (lambda x: len(x) > 100, long_text_handler),
    (lambda x: len(x) > 10, medium_text_handler),
    short_text_handler  # 默认分支
)

result = branch.invoke("some text")
graph TD
    A[Input] --> B{len > 100?}
    B -- Yes --> C[Long Text Handler]
    B -- No --> D{len > 10?}
    D -- Yes --> E[Medium Text Handler]
    D -- No --> F[Short Text Handler]

with_fallbacks() - 降级处理

主 Runnable 失败时,自动切换到备用方案:

from langchain_openai import ChatOpenAI

primary_model = ChatOpenAI(model="gpt-4")
fallback_model = ChatOpenAI(model="gpt-3.5-turbo")

# 直接使用 with_fallbacks 方法,无需导入额外类
model_with_fallback = primary_model.with_fallbacks([fallback_model])

# 如果 GPT-4 失败,自动使用 GPT-3.5
result = model_with_fallback.invoke("Hello")

参数说明 (基于官方API文档验证):

必需参数

  • fallbacks: Sequence[Runnable] - 备用 Runnable 序列,按顺序尝试

可选参数 (仅关键字参数):

  • exceptions_to_handle: Tuple[Type[BaseException], ...] - 需要处理的异常类型元组,默认为 (Exception,)
  • exception_key: Optional[str] - 可选的键名,用于将异常信息传递给备用方案。如为 None (默认),异常不传递给备用方案

完整参数示例

# ✅ 示例1: 只对特定异常类型执行降级
model_with_fallback = primary_model.with_fallbacks(
    fallbacks=[fallback_model],  # ✅ 官方标准参数:fallbacks (复数,列表)
    exceptions_to_handle=(TimeoutError, ConnectionError),  # ✅ 官方标准参数
)

# ✅ 示例2: 将异常信息传递给备用方案
from langchain_core.runnables import RunnableLambda

def handle_with_error_context(inputs):
    """备用方案可以访问异常信息"""
    if "error" in inputs:
        print(f"Original error: {inputs['error']}")
    return fallback_model.invoke(inputs["input"])

model_with_error_context = primary_model.with_fallbacks(
    fallbacks=[RunnableLambda(handle_with_error_context)],  # ✅ 使用 fallbacks 参数名
    exception_key="error"  # ✅ 官方标准参数:异常会以 "error" 键传递
)

# ⚠️ 重要:使用 exception_key 时,主 Runnable 和所有备用方案都必须接受字典输入
result = model_with_error_context.invoke({"input": "Hello"})

API 规范总结

def with_fallbacks(
    self,
    fallbacks: Sequence[Runnable[Input, Output]],  # 必需
    *,
    exceptions_to_handle: Tuple[Type[BaseException], ...] = (Exception,),  # 可选
    exception_key: Optional[str] = None  # 可选
) -> RunnableWithFallbacksT[Input, Output]:
    ...
graph LR
    A[Request] --> B[Primary: GPT-4]
    B -- Success --> C[Return Result]
    B -- Failure --> D[Fallback: GPT-3.5]
    D --> C

2.2 LCEL 表达式语言#

2.2.1 声明式组合理念#

LCEL(LangChain Expression Language)是一种声明式语法,用于组合 Runnable 对象。

命令式 vs 声明式

# ❌ 命令式(手动控制流程)
def imperative_chain(input):
    step1_result = prompt.invoke(input)
    step2_result = model.invoke(step1_result)
    step3_result = parser.invoke(step2_result)
    return step3_result

# ✅ 声明式(LCEL 管道)
chain = prompt | model | parser
result = chain.invoke(input)

LCEL 的核心优势

mindmap
  root((LCEL))
    声明式
      代码简洁
      意图清晰
    可组合
      管道连接
      嵌套组合
    自动优化
      并行执行
      流式传输
    可追踪
      LangSmith 集成
      Debug 友好

2.2.2 管道操作符 | 与并行 {}#

管道操作符 | - 顺序执行

将多个 Runnable 串联成管道:

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# 管道组合
chain = (
    ChatPromptTemplate.from_template("Tell me about {topic}")
    | ChatOpenAI()
    | StrOutputParser()
)

# 自动按顺序执行
result = chain.invoke({"topic": "LangChain"})

执行流程

graph LR
    A[Input] --> B[Prompt Template]
    B --> C[ChatOpenAI]
    C --> D[StrOutputParser]
    D --> E[Output]

    style B fill:#FFF9C4
    style C fill:#E3F2FD
    style D fill:#DCEDC8

并行字典 {} - 并行执行

使用字典语法实现并行执行:

from langchain_core.runnables import RunnablePassthrough

chain = {
    "context": retriever | format_docs,
    "question": RunnablePassthrough()
} | prompt | model

# context 和 question 并行处理
result = chain.invoke("What is LangChain?")

执行流程

graph TD
    A[Input] --> B[RunnableParallel]
    B --> C[context: retriever]
    B --> D[question: passthrough]
    C --> E[Merge Results]
    D --> E
    E --> F[Prompt]
    F --> G[Model]

assign() - 状态更新快捷方式

RunnablePassthrough.assign() 是 LCEL 中最常用的操作之一,用于在链中添加或更新字段:

from langchain_core.runnables import RunnablePassthrough
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 创建向量检索器
vectorstore = Chroma.from_texts(
    ["LangChain是一个AI应用框架", "它支持RAG和Agent"],
    embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()

# 使用 assign() 添加检索上下文
chain = (
    RunnablePassthrough.assign(
        context=retriever  # 添加 context 字段
    )
    | ChatPromptTemplate.from_template(
        "基于以下上下文回答问题:\n{context}\n\n问题: {question}"
    )
    | ChatOpenAI()
    | StrOutputParser()
)

# 输入只需要 question,context 会自动添加
result = chain.invoke({"question": "什么是LangChain?"})
# 内部流程: {"question": "..."} -> {"question": "...", "context": [...]}

assign() 的优势

  1. 保留原始输入:不覆盖已有字段
  2. 简化代码:避免手动构造字典
  3. 链式组合:可以多次调用
# 多次 assign 叠加字段
chain = (
    RunnablePassthrough.assign(
        context=retriever  # 添加检索结果
    )
    .assign(
        context_count=lambda x: len(x["context"])  # 添加统计信息
    )
    .assign(
        timestamp=lambda x: "2025-11-17"  # 添加时间戳
    )
    | prompt
    | model
)

# 输入: {"question": "..."}
# 第一步后: {"question": "...", "context": [...]}
# 第二步后: {"question": "...", "context": [...], "context_count": 3}
# 第三步后: {"question": "...", "context": [...], "context_count": 3, "timestamp": "..."}

常见使用场景

# 场景1: RAG 添加检索上下文
rag_chain = (
    RunnablePassthrough.assign(context=retriever)
    | rag_prompt
    | model
)

# 场景2: 添加多个数据源
multi_source_chain = (
    RunnablePassthrough.assign(
        docs=doc_retriever,
        history=history_retriever,
        metadata=metadata_fetcher
    )
    | prompt
    | model
)

# 场景3: 数据转换
transform_chain = (
    RunnablePassthrough.assign(
        upper_text=lambda x: x["text"].upper(),
        word_count=lambda x: len(x["text"].split())
    )
    | processor
)

2.2.3 组合模式:顺序、并行、条件、循环#

顺序链接

# 简单顺序
chain = step1 | step2 | step3

# 复杂顺序
chain = (
    {"input": RunnablePassthrough()}
    | prompt
    | model
    | {"output": parser, "raw": RunnablePassthrough()}
)

并行执行

# 并行获取多个信息
chain = RunnableParallel(
    summary=summarize_chain,
    keywords=extract_keywords_chain,
    sentiment=sentiment_chain
)

条件分支

from langchain_core.runnables import RunnableBranch

# 根据输入长度选择不同处理
chain = RunnableBranch(
    (lambda x: len(x["text"]) > 1000, long_text_chain),
    (lambda x: len(x["text"]) > 100, medium_text_chain),
    short_text_chain
)

循环迭代

# 使用 RunnableLambda 实现循环
def iterative_refine(input):
    result = input
    for _ in range(3):
        result = refine_chain.invoke(result)
    return result

chain = RunnableLambda(iterative_refine)

2.3 高级特性#

2.3.1 Fallback 降级与 Retry 重试#

Fallback - 自动降级

# 多级降级
chain = (
    primary_model
    .with_fallbacks(fallbacks=[backup_model_1, backup_model_2])  # ✅ 使用 fallbacks 参数名
)

# 只对特定异常执行降级
chain = (
    primary_model
    .with_fallbacks(
        fallbacks=[backup_model_1, backup_model_2],  # ✅ 使用 fallbacks 参数名
        exceptions_to_handle=(TimeoutError, ConnectionError)  # ✅ 官方标准参数
    )
)

降级流程

graph TD
    A[Request] --> B[Primary Model]
    B -- Success --> Z[Return]
    B -- Failure --> C[Backup Model 1]
    C -- Success --> Z
    C -- Failure --> D[Backup Model 2]
    D --> Z

Retry - 自动重试

# 直接使用 with_retry 方法,无需单独导入
chain = (
    prompt | model | parser
).with_retry(
    stop_after_attempt=3,  # 最大重试次数
    wait_exponential_jitter=True,  # 指数退避 + 随机抖动
    retry_if_exception_type=(Exception,)  # 指定需要重试的异常类型
)

参数说明(基于官方API文档验证):

  • stop_after_attempt:最大重试次数,默认为 3
  • wait_exponential_jitter:是否使用指数退避 + 随机抖动,默认为 True
  • retry_if_exception_type:需要重试的异常类型元组,默认为 (Exception,)

重试策略示例

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# 只对特定异常重试
chain = (prompt | model | parser).with_retry(
    retry_if_exception_type=(TimeoutError, ConnectionError),
    stop_after_attempt=5,
    wait_exponential_jitter=True
)

# 禁用指数退避(立即重试)
chain = (prompt | model | parser).with_retry(
    stop_after_attempt=3,
    wait_exponential_jitter=False  # 禁用指数退避,立即重试
)

重试行为

  • 指数退避:1s → 2s → 4s → 8s
  • 最大重试次数:可自定义(默认3次)
  • 重试条件:可指定异常类型(默认所有 Exception)

2.3.2 Timeout 超时控制#

重要: RunnableConfig 不支持 timeout 参数。超时控制应在模型层面配置。

from langchain_openai import ChatOpenAI

# ✅ 正确:在模型构造时设置timeout
model = ChatOpenAI(
    model="gpt-4",
    timeout=30,  # 30秒超时
    max_retries=2
)

chain = prompt | model | parser
result = chain.invoke(input)

超时 + 降级组合策略

from langchain_openai import ChatOpenAI

# 主模型:严格超时
slow_model = ChatOpenAI(model="gpt-4", timeout=10)

# 降级模型:快速响应
fast_model = ChatOpenAI(model="gpt-3.5-turbo", timeout=5)

# 组合超时 + 降级
chain = (prompt | slow_model | parser).with_fallbacks([
    prompt | fast_model | parser
])

使用 RunnableConfig 配置其他参数

from langchain_core.runnables import RunnableConfig

# RunnableConfig支持的参数
result = chain.invoke(
    input,
    config=RunnableConfig(
        max_concurrency=5,      # 最大并发数
        tags=["production"],     # 标签(用于监控)
        metadata={"user": "alice"}  # 元数据
    )
)

2.3.3 缓存与性能优化#

💡 提示: 本节介绍 Runnable Protocol 的基础性能API。生产环境的深度性能调优、成本控制、缓存架构等内容,详见 第八篇《生产实践》第21章

LLM 缓存

from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache

# 启用缓存
set_llm_cache(InMemoryCache())

# 相同请求直接返回缓存结果
model.invoke("What is AI?")  # 调用 LLM
model.invoke("What is AI?")  # 返回缓存(不调用 LLM)

批处理优化

# 批处理优化(使用max_concurrency控制并发)
chain = prompt | model.with_config({"max_concurrency": 10})

# 内部自动合并请求
results = chain.batch(inputs)

流式优化

# 流式传输减少延迟
for chunk in chain.stream(input):
    print(chunk, end="")

性能对比

特性普通调用优化后性能提升
缓存2s50ms40x
批处理10s2s5x
流式TTFT 2sTTFT 200ms10x

本章小结#

Runnable Protocol 核心要点

  • ✅ 统一接口:invoke、stream、batch、ainvoke、astream
  • ✅ 可组合性:Lambda、Parallel、Branch、Fallbacks
  • ✅ 可追踪性:自动集成 LangSmith
  • ✅ 性能优化:异步、批处理、缓存

LCEL 核心要点

  • ✅ 声明式组合:| 管道、{} 并行
  • ✅ 自动优化:并行执行、流式传输
  • ✅ 高级特性:Fallback、Retry、Timeout、Cache

设计哲学

一切皆 Runnable,所有组件统一接口,声明式组合,自动优化执行。


思考与练习#

  1. 练习 1:基础管道 构建一个 LCEL 管道:Prompt → Model → Parser,实现一个简单的问答系统。

  2. 练习 2:并行处理 使用 RunnableParallel 同时生成一个笑话、一首诗和一个故事,输入主题为"AI"。

  3. 练习 3:错误处理 实现一个带有 Fallback 和 Retry 的 chain,主模型失败时自动切换到备用模型。

  4. 练习 4:性能优化 对比同步批处理和异步批处理的性能差异(10个请求)。

  5. 思考题:

    • 什么场景下应该使用 stream 而不是 invoke?
    • RunnableBranch 和简单的 if-else 有什么区别?
    • 如何在 LCEL 中实现循环逻辑?
[统计组件仅在生产环境显示]