第十篇 生产实践与监控评估#
目标: 构建生产级LLM应用的完整体系
从监控追踪到架构设计,从性能优化到安全防护,从部署运维到故障排查,全面掌握生产环境的关键要素。
第1章:LangSmith Tracing 与 Evaluation#
关注点:掌握 Agent 执行的全链路可观测性,建立科学的评估框架。
1.1 追踪体系#
1.1.1 追踪原理与数据模型#
什么是追踪(Tracing)?
追踪是记录和分析 Agent 执行过程的完整链路,从用户输入开始,记录每一个中间步骤(模型调用、工具执行、状态变化),最终得到输出。LangSmith 追踪形成一棵执行树:
root_run (Agent 执行)
├── before_model_hook (Middleware)
├── model_call (模型调用)
│ ├── system_prompt
│ ├── messages
│ └── tools
├── tool_run (工具执行)
│ ├── search_tool
│ └── get_weather_tool
└── after_model_hook (后处理)

追踪的核心作用:
- 调试:看到完整的执行链,快速定位问题
- 监控:追踪延迟、Token 成本、错误率等指标
- 优化:识别瓶颈,比较不同版本的性能差异
- 审计:记录谁做了什么,满足合规要求
数据模型:
class Run:
id: str # 唯一 ID
name: str # 运行名称
run_type: str # "agent", "model", "tool", "chain" 等
parent_run_id: Optional[str] # 父 Run ID(形成树关系)
# 输入输出
inputs: dict[str, Any] # 输入参数
outputs: dict[str, Any] # 输出结果
# 时间和成本
start_time: datetime # 开始时间
end_time: datetime # 结束时间
duration: float # 执行耗时(秒)
# Token 和成本
token_usage: Optional[TokenUsage]
cost: Optional[float] # 美元成本
# 状态和错误
status: str # "success", "error"
error: Optional[str] # 错误信息
# 元数据
metadata: dict[str, Any] # 自定义元数据
tags: list[str] # 标签(用于筛选)
# 反馈
feedback_records: list[Feedback] # 用户反馈

1.1.2 自动追踪:环境变量配置#
最简单的开启方式:
# 设置环境变量(LangSmith 1.0+ 最新命名)
export LANGSMITH_API_KEY="lsv2_..."
export LANGCHAIN_PROJECT="my_project"
export LANGSMITH_TRACING="true"

向后兼容说明:
- 旧版环境变量 LANGCHAIN_API_KEY 和 LANGCHAIN_TRACING_V2 仍然支持,但建议迁移到新命名
- API Key 格式从 sk-... 变更为 lsv2_...(LangSmith v2)
- LANGCHAIN_PROJECT 保持不变(生态通用变量)
Python 代码中配置:
import os
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
# 配置 LangSmith(最新命名)
os.environ["LANGSMITH_API_KEY"] = "lsv2_..."
os.environ["LANGCHAIN_PROJECT"] = "my_project"
os.environ["LANGSMITH_TRACING"] = "true"
# 创建 Agent
model = ChatOpenAI(model="gpt-4o")
agent = create_agent(
model=model,
tools=[search, weather],
system_prompt="你是一个助手"
)
# ✅ 自动追踪:所有调用都会被记录到 LangSmith
result = agent.invoke({"messages": [("user", "今天天气如何")]})
# 查看追踪:
# 1. 打开 https://smith.langchain.com
# 2. 选择项目 "my_project"
# 3. 查看实时追踪树

环境变量选项:
| 变量 | 说明 | 示例 | 状态 |
|---|---|---|---|
| LANGSMITH_API_KEY | LangSmith API 密钥(必需) | lsv2_... | ✅ 推荐 |
| LANGCHAIN_PROJECT | 项目名称(用于分组) | my_agent | ✅ 通用 |
| LANGSMITH_TRACING | 启用追踪(必需) | true | ✅ 推荐 |
| LANGSMITH_WORKSPACE_ID | 工作区 ID(团队使用) | ws-... | ✅ 新增 |
| LANGCHAIN_ENDPOINT | LangSmith API 端点 | https://api.smith.langchain.com | ✅ 可选 |
| LANGCHAIN_SESSION | 会话名称(可选,用于子分组) | session-123 | ✅ 可选 |
| LANGCHAIN_CALLBACKS_BACKGROUND | 后台异步发送追踪数据 | true | ✅ 可选 |
| LANGCHAIN_TRACING_SAMPLING_RATE | 采样率(0.0-1.0,用于生产环境) | 0.1 | ✅ 可选 |
| LANGSMITH_TEST_CACHE | 缓存 API 调用(加速评估) | true | ✅ 可选 |
| LANGCHAIN_API_KEY | (旧)API 密钥 | sk-... | ⚠️ 已弃用 |
| LANGCHAIN_TRACING_V2 | (旧)启用追踪 | true | ⚠️ 已弃用 |
完整环境变量配置示例:
import os
# 基础配置(必需)- LangSmith 1.0+ 最新命名
os.environ["LANGSMITH_API_KEY"] = "lsv2_..."
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "my_project"
# 团队协作配置(可选)
os.environ["LANGSMITH_WORKSPACE_ID"] = "ws-..." # 工作区 ID(团队共享)
# 可选配置
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com" # 自定义端点
os.environ["LANGCHAIN_SESSION"] = "user-session-123" # 会话分组
os.environ["LANGCHAIN_CALLBACKS_BACKGROUND"] = "true" # 后台发送,不阻塞主程序
os.environ["LANGCHAIN_TRACING_SAMPLING_RATE"] = "0.1" # 生产环境采样 10%
# 开发/测试环境配置
os.environ["LANGSMITH_TEST_CACHE"] = "true" # 缓存 API 调用以加速测试
# 向后兼容(不推荐,仍然支持)
# os.environ["LANGCHAIN_API_KEY"] = "sk-..." # 已弃用,使用 LANGSMITH_API_KEY
# os.environ["LANGCHAIN_TRACING_V2"] = "true" # 已弃用,使用 LANGSMITH_TRACING

环境变量说明:
必需变量(LangSmith 1.0+):
- LANGSMITH_API_KEY:从 https://smith.langchain.com 获取,格式为 lsv2_...
- LANGSMITH_TRACING:必须设为 "true" 启用追踪
项目组织:
- LANGCHAIN_PROJECT:项目名称,用于分组追踪(生态通用变量,保持不变)
- LANGCHAIN_SESSION:会话名称,用于更细粒度的分组(如按用户或任务分组)
- LANGSMITH_WORKSPACE_ID:工作区 ID,用于团队协作(格式为 ws-...)
性能优化:
- LANGCHAIN_CALLBACKS_BACKGROUND:设为 "true" 后,追踪数据异步发送,不阻塞主程序
- LANGCHAIN_TRACING_SAMPLING_RATE:生产环境建议设置采样率(如 "0.1" 表示 10%),降低追踪成本
测试加速:
- LANGSMITH_TEST_CACHE:在测试/评估时缓存 API 响应,避免重复调用
向后兼容(已弃用):
- LANGCHAIN_API_KEY(旧格式 sk-...)→ 迁移到 LANGSMITH_API_KEY(新格式 lsv2_...)
- LANGCHAIN_TRACING_V2 → 迁移到 LANGSMITH_TRACING
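这些变量可以按部署环境组合管理。下面是一个纯 Python 的小工具示意(不依赖 SDK;tracing_env 函数名与各默认值均为本文的假设):开发环境全量追踪、生产环境后台发送并采样、测试环境启用缓存:

```python
def tracing_env(environment: str, api_key: str, project: str) -> dict:
    """按部署环境生成 LangSmith 追踪相关环境变量(示意,默认值为假设)"""
    env = {
        "LANGSMITH_API_KEY": api_key,
        "LANGSMITH_TRACING": "true",
        "LANGCHAIN_PROJECT": project,
    }
    if environment == "production":
        # 生产环境:后台异步发送 + 10% 采样,降低追踪开销
        env["LANGCHAIN_CALLBACKS_BACKGROUND"] = "true"
        env["LANGCHAIN_TRACING_SAMPLING_RATE"] = "0.1"
    elif environment == "test":
        # 测试环境:缓存 API 调用以加速评估
        env["LANGSMITH_TEST_CACHE"] = "true"
    return env

# 使用:os.environ.update(tracing_env("production", "lsv2_...", "my_project"))
print(tracing_env("production", "lsv2_xxx", "my_project"))
```

开发环境(其他取值)只保留三个必需变量,便于调试时看到每一条追踪。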
1.1.3 手动追踪:@traceable 装饰器#
场景:追踪非 LangChain 代码(自定义函数、数据库操作等)。
基础用法:
from langsmith.run_helpers import traceable
@traceable(name="my_function", run_type="chain")
def my_custom_function(input_text: str) -> str:
"""自定义函数会自动被追踪"""
# 处理输入
result = input_text.upper()
return result
# 调用时自动记录到 LangSmith
output = my_custom_function("hello")

完整参数说明:
from langsmith import traceable
@traceable(
# 基础参数
name="custom_processing", # 运行名称(默认:函数名)
run_type="tool", # 运行类型:llm/chain/tool/retriever/prompt(默认:chain)
# 组织和标记
metadata={ # 元数据(任意键值对)
"version": "1.0",
"author": "alice",
"module": "data_processing"
},
tags=["production", "v1"], # 标签列表(用于筛选)
project_name="my_project", # 项目名称(覆盖环境变量)
# 高级参数
reduce_fn=None, # 聚合函数(用于生成器/流式输出)
client=None, # 自定义 LangSmith Client
process_inputs=None, # 输入预处理函数
process_outputs=None, # 输出后处理函数
)
def process_data(data: dict) -> dict:
"""处理数据"""
result = {**data, "processed": True}
return result

参数详细说明:
| 参数 | 类型 | 说明 | 默认值 |
|---|---|---|---|
| name | str | 追踪运行的显示名称 | 函数名 |
| run_type | str | 运行类型(llm/chain/tool/retriever/prompt) | "chain" |
| metadata | dict | 自定义元数据(键值对) | None |
| tags | list[str] | 标签列表,用于过滤和分组 | None |
| project_name | str | 项目名称(覆盖 LANGCHAIN_PROJECT 环境变量) | None |
| reduce_fn | Callable | 聚合函数,用于处理生成器/流式输出 | None |
| client | Client | 自定义 LangSmith Client 实例 | None |
| process_inputs | Callable | 输入序列化函数(用于自定义输入格式) | None |
| process_outputs | Callable | 输出序列化函数(用于自定义输出格式) | None |
run_type 类型说明:
- "llm":LLM 模型调用
- "chain":链式调用(默认)
- "tool":工具执行
- "retriever":检索操作
- "prompt":提示词模板
- 也可以使用自定义类型(如 "database"、"api_call")
高级用法示例:
# 1. 处理生成器输出
@traceable(
name="stream_processor",
reduce_fn=lambda outputs: "".join(outputs) # 将流式输出合并为字符串
)
def stream_data(n: int):
"""生成器函数"""
for i in range(n):
yield f"chunk-{i}"
# 2. 自定义输入/输出处理
def serialize_inputs(inputs):
"""自定义输入序列化"""
return {k: str(v)[:100] for k, v in inputs.items()} # 截断长文本
@traceable(
process_inputs=serialize_inputs,
process_outputs=lambda o: {"result": str(o)[:200]}
)
def process_large_data(data: dict) -> dict:
# 处理大量数据,但只记录摘要
return {"result": data}

嵌套追踪(自动形成树关系):
@traceable(name="parent_task")
def parent_task(query: str) -> str:
# 调用子任务
result1 = search_task(query)
result2 = analyze_task(result1)
return result2
@traceable(name="search_task")
def search_task(query: str) -> str:
# 子任务 1
return f"搜索结果: {query}"
@traceable(name="analyze_task")
def analyze_task(text: str) -> str:
# 子任务 2
return f"分析: {text}"
# 调用时自动形成树:
# parent_task
# ├── search_task
# └── analyze_task
parent_task("LangChain")

访问当前运行信息:
from langsmith.run_helpers import traceable, get_current_run_tree
@traceable
def my_function(x: int) -> int:
# 获取当前运行
run = get_current_run_tree()
if run:
print(f"当前运行 ID: {run.id}")
print(f"运行名称: {run.name}")
# 添加自定义元数据
run.metadata = {"user_id": "user-123"}
return x * 2

异步追踪示例:
@traceable 装饰器完全支持异步函数,自动追踪异步操作:
import asyncio
from langsmith import traceable
import httpx
@traceable(name="fetch_data", run_type="retriever")
async def fetch_user_data(user_id: str) -> dict:
"""异步获取用户数据"""
async with httpx.AsyncClient() as client:
response = await client.get(f"https://api.example.com/users/{user_id}")
return response.json()
@traceable(name="process_user", run_type="chain")
async def process_user(user_id: str) -> dict:
"""异步处理用户信息"""
# 异步调用会自动追踪
user_data = await fetch_user_data(user_id)
# 处理数据
processed = {
"id": user_data["id"],
"name": user_data["name"].upper(),
"processed_at": "2025-01-15"
}
return processed
# 使用异步追踪
async def main():
result = await process_user("user-123")
print(result)
# 执行
asyncio.run(main())
# 追踪树会显示:
# process_user (async)
# └── fetch_user_data (async)

异步批量操作追踪:
@traceable(name="batch_fetch", run_type="chain")
async def fetch_multiple_users(user_ids: list[str]) -> list[dict]:
"""并发获取多个用户数据"""
# 所有异步调用都会被追踪
tasks = [fetch_user_data(uid) for uid in user_ids]
results = await asyncio.gather(*tasks)
return results
# 使用
async def main():
users = await fetch_multiple_users(["user-1", "user-2", "user-3"])
# 追踪树会显示:
# batch_fetch
# ├── fetch_user_data (user-1)
# ├── fetch_user_data (user-2)
# └── fetch_user_data (user-3)

错误处理与追踪:
异常会自动记录到 LangSmith,无需额外配置:
@traceable(name="safe_operation", run_type="tool")
def safe_operation(value: int) -> int:
"""带错误处理的操作"""
try:
if value < 0:
raise ValueError("Value must be non-negative")
result = 100 / value
return result
except ValueError as e:
# 异常会自动记录到 LangSmith
print(f"Validation error: {e}")
raise # 重新抛出以标记为失败
except ZeroDivisionError as e:
# 除零错误也会被记录
print(f"Division error: {e}")
raise
# 调用失败的函数
try:
safe_operation(0) # 会在 LangSmith 中标记为 error
except ZeroDivisionError:
pass
# LangSmith 中会显示:
# - 运行状态: error
# - 错误类型: ZeroDivisionError
# - 错误消息: division by zero
# - 完整堆栈跟踪

异步错误处理示例:
@traceable(name="async_safe_fetch")
async def async_safe_fetch(url: str) -> dict:
"""带错误处理的异步请求"""
try:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=5.0)
response.raise_for_status()
return response.json()
except httpx.TimeoutException as e:
# 超时错误会被记录
print(f"Request timeout: {e}")
raise
except httpx.HTTPStatusError as e:
# HTTP 错误会被记录
print(f"HTTP error: {e.response.status_code}")
raise
except Exception as e:
# 所有其他错误也会被记录
print(f"Unexpected error: {e}")
raise
# 使用
async def main():
try:
data = await async_safe_fetch("https://invalid-url.example.com")
except Exception:
# LangSmith 会记录完整的错误信息
pass

1.1.4 追踪信息分析#
在 LangSmith UI 中查看追踪:
Trace 树视图:查看完整的执行链路
- 每个节点显示运行名称、状态、耗时
- 点击节点查看详细信息(输入、输出、错误)
输入输出对比:
- 左侧:输入参数
- 右侧:输出结果
- 快速定位数据变化
Token 成本分析:
- 显示每次模型调用的 Token 使用量
- 计算累计成本(美元)
- 识别成本最高的操作
延迟分析:
- 显示每个操作的耗时
- 识别性能瓶颈
- 比较不同版本的性能
编程方式分析追踪:
from langsmith import Client
client = Client()
# 获取最近的追踪
runs = client.list_runs(
project_name="my_project",
limit=10 # 获取最近 10 条
)
for run in runs:
print(f"运行: {run.name}")
print(f"状态: {run.status}")
print(f"耗时: {run.end_time - run.start_time}")
if run.token_usage:
print(f"Tokens: {run.token_usage.completion_tokens}")
print(f"成本: ${run.cost}")
# 获取子运行
if run.child_runs:
for child in run.child_runs:
print(f" └─ {child.name}: {child.status}")

筛选和统计:
from datetime import datetime, timedelta
# 筛选条件
yesterday = datetime.now() - timedelta(days=1)
# 获取特定标签的运行
production_runs = list(client.list_runs(
    project_name="my_project",
    filter='tags:production',
    start_time=yesterday
))  # 转为列表:list_runs 返回生成器,只能遍历一次,下面要多次统计
# 统计
total_cost = sum(run.cost for run in production_runs if run.cost)
avg_duration = sum(
(run.end_time - run.start_time).total_seconds()
for run in production_runs
) / len(production_runs)
error_count = sum(1 for run in production_runs if run.status == "error")
print(f"总成本: ${total_cost:.2f}")
print(f"平均耗时: {avg_duration:.2f} 秒")
print(f"错误数: {error_count}")

反馈收集:
# 在应用中收集用户反馈
run_id = "run-123" # 从追踪中获取
# 用户点赞
client.create_feedback(
run_id=run_id,
key="user_rating",
score=1.0, # 1.0 = 好, 0.0 = 不好
comment="很有用!"
)
# 专家标注
client.create_feedback(
run_id=run_id,
key="expert_evaluation",
score=0.8,
comment="大部分正确,但有一处错误"
)
# 查看反馈
feedbacks = client.list_feedback(run_ids=[run_id])
for feedback in feedbacks:
    print(f"{feedback.key}: {feedback.score} - {feedback.comment}")

1.2 Evaluation 评估框架#
1.2.1 Dataset 管理与测试集设计#
什么是 Dataset?
Dataset 是评估所需的测试数据集合,每个 Example 包含:
- input:输入参数
- output(可选):参考输出(用于对比)
- metadata(可选):补充信息
创建 Dataset:
from langsmith import Client
client = Client()
# 方法 1:手动创建
dataset = client.create_dataset(
dataset_name="qa_benchmark",
description="QA 任务基准数据集"
)
# 添加示例
client.create_example(
dataset_id=dataset.id,
inputs={"question": "LangChain 是什么?"},
outputs={"answer": "LangChain 是一个构建 LLM 应用的框架"}
)
client.create_example(
dataset_id=dataset.id,
inputs={"question": "Python 3.13 的主要特性?"},
outputs={"answer": "Python 3.13 引入了 JIT 编译器..."}
)
# 方法 2:从 CSV 导入
import pandas as pd
df = pd.read_csv("qa_examples.csv")
# 假设 CSV 有 "question" 和 "answer" 列
dataset = client.create_dataset(
dataset_name="qa_from_csv",
description="从 CSV 导入的 QA 数据集"
)
for _, row in df.iterrows():
client.create_example(
dataset_id=dataset.id,
inputs={"question": row["question"]},
outputs={"answer": row["answer"]}
)
# 方法 3:从生产追踪创建(最推荐)
# 在生产环境运行一段时间后,筛选高质量的追踪
client.create_dataset(
dataset_name="production_examples",
description="从生产环境采集的真实用户查询"
)
# 手动筛选好的追踪并添加
# (通常通过 LangSmith UI 完成,支持批量导入)

测试集设计最佳实践:
1. 覆盖代表场景
# ✅ 好:覆盖多种场景
examples = [
# 简单查询
{"question": "天气如何?", "answer": "..."},
# 复杂查询
{"question": "比较 LangChain 和 LangGraph 的优缺点", "answer": "..."},
# 边界情况
{"question": "", "answer": "请输入有效问题"},
{"question": "???.|||", "answer": "请输入有效问题"},
# 多轮对话
{"question": "什么是 RAG?", "answer": "..."},
{"question": "能给个例子吗?", "answer": "..."},
]

2. 足够的数据量
- 初期:10-20 个高质量样例(手工精选)
- 中期:50-100 个样例(包括覆盖各种场景)
- 成熟期:1000+ 个样例(从生产环境采集)
3. 包含元数据
client.create_example(
dataset_id=dataset.id,
inputs={"question": "LangChain 是什么?"},
outputs={"answer": "LangChain 是..."},
metadata={
"difficulty": "easy",
"category": "framework",
"source": "user_feedback",
"language": "chinese"
}
)

1.2.2 评估指标:准确率、相关性、一致性、延迟、成本#
评估器返回格式详解:
评估器必须返回一个字典,包含以下字段:
| 字段 | 必需 | 类型 | 说明 |
|---|---|---|---|
| key | 是 | str | 评估指标的名称(如 "accuracy", "relevance") |
| score | 是 | float/int/bool | 评估分数(通常 0-1,也可以是任意数值或布尔值) |
| value | 否 | Any | 实际值(通常与 score 相同,用于记录原始值) |
| comment | 否 | str | 评论或解释(用于调试和分析) |
| correction | 否 | dict | 修正建议(用于标注正确答案) |
完整返回格式示例:
# 最简格式(只包含必需字段)
return {
"key": "accuracy",
"score": 1.0
}
# 完整格式(包含所有字段)
return {
"key": "accuracy", # 指标名称
"score": 0.85, # 分数(0-1 或任意数值)
"value": 0.85, # 实际值(可选,通常与 score 相同)
"comment": "Good answer!", # 评论(可选,用于解释)
"correction": { # 修正建议(可选)
"expected": "Paris",
"actual": "paris"
}
}
# 布尔分数
return {
"key": "is_correct",
"score": True # 布尔值也可以
}
# 多个评估结果(返回列表)
return [
{"key": "accuracy", "score": 0.9},
{"key": "latency", "score": 1.5, "comment": "1.5 seconds"}
]

评估器类型:
1. 启发式评估器(Heuristic Evaluator):确定性规则
def is_valid_code(run, example):
"""检查输出是否是有效的 Python 代码"""
code = run.outputs.get("code", "")
try:
compile(code, "<string>", "exec")
return {
"key": "valid_code",
"score": 1.0,
"comment": "Valid Python code"
}
except SyntaxError as e:
return {
"key": "valid_code",
"score": 0.0,
"comment": f"Syntax error: {str(e)}",
"correction": {"error": str(e)}
}
def exact_match(run, example):
"""精确匹配评估器"""
actual = run.outputs.get("answer")
expected = example.outputs.get("answer")
is_match = actual == expected
return {
"key": "exact_match",
"score": 1.0 if is_match else 0.0,
"value": is_match,
"comment": "Match!" if is_match else f"Expected: {expected}, Got: {actual}",
"correction": {"expected": expected} if not is_match else None
}
def length_check(run, example):
"""检查输出长度"""
answer = run.outputs.get("answer", "")
length = len(answer)
return {
"key": "answer_length",
"score": length, # 分数可以是任意数值
"value": length,
"comment": f"Answer has {length} characters"
}

2. LLM 作为评估器(LLM-as-Judge):使用 LLM 打分
from langchain_openai import ChatOpenAI
from langsmith.evaluation import LangChainStringEvaluator
# 也可以使用 langsmith 封装的 LangChainStringEvaluator(见上方导入)
# 自定义 LLM 评估器
def llm_evaluator(run, example):
"""使用 LLM 评估相关性"""
model = ChatOpenAI(model="gpt-4o-mini")
evaluation_prompt = f"""
问题:{example.inputs.get("question")}
参考答案:{example.outputs.get("answer")}
实际答案:{run.outputs.get("answer")}
请评估实际答案与参考答案的相似度(0-1)。
"""
response = model.invoke(evaluation_prompt)
# 解析响应
score = float(response.content.split("\n")[0])
return {
"score": score,
"key": "relevance",
"comment": response.content
}

3. 人工评估:通过 UI 手动标注
# LangSmith UI 中可以:
# 1. 在 Annotation Queue 中标注
# 2. 为每条结果打分或写评论
# 3. 导出评估结果供统计

常见评估指标:
| 指标 | 实现方式 | 适用场景 |
|---|---|---|
| Accuracy | 精确匹配或 LLM | 分类任务 |
| BLEU/ROUGE | 文本相似度 | 文本生成 |
| Precision/Recall | 集合操作 | 检索任务 |
| Relevance | LLM 评估 | QA 任务 |
| Consistency | 多次运行对比 | 非确定性任务 |
| Latency | 时间戳计算 | 性能分析 |
| Cost | Token 使用量 | 成本控制 |
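表中的 Latency 与 Cost 两类指标可以直接用启发式评估器实现。下面是一个示意(假设 run 对象带有 start_time/end_time 与 token 计数字段;单价数字仅为示例,并非真实定价):

```python
from datetime import datetime
from types import SimpleNamespace

def latency_evaluator(run, example):
    """延迟评估器:由时间戳计算耗时(秒)"""
    duration = (run.end_time - run.start_time).total_seconds()
    return {
        "key": "latency_seconds",
        "score": duration,
        "comment": f"耗时 {duration:.2f} 秒"
    }

def cost_evaluator(run, example):
    """成本评估器:由 Token 使用量估算美元成本(单价为假设的示例值)"""
    input_price = 0.15 / 1_000_000   # 每输入 token 单价(假设)
    output_price = 0.60 / 1_000_000  # 每输出 token 单价(假设)
    cost = run.prompt_tokens * input_price + run.completion_tokens * output_price
    return {"key": "cost_usd", "score": cost, "comment": f"${cost:.6f}"}

# 用伪造的 run 对象演示(真实评估时由 evaluate() 传入)
fake_run = SimpleNamespace(
    start_time=datetime(2025, 1, 1, 12, 0, 0),
    end_time=datetime(2025, 1, 1, 12, 0, 2),
    prompt_tokens=1000,
    completion_tokens=500,
)
print(latency_evaluator(fake_run, None))  # score=2.0
print(cost_evaluator(fake_run, None))
```

两者的 score 都是原始数值而非 0-1 归一化分数,便于在实验间直接对比均值与分布。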
1.2.3 evaluate() 批量测试#
注意:LangSmith 1.0+ 中 run_on_dataset() 已重命名为 evaluate(),推荐使用新 API。
evaluate() 函数的 data 参数支持的类型:
evaluate() 函数的 data 参数非常灵活,支持多种数据源:
| 类型 | 说明 | 示例 |
|---|---|---|
| str (数据集名称) | LangSmith 平台上的数据集名称 | "qa_benchmark" |
| str (UUID) | LangSmith 数据集的 UUID | "a3d2f1b8-..." |
| list[dict] | 直接传入示例列表 | [{"inputs": {...}, "outputs": {...}}, ...] |
| Iterator[dict] | 示例迭代器(用于大数据集) | iter([{...}, {...}]) |
| Iterator[Example] | LangSmith Example 对象迭代器 | client.list_examples(dataset_name="...") |
使用不同类型的示例:
from langsmith.evaluation import evaluate
from langsmith import Client
client = Client()
# 1. 使用数据集名称(最常用)
results = evaluate(
predict,
data="qa_benchmark", # 数据集名称
evaluators=[accuracy]
)
# 2. 使用数据集 UUID
results = evaluate(
predict,
data="a3d2f1b8-1234-5678-90ab-cdef12345678", # UUID
evaluators=[accuracy]
)
# 3. 直接传入示例列表(快速测试)
examples = [
{
"inputs": {"question": "What is LangChain?"},
"outputs": {"answer": "LangChain is a framework..."}
},
{
"inputs": {"question": "What is Python?"},
"outputs": {"answer": "Python is a programming language..."}
}
]
results = evaluate(
predict,
data=examples, # 列表
evaluators=[accuracy]
)
# 4. 使用迭代器(大数据集,节省内存)
def example_generator():
"""生成器函数,逐个产生示例"""
for i in range(1000):
yield {
"inputs": {"question": f"Question {i}"},
"outputs": {"answer": f"Answer {i}"}
}
results = evaluate(
predict,
data=example_generator(), # 迭代器
evaluators=[accuracy]
)
# 5. 使用 LangSmith Example 对象迭代器
examples_iter = client.list_examples(
dataset_name="qa_benchmark",
limit=100 # 只测试前 100 个
)
results = evaluate(
predict,
data=examples_iter,
evaluators=[accuracy]
)

基本用法:
from langsmith.evaluation import evaluate
from langsmith import Client
client = Client()
# 定义预测函数
def predict(input_dict):
"""应用的预测函数"""
question = input_dict.get("question")
# 调用 Agent
agent = create_agent(model=ChatOpenAI(model="gpt-4o-mini"), tools=[...])
result = agent.invoke({"messages": [("user", question)]})
return {"answer": result["messages"][-1].content}
# 定义评估器
def exact_match(run, example):
"""精确匹配评估"""
return {
"score": 1.0 if run.outputs["answer"] == example.outputs["answer"] else 0.0,
"key": "exact_match"
}
def relevance(run, example):
"""相关性评估"""
model = ChatOpenAI(model="gpt-4o-mini")
response = model.invoke(
f"问题: {example.inputs['question']}\n答案: {run.outputs['answer']}\n"
f"评估相关性(0-1):"
)
score = float(response.content)
return {"score": score, "key": "relevance"}
# 运行评估
experiment_results = evaluate(
predict, # 预测函数
data="qa_benchmark", # 数据集名称
evaluators=[exact_match, relevance], # 行级评估器
experiment_prefix="Model v1", # 实验名称前缀
description="测试新模型版本",
num_repetitions=2, # 运行 2 次(处理 LLM 非确定性)
max_concurrency=10 # 并发数
)
# 查看结果
print(f"实验名: {experiment_results.experiment_name}")
print(f"总样例数: {len(experiment_results.results)}")
# 统计指标
scores = [r.evaluation_results[0].score for r in experiment_results.results]
print(f"平均准确率: {sum(scores) / len(scores):.2%}")

高级用法:Summary Evaluator
from langsmith.evaluation import evaluate
def accuracy_summary(runs, examples):
"""实验级评估器(在所有运行上计算)"""
correct = 0
for run, example in zip(runs, examples):
if run.outputs["answer"] == example.outputs["answer"]:
correct += 1
accuracy = correct / len(runs)
return {"score": accuracy, "key": "accuracy"}
def precision_recall_summary(runs, examples):
"""计算 Precision 和 Recall"""
predictions = [run.outputs["answer"] for run in runs]
references = [example.outputs["answer"] for example in examples]
# 假设输出是分类标签列表
tp = sum(1 for p, r in zip(predictions, references) if p == r and p == "positive")
fp = sum(1 for p, r in zip(predictions, references) if p != r and p == "positive")
fn = sum(1 for p, r in zip(predictions, references) if p != r and r == "positive")
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
return [
{"score": precision, "key": "precision"},
{"score": recall, "key": "recall"}
]
# 使用 Summary Evaluator
evaluate(
predict,
data="classification_dataset",
evaluators=[exact_match],
summary_evaluators=[accuracy_summary, precision_recall_summary],
experiment_prefix="Classification v1"
)

异步评估(处理大规模数据集):
import asyncio
from langsmith.evaluation import aevaluate
async def main():
# 异步预测函数
async def async_predict(input_dict):
# 异步 Agent 调用
result = await async_agent.ainvoke({"messages": [("user", input_dict["question"])]})
return {"answer": result["messages"][-1].content}
# 运行异步评估
results = await aevaluate(
async_predict,
data="large_dataset",
evaluators=[exact_match, relevance],
experiment_prefix="Async Model v1",
max_concurrency=50 # 并发 50 个请求
)
return results
# 执行
results = asyncio.run(main())

1.2.4 人工标注与反馈收集#
通过 LangSmith UI 标注:
创建 Annotation Queue
- 在项目中点击 “Create Annotation Queue”
- 选择数据集和要标注的字段
标注流程
- 逐条查看例子
- 给每条结果评分或写评论
- 导出标注结果
编程方式收集反馈:
from langsmith import Client
client = Client()
# 应用中收集用户反馈
run_id = "run-123"
# 用户点赞/点踩
client.create_feedback(
run_id=run_id,
key="user_rating",
score=1.0, # 1.0 = 好, 0.0 = 不好
comment="回答很准确!"
)
# 专家标注
client.create_feedback(
run_id=run_id,
key="expert_annotation",
score=0.8,
comment="大部分正确,但第二点不够准确"
)
# 导出所有反馈用于分析
feedbacks = client.list_feedback(
run_ids=[run_id]
)
for feedback in feedbacks:
print(f"{feedback.key}: {feedback.score}")
print(f" {feedback.comment}")

1.3 持续优化#
1.3.1 Prompt Hub 版本控制#
场景:系统提示词需要版本管理和对比。
使用 LangSmith Prompt Hub:
from langsmith import Client
from langchain_core.prompts import ChatPromptTemplate
client = Client()
# 方法 1:将提示词推送到 Hub
system_prompt_v1 = """你是一个专业的编程助手。
能力:
1. 回答编程问题
2. 生成代码
3. 调试错误
约束:
- 只讨论编程相关话题
- 生成的代码要有注释
"""
# 创建提示词模板
prompt_template = ChatPromptTemplate.from_messages([
("system", system_prompt_v1),
("user", "{input}")
])
# 推送到 Hub(需要 API 权限)
client.push_prompt(
prompt_identifier="programming_assistant_system_prompt",
object=prompt_template,
description="编程助手的系统提示词"
)
# 方法 2:从 Hub 拉取提示词
system_prompt = client.pull_prompt("programming_assistant_system_prompt")
# 方法 3:版本管理
# 每次推送自动创建新版本,可以:
# 1. 查看历史版本
# 2. 对比版本差异
# 3. 拉取特定版本
updated_prompt_v2 = """你是一个专业的编程助手。
增强能力:
1. 回答编程问题
2. 生成代码
3. 调试错误
约束:
- 只讨论编程相关话题
- 生成的代码要有注释
"""
# 推送新版本(自动创建版本 2)
updated_template = ChatPromptTemplate.from_messages([
("system", updated_prompt_v2),
("user", "{input}")
])
client.push_prompt(
prompt_identifier="programming_assistant_system_prompt",
object=updated_template,
description="添加 '增强' 前缀以强调能力"
)

版本对比:
# 在 LangSmith UI 中:
# 1. 打开 Prompt Hub
# 2. 选择提示词
# 3. 点击 "Compare Versions"
# 4. 查看变化和对比结果

1.3.2 A/B 测试#
完整的 A/B 测试流程:
from langsmith.evaluation import evaluate
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
# 版本 A:原始模型和提示词
def predict_v_a(input_dict):
agent = create_agent(
model=ChatOpenAI(model="gpt-4o-mini"),
tools=[...],
system_prompt="你是一个助手"
)
result = agent.invoke({"messages": [("user", input_dict["question"])]})
return {"answer": result["messages"][-1].content}
# 版本 B:改进的提示词
def predict_v_b(input_dict):
agent = create_agent(
model=ChatOpenAI(model="gpt-4o-mini"),
tools=[...],
system_prompt="""你是一个专业的助手。
在回答前:
1. 理解问题的关键信息
2. 分解复杂问题为子问题
3. 使用工具获取必要信息
最后提供清晰、准确的答案。"""
)
result = agent.invoke({"messages": [("user", input_dict["question"])]})
return {"answer": result["messages"][-1].content}
# 定义评估器
def exact_match(run, example):
return {
"score": 1.0 if run.outputs["answer"] == example.outputs["answer"] else 0.0,
"key": "exact_match"
}
# 运行版本 A
experiment_a = evaluate(
predict_v_a,
data="qa_benchmark",
evaluators=[exact_match],
experiment_prefix="Version A",
description="基准版本"
)
# 运行版本 B
experiment_b = evaluate(
predict_v_b,
data="qa_benchmark",
evaluators=[exact_match],
experiment_prefix="Version B",
description="改进的提示词"
)
# 对比结果(先定义辅助函数,再调用,避免名字未定义)
def calculate_accuracy(experiment):
    scores = [r.evaluation_results[0].score for r in experiment.results]
    return sum(scores) / len(scores)

print("=== 版本对比 ===")
print(f"版本 A 准确率: {calculate_accuracy(experiment_a):.2%}")
print(f"版本 B 准确率: {calculate_accuracy(experiment_b):.2%}")

统计显著性检验:
from scipy import stats
# 获取两个版本的评分
scores_a = [r.evaluation_results[0].score for r in experiment_a.results]
scores_b = [r.evaluation_results[0].score for r in experiment_b.results]
# T 检验
t_stat, p_value = stats.ttest_ind(scores_a, scores_b)
print(f"T 统计量: {t_stat:.4f}")
print(f"P 值: {p_value:.4f}")
if p_value < 0.05:
if sum(scores_b) > sum(scores_a):
print("✅ 版本 B 显著更好(p < 0.05)")
else:
print("✅ 版本 A 显著更好(p < 0.05)")
else:
print("⚠️ 差异不显著(p >= 0.05)")

1.3.3 监控指标与问题发现#
关键监控指标:
from langsmith import Client
from datetime import datetime, timedelta
client = Client()
# 监控指标收集
def monitor_agent_health():
"""Agent 健康监测"""
# 获取最近 24 小时的运行
yesterday = datetime.now() - timedelta(days=1)
runs = list(client.list_runs(
    project_name="my_agent",
    start_time=yesterday
))  # 转为列表:生成器只能遍历一次,下面要多次统计
# 计算关键指标
total_runs = len(runs)
successful_runs = sum(1 for run in runs if run.status == "success")
error_runs = sum(1 for run in runs if run.status == "error")
avg_duration = sum(
    (run.end_time - run.start_time).total_seconds()
    for run in runs
) / total_runs if total_runs > 0 else 0
total_cost = sum(run.cost for run in runs if run.cost)
# 计算错误率
error_rate = error_runs / total_runs if total_runs > 0 else 0
# 计算成本效率
cost_per_success = total_cost / successful_runs if successful_runs > 0 else 0
# 返回指标
return {
"total_runs": total_runs,
"success_rate": successful_runs / total_runs if total_runs > 0 else 0,
"error_rate": error_rate,
"avg_duration": avg_duration,
"total_cost": total_cost,
"cost_per_success": cost_per_success
}
metrics = monitor_agent_health()
print("=== Agent 健康监测 ===")
print(f"总请求: {metrics['total_runs']}")
print(f"成功率: {metrics['success_rate']:.2%}")
print(f"错误率: {metrics['error_rate']:.2%}")
print(f"平均耗时: {metrics['avg_duration']:.2f}s")
print(f"总成本: ${metrics['total_cost']:.2f}")
print(f"成本/成功: ${metrics['cost_per_success']:.4f}")

问题自动告警:
def check_health_alerts():
"""检查是否有告警条件"""
metrics = monitor_agent_health()
alerts = []
# 错误率过高
if metrics["error_rate"] > 0.1: # > 10%
alerts.append(f"⚠️ 错误率过高: {metrics['error_rate']:.2%}")
# 响应时间过长
if metrics["avg_duration"] > 30: # > 30秒
alerts.append(f"⚠️ 平均响应时间过长: {metrics['avg_duration']:.2f}s")
# 成本突增
if metrics["cost_per_success"] > 0.01: # > $0.01 per success
alerts.append(f"⚠️ 成本过高: ${metrics['cost_per_success']:.4f}")
if alerts:
print("=== 告警 ===")
for alert in alerts:
print(alert)
else:
print("✅ 系统健康")

反馈驱动的问题发现:
# 查找用户标记为"不好"的运行
bad_runs = list(client.list_runs(
    project_name="my_agent",
    filter='feedback_key:"user_rating" AND feedback_score:0'
))  # 转为列表,便于 len() 与多次遍历
print(f"找到 {len(bad_runs)} 条差评运行")
# 分析失败原因
failure_patterns = {}
for run in bad_runs:
# 获取失败的工具调用
if run.child_runs:
for child in run.child_runs:
if child.status == "error":
tool_name = child.name
if tool_name not in failure_patterns:
failure_patterns[tool_name] = 0
failure_patterns[tool_name] += 1
# 排序并显示
for tool, count in sorted(failure_patterns.items(), key=lambda x: x[1], reverse=True):
print(f"{tool}: {count} 次失败")

1.3.4 优化迭代流程#
完整的优化循环:
1. 基准测试 → 获得当前性能基线
↓
2. 假设提出 → 例如"改进提示词可以提高准确率 5%"
↓
3. 制定对策 → 编写新的提示词或改进模型选择
↓
4. 运行 A/B 测试 → 对比新旧版本
↓
5. 分析结果 → 是否满足假设?
├─ 是 → 更新版本到生产
└─ 否 → 返回步骤 2,尝试新假设
↓
6. 监控上线 → 持续监控生产指标
↓
7. 收集反馈 → 用户标注、问题报告
↓
8. 返回步骤 1 → 开始新一轮优化

实现优化循环的代码框架:
from langsmith import Client
from langsmith.evaluation import evaluate
from datetime import datetime
class ContinuousOptimization:
def __init__(self, project_name: str):
self.client = Client()
self.project_name = project_name
self.iteration = 0
def baseline(self):
"""获得基准测试结果"""
results = evaluate(
predict,
data="qa_benchmark",
evaluators=[exact_match, relevance],
experiment_prefix=f"Baseline_{datetime.now().isoformat()}",
description="基准版本"
)
accuracy = self.calculate_accuracy(results)
print(f"📊 基线准确率: {accuracy:.2%}")
return results
def propose_hypothesis(self, hypothesis: str):
"""提出假设"""
self.hypothesis = hypothesis
print(f"💡 假设: {hypothesis}")
def implement_changes(self, changes: dict):
"""实现改进"""
self.changes = changes
print(f"⚙️ 改进: {changes}")
def run_experiment(self):
"""运行实验"""
self.iteration += 1
# 这里调用改进后的预测函数
results = evaluate(
predict_improved, # 改进后的版本
data="qa_benchmark",
evaluators=[exact_match, relevance],
experiment_prefix=f"Iteration_{self.iteration}_{datetime.now().isoformat()}",
description=f"改进: {self.hypothesis}"
)
new_accuracy = self.calculate_accuracy(results)
return new_accuracy
def compare_with_baseline(self, baseline_acc, new_acc):
"""对比改进效果"""
improvement = (new_acc - baseline_acc) / baseline_acc  # 注意::.2% 格式化时会自动乘 100
if improvement > 0:
print(f"✅ 改进成功! 提升 {improvement:.2%}")
return True
else:
print(f"❌ 未达到预期,下降 {abs(improvement):.2%}")
return False
def deploy_to_production(self):
"""部署到生产环境"""
print("🚀 部署新版本到生产")
# 更新生产 Agent 的配置
def calculate_accuracy(self, results):
scores = [r.evaluation_results[0].score for r in results.results]
return sum(scores) / len(scores)
# 使用示例
optimization = ContinuousOptimization("my_agent")
# 第 1 轮:基准
baseline_results = optimization.baseline() # 准确率: 75%
# 第 2 轮:改进提示词
optimization.propose_hypothesis("详细的系统提示词能提高准确率")
optimization.implement_changes({"system_prompt": "新提示词..."})
acc_v2 = optimization.run_experiment() # 准确率: 78%
if optimization.compare_with_baseline(0.75, acc_v2):
optimization.deploy_to_production()
# 第 3 轮:切换模型
optimization.propose_hypothesis("使用更强的模型能进一步提高准确率")
optimization.implement_changes({"model": "gpt-4o"})
acc_v3 = optimization.run_experiment() # 准确率: 82%
if optimization.compare_with_baseline(0.75, acc_v3):
optimization.deploy_to_production()

本章小结#
- 追踪体系:自动和手动追踪,形成完整的执行树,用于调试和分析
- Evaluation 框架:Dataset + Evaluator 的组合,支持启发式、LLM、人工评估
- 批量测试:evaluate() 和 aevaluate() 函数,支持行级和实验级评估
- 持续优化:A/B 测试、版本控制、监控告警、迭代流程
思考与练习#
思考:为什么 evaluate() 中需要 num_repetitions > 1?

答案:LLM 的输出有随机性(由 temperature 参数控制)。运行多次可以:
- 评估模型的稳定性
- 获得更准确的平均指标
- 检测随机导致的偶然好/坏结果
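这一点可以量化:对同一输入重复运行后,计算输出的"一致率"(纯 Python 示意,consistency_score 为本文假设的辅助函数,真实场景中输出通常还需归一化后再比较):

```python
from collections import Counter

def consistency_score(outputs: list[str]) -> float:
    """一致性:多次运行中最常见输出所占比例(1.0 = 完全稳定)"""
    if not outputs:
        return 0.0
    most_common_count = Counter(outputs).most_common(1)[0][1]
    return most_common_count / len(outputs)

# num_repetitions=4 的 4 次输出中有 3 次相同 → 一致率 0.75
print(consistency_score(["巴黎", "巴黎", "巴黎", "伦敦"]))  # 0.75
print(consistency_score(["A", "A"]))                        # 1.0
```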
练习:实现一个 f1_score_summary() 评估器,计算 F1 分数。

思考:如何设计一个反馈循环,自动识别失败模式并提出改进建议?
总结与展望#
通过 LangSmith 的追踪和评估体系,我们已经掌握了:
- 可观测性:完整的执行链路可见
- 可测试性:科学的评估框架
- 可优化性:数据驱动的迭代
这些能力为后续的高级应用(多 Agent、MCP 集成)和生产实践提供了坚实的基础。
第2章: 架构设计模式#
关注点:掌握生产级 LangChain 应用的架构设计。
2.1 RAG 架构设计#
生产级 RAG 系统架构:
┌─────────────────────────────────────────────────────────────┐
│ 用户接口层 │
│ (Web/API/Chat Interface) │
└─────────────┬───────────────────────────────────────────────┘
│
┌─────────────▼───────────────────────────────────────────────┐
│ 应用服务层 │
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────┐ │
│ │ 查询处理器 │ │ Agent 引擎 │ │ 结果后处理 │ │
│ └─────────────┘ └──────────────┘ └─────────────┘ │
└─────────────┬───────────────────────────────────────────────┘
│
┌─────────────▼───────────────────────────────────────────────┐
│ 核心层 │
│ ┌────────────┐ ┌────────────┐ ┌─────────────┐ │
│ │ 向量检索器 │ │ 重排序器 │ │ LLM 网关 │ │
│ └────────────┘ └────────────┘ └─────────────┘ │
└─────────────┬───────────────────────────────────────────────┘
│
┌─────────────▼───────────────────────────────────────────────┐
│ 数据层 │
│ ┌────────────┐ ┌────────────┐ ┌─────────────┐ │
│ │ 向量数据库 │ │ 文档存储 │ │ 缓存层 │ │
│ └────────────┘ └────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘

实现代码:
from typing import List, Optional, Dict, Any
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Pinecone
from langchain.retrievers import ContextualCompressionRetriever
from langchain_cohere import CohereRerank  # 需安装 langchain-cohere
from langchain_core.documents import Document
import redis
import hashlib
import json
class ProductionRAGSystem:
"""生产级 RAG 系统"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
self.vector_store = self._init_vector_store()
self.cache = self._init_cache()
self.llm = self._init_llm()
self.retriever = self._init_retriever()
def _init_vector_store(self):
"""初始化向量数据库"""
return Pinecone.from_existing_index(
index_name=self.config["pinecone_index"],
embedding=self.embeddings,
namespace=self.config.get("namespace", "default")
)
def _init_cache(self):
"""初始化缓存层"""
return redis.Redis(
host=self.config["redis_host"],
port=self.config["redis_port"],
db=0,
decode_responses=True
)
def _init_llm(self):
"""初始化 LLM 网关"""
return ChatOpenAI(
model=self.config["llm_model"],
temperature=0.7,
max_retries=3,
request_timeout=30
)
def _init_retriever(self):
"""初始化检索器(带重排序)"""
base_retriever = self.vector_store.as_retriever(
search_kwargs={"k": self.config.get("retrieve_k", 10)}
)
# 添加压缩/重排序(使用 CohereRerank)
compressor = CohereRerank(
model="rerank-multilingual-v3.0",
top_n=self.config.get("top_k", 5)
)
return ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=base_retriever
)
def _get_cache_key(self, query: str) -> str:
"""生成缓存键"""
return f"rag:{hashlib.md5(query.encode()).hexdigest()}"
def retrieve(self, query: str) -> List[Document]:
"""检索相关文档"""
# 查询改写
rewritten_query = self._rewrite_query(query)
# 检索
docs = self.retriever.invoke(rewritten_query)  # LangChain 1.0:invoke 取代已移除的 get_relevant_documents
# 后处理
docs = self._post_process_docs(docs)
return docs
def _rewrite_query(self, query: str) -> str:
"""查询改写(提高检索质量)"""
prompt = f"""请改写以下查询,使其更适合向量检索:
原始查询:{query}
改写后的查询(保持语义,优化关键词):"""
response = self.llm.invoke(prompt)
return response.content
def _post_process_docs(self, docs: List[Document]) -> List[Document]:
"""文档后处理"""
# 去重
seen = set()
unique_docs = []
for doc in docs:
doc_hash = hashlib.md5(doc.page_content.encode()).hexdigest()
if doc_hash not in seen:
seen.add(doc_hash)
unique_docs.append(doc)
# 按相关性重新排序
unique_docs.sort(key=lambda x: x.metadata.get("score", 0), reverse=True)
return unique_docs[:self.config.get("top_k", 5)]
def generate_answer(self, query: str, docs: List[Document]) -> str:
"""生成答案"""
context = "\n\n".join([doc.page_content for doc in docs])
prompt = f"""基于以下上下文回答问题:
上下文:
{context}
问题:{query}
答案:"""
response = self.llm.invoke(prompt)
return response.content
def query(self, query: str, use_cache: bool = True) -> Dict[str, Any]:
"""完整查询流程"""
# 检查缓存
if use_cache:
cache_key = self._get_cache_key(query)
cached = self.cache.get(cache_key)
if cached:
return json.loads(cached)
# 检索
docs = self.retrieve(query)
# 生成答案
answer = self.generate_answer(query, docs)
result = {
"query": query,
"answer": answer,
"sources": [
{
"content": doc.page_content[:200] + "...",
"metadata": doc.metadata
}
for doc in docs
]
}
# 写入缓存
if use_cache:
self.cache.setex(
cache_key,
self.config.get("cache_ttl", 3600),
json.dumps(result)
)
return result
RAG Agent 实现(推荐):
使用 @tool(response_format="content_and_artifact") 是构建 RAG Agent 的官方推荐方式,允许工具同时返回内容和原始数据:
from langchain_core.tools import tool
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
from typing import List
# 创建向量存储
vectorstore = Chroma.from_documents(
documents=[...], # 你的文档
embedding=OpenAIEmbeddings()
)
# 定义检索工具(使用 content_and_artifact 格式)
@tool(response_format="content_and_artifact")
def retrieve_docs(query: str) -> tuple[str, List]:
"""检索相关文档
Args:
query: 用户查询
Returns:
(content, artifact): 内容摘要和原始文档列表
"""
# 检索文档
docs = vectorstore.similarity_search(query, k=3)
# 构造返回内容
content = f"找到 {len(docs)} 个相关文档"
artifact = [doc.page_content for doc in docs] # 原始文档
return content, artifact
# 创建 RAG Agent
rag_agent = create_agent(
model=ChatOpenAI(model="gpt-4o"),
tools=[retrieve_docs],
system_prompt="""你是一个 RAG 助手。
使用 retrieve_docs 工具获取相关文档,然后基于文档内容回答问题。
如果文档中没有相关信息,明确告诉用户。
"""
)
# 使用
result = rag_agent.invoke({
"messages": [("user", "LangChain 1.0 有哪些新特性?")]
})
print(result["messages"][-1].content)
# Agent 会自动:
# 1. 调用 retrieve_docs 工具
# 2. 访问 artifact 中的原始文档
# 3. 基于文档内容生成答案
使用 RunnablePassthrough.assign() 的 LCEL 方式:
from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 定义 RAG 提示词
rag_prompt = ChatPromptTemplate.from_template("""
基于以下上下文回答问题:
上下文:
{context}
问题: {question}
回答:
""")
# 构建 RAG 链(使用 assign 添加检索结果)
rag_chain = (
RunnablePassthrough.assign(
context=lambda x: vectorstore.similarity_search(x["question"], k=3)
)
| RunnablePassthrough.assign(
context=lambda x: "\n\n".join([doc.page_content for doc in x["context"]])
)
| rag_prompt
| ChatOpenAI(model="gpt-4o")
| StrOutputParser()
)
# 使用
answer = rag_chain.invoke({
"question": "LangChain 1.0 有哪些新特性?"
})
使用 @dynamic_prompt 的 Middleware 方式(推荐):
from langchain.agents.middleware import dynamic_prompt, ModelRequest
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
@dynamic_prompt
def prompt_with_context(request: ModelRequest) -> str:
"""动态注入检索上下文"""
# 获取最后一条用户消息
last_query = request.state["messages"][-1].text
# 执行检索
retrieved_docs = vectorstore.similarity_search(last_query, k=3)
# 格式化文档内容
docs_content = "\n\n".join([
f"文档 {i+1}:\n{doc.page_content}"
for i, doc in enumerate(retrieved_docs)
])
# 返回系统消息(会自动注入到请求中)
system_message = (
"你是一个 RAG 助手。使用以下上下文回答用户问题:\n\n"
f"{docs_content}\n\n"
"如果上下文中没有相关信息,请明确告知用户。"
)
return system_message
# 创建 Agent(传入 middleware)
rag_agent = create_agent(
model=ChatOpenAI(model="gpt-4o"),
tools=[], # 不需要工具,检索在 middleware 中完成
middleware=[prompt_with_context] # ✅ 关键:传入 dynamic_prompt
)
# 使用(自动在每次请求前检索)
result = rag_agent.invoke({
"messages": [("user", "LangChain 1.0 有哪些新特性?")]
})
@dynamic_prompt 的优势:
- 自动执行:每次模型调用前自动检索,无需手动调用工具
- 单次推理:只需 1 次 LLM 调用(vs Agent 工具模式需要 2 次)
- 透明注入:上下文自动添加到系统消息,模型无感知
- 适合简单场景:固定的检索-回答流程,性能最优
三种 RAG 实现方式对比:
| 特性 | Agent + Tool | @dynamic_prompt | LCEL Chain |
|---|---|---|---|
| LLM 调用次数 | 2 次(决策+回答) | 1 次 | 1 次 |
| 灵活性 | 高(自主决策) | 中(固定检索) | 低(固定流程) |
| 复杂度 | 低(自动推理) | 低(Middleware) | 中(需设计链) |
| 成本 | 较高 | 中等 | 较低 |
| 可控性 | 低 | 中 | 高 |
| 适用场景 | 复杂查询、多步推理 | 简单 RAG、性能优化 | 完全自定义流程 |
推荐实践:
- 简单 RAG(单次检索):使用 @dynamic_prompt(性能最优)
- 复杂查询(多次检索):使用 Agent + Tool(自动决策)
- 完全自定义流程:使用 LCEL Chain(最大控制)
- 混合方案:Agent + dynamic_prompt(灵活性与效率兼顾)
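下文 hybrid_search 中引用的 _reciprocal_rank_fusion 并未给出实现。RRF 的核心打分逻辑可以先用一个独立的纯 Python 草图验证(以文档 ID 作为去重键、k=60 为常用平滑常数,均为示意假设):

```python
from collections import defaultdict

def reciprocal_rank_fusion(result_lists: list[list[str]], k: int = 60) -> list[str]:
    """RRF:每路结果按 1/(k + rank) 打分,跨路累加后降序重排"""
    scores: dict[str, float] = defaultdict(float)
    for results in result_lists:
        for rank, doc_id in enumerate(results):
            scores[doc_id] += 1.0 / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)

# 模拟向量检索与 BM25 关键词检索的两路结果
vector_results = ["doc_a", "doc_b", "doc_c"]
keyword_results = ["doc_b", "doc_d", "doc_a"]
fused = reciprocal_rank_fusion([vector_results, keyword_results])
print(fused[0])  # doc_b:在两路中排名都靠前,累计得分最高
```

得分公式为 Σ 1/(k + rank + 1):同一文档在越多路结果中排名靠前,融合后越靠前。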
RAG 优化策略:
class OptimizedRAGPipeline:
"""优化的 RAG 流水线"""
def __init__(self):
self.strategies = {
"hybrid_search": self.hybrid_search,
"multi_query": self.multi_query_retrieval,
"iterative": self.iterative_retrieval,
"ensemble": self.ensemble_retrieval
}
def hybrid_search(self, query: str) -> List[Document]:
"""混合搜索(向量 + 关键词)"""
# 向量搜索
vector_results = self.vector_store.similarity_search(query, k=10)
# BM25 关键词搜索
keyword_results = self.bm25_retriever.get_relevant_documents(query)
# 合并结果
all_docs = vector_results + keyword_results
# 使用 RRF (Reciprocal Rank Fusion) 重排序
return self._reciprocal_rank_fusion(all_docs)
def multi_query_retrieval(self, query: str) -> List[Document]:
"""多查询检索"""
# 生成多个查询变体
queries = self._generate_query_variants(query)
all_docs = []
for q in queries:
docs = self.retriever.get_relevant_documents(q)
all_docs.extend(docs)
# 去重并重排序
return self._deduplicate_and_rank(all_docs)
def iterative_retrieval(self, query: str, max_iterations: int = 3) -> List[Document]:
"""迭代检索"""
docs = []
current_query = query
for i in range(max_iterations):
# 检索
new_docs = self.retriever.get_relevant_documents(current_query)
docs.extend(new_docs)
# 判断是否需要继续
if self._is_sufficient(docs, query):
break
# 基于已有结果生成新查询
current_query = self._generate_followup_query(query, docs)
return docs
def ensemble_retrieval(self, query: str) -> List[Document]:
"""集成检索"""
retrievers = [
self.dense_retriever, # 密集向量
self.sparse_retriever, # 稀疏向量
self.cross_encoder # 交叉编码器
]
all_results = []
weights = [0.5, 0.3, 0.2] # 权重
for retriever, weight in zip(retrievers, weights):
docs = retriever.get_relevant_documents(query)
for doc in docs:
doc.metadata["weight"] = weight
all_results.extend(docs)
return self._weighted_merge(all_results)
2.2 Multi-Agent 架构选择#
生产环境多 Agent 架构决策:
from enum import Enum
from dataclasses import dataclass
from typing import List, Optional, Any
class AgentArchitecture(Enum):
"""Agent 架构类型"""
MONOLITHIC = "monolithic" # 单体 Agent
SUPERVISOR_WORKER = "supervisor" # 监督者-工作者
PIPELINE = "pipeline" # 流水线
HIERARCHICAL = "hierarchical" # 层级
MESH = "mesh" # 网状
@dataclass
class ArchitectureDecision:
"""架构决策"""
architecture: AgentArchitecture
reason: str
pros: List[str]
cons: List[str]
implementation_complexity: int # 1-10
class ArchitectureSelector:
"""架构选择器"""
def select(
self,
task_complexity: int, # 1-10
team_size: int, # Agent 数量
coordination_level: str, # low/medium/high
latency_requirement: str, # low/medium/high
scalability_need: str # low/medium/high
) -> ArchitectureDecision:
"""选择合适的架构"""
# 简单任务
if task_complexity <= 3:
if team_size <= 1:
return ArchitectureDecision(
architecture=AgentArchitecture.MONOLITHIC,
reason="简单任务使用单体 Agent 即可",
pros=["简单", "低延迟", "易调试"],
cons=["扩展性差", "功能受限"],
implementation_complexity=2
)
# 中等复杂度
if task_complexity <= 6:
if coordination_level == "high":
return ArchitectureDecision(
architecture=AgentArchitecture.SUPERVISOR_WORKER,
reason="需要高度协调,使用监督者模式",
pros=["控制清晰", "易于管理", "责任明确"],
cons=["中央瓶颈", "监督者复杂"],
implementation_complexity=5
)
else:
return ArchitectureDecision(
architecture=AgentArchitecture.PIPELINE,
reason="顺序处理任务,使用流水线",
pros=["流程清晰", "易于扩展", "可并行"],
cons=["灵活性差", "错误传播"],
implementation_complexity=4
)
# 高复杂度
if team_size > 10:
return ArchitectureDecision(
architecture=AgentArchitecture.HIERARCHICAL,
reason="大规模团队需要层级管理",
pros=["可扩展", "职责清晰", "易于管理大团队"],
cons=["复杂", "调试困难", "层级延迟"],
implementation_complexity=8
)
# 默认
return ArchitectureDecision(
architecture=AgentArchitecture.SUPERVISOR_WORKER,
reason="通用架构,适合大多数场景",
pros=["平衡", "成熟", "灵活"],
cons=["可能过度设计"],
implementation_complexity=5
)
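下文 ProductionMultiAgentSystem 直接使用了 CircuitBreaker(failure_threshold=5, recovery_timeout=60),但其实现未在正文给出。下面是一个与该接口对齐的最小熔断器草图(关闭→打开→超时后半开,属假设实现):

```python
import time
from typing import Optional

class CircuitBreaker:
    """最小熔断器:连续失败达到阈值后打开,超过恢复窗口后半开放行试探"""
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.opened_at: Optional[float] = None

    def is_open(self) -> bool:
        if self.opened_at is None:
            return False
        if time.monotonic() - self.opened_at >= self.recovery_timeout:
            return False  # 恢复窗口已过:半开,允许一次试探请求
        return True

    def record_success(self):
        self.failure_count = 0
        self.opened_at = None

    def record_failure(self):
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.opened_at = time.monotonic()

breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=60)
for _ in range(3):
    breaker.record_failure()
print(breaker.is_open())  # True:连续 3 次失败后熔断
```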
# 生产级多 Agent 实现
class ProductionMultiAgentSystem:
"""生产级多 Agent 系统"""
def __init__(self, architecture: AgentArchitecture):
self.architecture = architecture
self.agents = {}
self.metrics = {}
self.circuit_breakers = {}
def register_agent(self, name: str, agent: Any):
"""注册 Agent(带健康检查)"""
self.agents[name] = agent
self.circuit_breakers[name] = CircuitBreaker(
failure_threshold=5,
recovery_timeout=60
)
def execute_with_fallback(self, agent_name: str, task: dict) -> dict:
"""带降级的执行"""
breaker = self.circuit_breakers[agent_name]
if breaker.is_open():
# 熔断器打开,使用降级策略
return self._fallback_strategy(agent_name, task)
try:
# 正常执行
result = self.agents[agent_name].invoke(task)
breaker.record_success()
return result
except Exception as e:
breaker.record_failure()
if breaker.is_open():
# 刚刚熔断
self._alert_circuit_open(agent_name)
# 使用降级策略
return self._fallback_strategy(agent_name, task)
def _fallback_strategy(self, agent_name: str, task: dict) -> dict:
"""降级策略"""
# 策略 1:使用备用 Agent
backup_agent = self.agents.get(f"{agent_name}_backup")
if backup_agent:
return backup_agent.invoke(task)
# 策略 2:返回缓存结果
cached = self.get_cached_result(task)
if cached:
return cached
# 策略 3:返回默认响应
return {
"status": "degraded",
"message": f"{agent_name} 暂时不可用,请稍后重试"
}
2.3 Workflow 架构模式#
工作流编排架构:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator
class WorkflowOrchestrator:
"""工作流编排器"""
def __init__(self):
self.workflows = {}
self.templates = self._load_templates()
def _load_templates(self):
"""加载工作流模板"""
return {
"approval_flow": self._create_approval_workflow(),
"data_pipeline": self._create_data_pipeline(),
"customer_service": self._create_cs_workflow()
}
def _create_approval_workflow(self):
"""创建审批工作流"""
class ApprovalState(TypedDict):
request: dict
approvals: Annotated[List[dict], operator.add]
status: str
level: int
workflow = StateGraph(ApprovalState)
# 节点
workflow.add_node("validate", self.validate_request)
workflow.add_node("level1_approval", self.level1_approval)
workflow.add_node("level2_approval", self.level2_approval)
workflow.add_node("execute", self.execute_request)
workflow.add_node("notify", self.notify_result)
# 边
workflow.add_edge("validate", "level1_approval")
# 条件边
workflow.add_conditional_edges(
"level1_approval",
lambda x: "level2" if x["request"]["amount"] > 10000 else "execute",
{
"level2": "level2_approval",
"execute": "execute"
}
)
workflow.add_edge("level2_approval", "execute")
workflow.add_edge("execute", "notify")
workflow.add_edge("notify", END)
workflow.set_entry_point("validate")
return workflow.compile()
def create_dynamic_workflow(self, config: dict):
"""动态创建工作流"""
workflow = StateGraph(dict)
# 动态添加节点
for node_config in config["nodes"]:
node_func = self._create_node_function(node_config)
workflow.add_node(node_config["id"], node_func)
# 动态添加边
for edge_config in config["edges"]:
if edge_config["type"] == "direct":
workflow.add_edge(edge_config["from"], edge_config["to"])
elif edge_config["type"] == "conditional":
workflow.add_conditional_edges(
edge_config["from"],
self._create_condition_function(edge_config["condition"]),
edge_config["branches"]
)
workflow.set_entry_point(config["entry"])
return workflow.compile()
第3章: 性能与成本优化#
关注点:掌握生产环境的性能调优和成本控制。
3.1 延迟优化#
综合延迟优化策略:
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
from functools import lru_cache
from typing import List, Dict, Any
class LatencyOptimizer:
"""延迟优化器"""
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=10)
self.model_latencies = {} # 记录模型延迟
def select_optimal_model(self, task_type: str, max_latency: float) -> str:
"""根据延迟要求选择模型"""
model_configs = {
"gpt-4o": {"latency": 2.5, "quality": 0.95, "cost": 0.03},
"gpt-4o-mini": {"latency": 0.8, "quality": 0.85, "cost": 0.001},
"gpt-3.5-turbo": {"latency": 0.5, "quality": 0.80, "cost": 0.0005},
"claude-3-haiku": {"latency": 0.3, "quality": 0.82, "cost": 0.0003}
}
# 筛选满足延迟要求的模型
eligible_models = [
model for model, config in model_configs.items()
if config["latency"] <= max_latency
]
if not eligible_models:
raise ValueError(f"没有模型满足 {max_latency}s 延迟要求")
# 选择质量最高的
return max(eligible_models, key=lambda x: model_configs[x]["quality"])
async def parallel_execution(self, tasks: List[Dict[str, Any]]) -> List[Any]:
"""并行执行多个任务"""
async def execute_task(task):
"""执行单个任务"""
start = time.time()
# 异步执行
result = await self._async_invoke(task)
# 记录延迟
latency = time.time() - start
self._record_latency(task["model"], latency)
return result
# 并行执行所有任务
results = await asyncio.gather(*[execute_task(task) for task in tasks])
return results
@lru_cache(maxsize=1000)  # 注意:装饰实例方法时,self 也会被计入缓存键
def cached_inference(self, prompt_hash: str) -> str:
"""缓存的推理"""
# 缓存会自动处理
return self._invoke_llm(prompt_hash)
def optimize_prompt_batching(self, prompts: List[str]) -> List[str]:
"""优化提示词批处理"""
# 批处理大小优化
optimal_batch_size = self._calculate_optimal_batch_size(len(prompts))
results = []
for i in range(0, len(prompts), optimal_batch_size):
batch = prompts[i:i + optimal_batch_size]
# 批量处理
batch_results = self._batch_inference(batch)
results.extend(batch_results)
return results
def _calculate_optimal_batch_size(self, total_count: int) -> int:
"""计算最优批处理大小"""
# 基于经验公式
if total_count < 10:
return total_count
elif total_count < 100:
return 10
else:
return min(50, total_count // 10)
def pipeline_optimization(self):
"""流水线优化"""
class Pipeline:
def __init__(self):
self.stages = []
def add_stage(self, func, parallel=False):
self.stages.append((func, parallel))
async def execute(self, input_data):
current = input_data
for func, parallel in self.stages:
if parallel and isinstance(current, list):
# 并行处理列表
current = await asyncio.gather(*[func(item) for item in current])
else:
# 串行处理
current = await func(current)
return current
return Pipeline()
3.2 吞吐量优化#
吞吐量优化实现:
import asyncio
from asyncio import Queue, Semaphore
from typing import List, Optional
import aiohttp
class ThroughputOptimizer:
"""吞吐量优化器"""
def __init__(self, max_concurrent: int = 100):
self.semaphore = Semaphore(max_concurrent)
self.request_queue = Queue()
self.connection_pool = None
async def init_connection_pool(self):
"""初始化连接池"""
connector = aiohttp.TCPConnector(
limit=100, # 总连接数
limit_per_host=30, # 每个主机的连接数
ttl_dns_cache=300 # DNS 缓存时间
)
timeout = aiohttp.ClientTimeout(
total=300,
connect=10,
sock_read=30
)
self.connection_pool = aiohttp.ClientSession(
connector=connector,
timeout=timeout
)
async def batch_processor(self, batch_size: int = 10):
"""批处理处理器"""
batch = []
while True:
try:
# 收集批次
while len(batch) < batch_size:
item = await asyncio.wait_for(
self.request_queue.get(),
timeout=1.0 # 1秒超时
)
batch.append(item)
except asyncio.TimeoutError:
# 超时但有数据,处理现有批次
if batch:
await self._process_batch(batch)
batch = []
# 批次满,处理
if len(batch) >= batch_size:
await self._process_batch(batch)
batch = []
async def _process_batch(self, batch: List[dict]):
"""处理一个批次"""
async with self.semaphore:
# 批量 API 调用
tasks = [self._make_request(item) for item in batch]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
for item, result in zip(batch, results):
if isinstance(result, Exception):
await self._handle_error(item, result)
else:
await self._handle_success(item, result)
async def adaptive_concurrency(self):
"""自适应并发控制"""
class AdaptiveLimiter:
def __init__(self):
self.current_limit = 10
self.min_limit = 5
self.max_limit = 100
self.success_rate = 1.0
self.adjustment_interval = 10 # 秒
async def adjust(self):
"""调整并发限制"""
while True:
await asyncio.sleep(self.adjustment_interval)
if self.success_rate > 0.95:
# 成功率高,增加并发
self.current_limit = min(
self.current_limit * 1.2,
self.max_limit
)
elif self.success_rate < 0.8:
# 成功率低,减少并发
self.current_limit = max(
self.current_limit * 0.8,
self.min_limit
)
return AdaptiveLimiter()
3.3 Token 管理与成本控制#
Token 成本优化系统:
from dataclasses import dataclass
from typing import Dict, List
import tiktoken
@dataclass
class TokenBudget:
"""Token 预算"""
total_budget: int
used: int = 0
@property
def remaining(self) -> int:
return self.total_budget - self.used
def can_afford(self, tokens: int) -> bool:
return self.remaining >= tokens
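TokenOptimizer 的计价逻辑是"每 1K token 单价 × token 数 / 1000",可以先用一个纯函数核对算术(单价取自下文 pricing 表):

```python
# 每 1K token 的单价(美元),与下文 TokenOptimizer.pricing 一致
PRICING = {
    "gpt-4o": {"input": 0.0025, "output": 0.01},
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
}

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """成本 = 输入 token/1000 × 输入单价 + 输出 token/1000 × 输出单价"""
    p = PRICING[model]
    return (input_tokens / 1000) * p["input"] + (output_tokens / 1000) * p["output"]

# 2000 输入 + 500 输出的 gpt-4o 调用:
# 2 × 0.0025 + 0.5 × 0.01 = 0.005 + 0.005 = 0.01 美元
print(round(calculate_cost("gpt-4o", 2000, 500), 6))  # 0.01
```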
class TokenOptimizer:
"""Token 优化器"""
def __init__(self):
self.encoding = tiktoken.encoding_for_model("gpt-4")
self.pricing = {
"gpt-4o": {"input": 0.0025, "output": 0.01}, # per 1K tokens (2024年最新价格)
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
"gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015}
}
def count_tokens(self, text: str) -> int:
"""计算 Token 数量"""
return len(self.encoding.encode(text))
def optimize_prompt(self, prompt: str, max_tokens: int) -> str:
"""优化提示词长度"""
tokens = self.count_tokens(prompt)
if tokens <= max_tokens:
return prompt
# 策略 1:截断
if tokens < max_tokens * 1.5:
return self._truncate_prompt(prompt, max_tokens)
# 策略 2:摘要
return self._summarize_prompt(prompt, max_tokens)
def _truncate_prompt(self, prompt: str, max_tokens: int) -> str:
"""截断提示词"""
tokens = self.encoding.encode(prompt)
# 保留开头和结尾
if max_tokens > 200:
start_tokens = tokens[:max_tokens//2]
end_tokens = tokens[-(max_tokens//2):]
truncated = start_tokens + end_tokens
return self.encoding.decode(truncated)
# 只保留开头
return self.encoding.decode(tokens[:max_tokens])
def _summarize_prompt(self, prompt: str, max_tokens: int) -> str:
"""摘要提示词"""
# 使用便宜的模型进行摘要
summary_prompt = f"将以下内容摘要到 {max_tokens} tokens 以内:\n\n{prompt}"
# 调用 gpt-3.5-turbo 进行摘要
# ... 实现略;注意必须返回字符串,否则 optimize_prompt 会拿到 None
return self._truncate_prompt(prompt, max_tokens)  # 摘要失败时的保底截断
def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
"""计算成本"""
if model not in self.pricing:
raise ValueError(f"Unknown model: {model}")
pricing = self.pricing[model]
input_cost = (input_tokens / 1000) * pricing["input"]
output_cost = (output_tokens / 1000) * pricing["output"]
return input_cost + output_cost
def cost_aware_routing(self, task: dict, budget: TokenBudget) -> str:
"""成本感知路由"""
# 估算不同模型的成本
estimates = {}
for model in self.pricing.keys():
estimated_tokens = self._estimate_tokens(task, model)
estimated_cost = self.calculate_cost(
model,
estimated_tokens["input"],
estimated_tokens["output"]
)
if budget.can_afford(estimated_tokens["input"] + estimated_tokens["output"]):  # 预算以 token 计,不能拿美元成本比较
estimates[model] = {
"cost": estimated_cost,
"quality": self._get_model_quality(model)
}
if not estimates:
raise ValueError("预算不足")
# 在预算内选择质量最好的
best_model = max(estimates.items(), key=lambda x: x[1]["quality"])
return best_model[0]
3.4 缓存复用策略#
多层缓存架构:
import hashlib
import pickle
from typing import Any, Optional
from datetime import datetime, timedelta
class MultiLevelCache:
"""多层缓存系统"""
def __init__(self):
self.l1_cache = {} # 内存缓存
self.l2_cache = Redis()  # Redis 缓存(示意:实际需传入连接参数)
self.l3_cache = S3Cache()  # S3 长期缓存(示意:自定义封装类)
def get(self, key: str) -> Optional[Any]:
"""获取缓存(逐层查找)"""
# L1 缓存
if key in self.l1_cache:
self._promote_to_l1(key, self.l1_cache[key])
return self.l1_cache[key]
# L2 缓存
value = self.l2_cache.get(key)
if value:
self._promote_to_l1(key, value)
return value
# L3 缓存
value = self.l3_cache.get(key)
if value:
self._promote_to_l2(key, value)
self._promote_to_l1(key, value)
return value
return None
def set(
self,
key: str,
value: Any,
ttl_l1: int = 300, # 5 分钟
ttl_l2: int = 3600, # 1 小时
ttl_l3: int = 86400 # 1 天
):
"""设置缓存(写入所有层)"""
# 写入 L1
self.l1_cache[key] = value
self._schedule_l1_eviction(key, ttl_l1)
# 写入 L2
self.l2_cache.setex(key, ttl_l2, pickle.dumps(value))
# 写入 L3
self.l3_cache.put(key, value, ttl_l3)
def _promote_to_l1(self, key: str, value: Any):
"""提升到 L1 缓存"""
self.l1_cache[key] = value
def _promote_to_l2(self, key: str, value: Any):
"""提升到 L2 缓存"""
self.l2_cache.setex(key, 3600, pickle.dumps(value))
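上面 MultiLevelCache 引用的 _schedule_l1_eviction 未给出实现。L1 内存层的 TTL 淘汰可以不起定时任务,改用"写入时记录过期时间、读取时惰性删除"(独立草图):

```python
import time
from typing import Any, Optional

class TTLDict:
    """带 TTL 的内存缓存:get 时惰性过期,替代定时淘汰任务"""
    def __init__(self):
        self._data: dict[str, tuple[Any, float]] = {}

    def set(self, key: str, value: Any, ttl: float):
        # 记录绝对过期时间点
        self._data[key] = (value, time.monotonic() + ttl)

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._data[key]  # 已过期,惰性删除
            return None
        return value

cache = TTLDict()
cache.set("answer", "42", ttl=0.05)
print(cache.get("answer"))  # 42
time.sleep(0.1)
print(cache.get("answer"))  # None(已过期)
```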
class SemanticCache:
"""语义缓存"""
def __init__(self, similarity_threshold: float = 0.95):
self.embeddings = OpenAIEmbeddings()
self.vector_store = FAISS()  # 示意:FAISS 实际需通过 from_texts/from_documents 初始化
self.cache = {}
self.similarity_threshold = similarity_threshold
def get(self, query: str) -> Optional[str]:
"""基于语义相似度获取缓存"""
# 查询向量化由 vector_store 内部完成,无需手动 embed_query
# 搜索相似查询
similar_docs = self.vector_store.similarity_search_with_score(
query,
k=1
)
if similar_docs:
doc, score = similar_docs[0]
# 注意:FAISS 默认返回距离分数(越小越相似);此处假设已配置为归一化相似度
if score >= self.similarity_threshold:
# 找到相似查询,返回缓存结果
cache_key = doc.metadata["cache_key"]
return self.cache.get(cache_key)
return None
def set(self, query: str, result: str):
"""设置语义缓存"""
# 生成缓存键
cache_key = hashlib.md5(query.encode()).hexdigest()
# 存储结果
self.cache[cache_key] = result
# 存储查询向量
self.vector_store.add_texts(
[query],
metadatas=[{"cache_key": cache_key, "timestamp": datetime.now()}]
)
3.5 自适应模型选择#
根据查询复杂度自动选择模型:
class AdaptiveModelSelector:
"""自适应模型选择器"""
def select_model(self, query: str, context: dict) -> str:
"""根据查询复杂度选择模型"""
complexity = self._estimate_complexity(query, context)
if complexity < 0.3:
return "gpt-3.5-turbo" # 简单查询,节省成本
elif complexity < 0.7:
return "gpt-4o-mini" # 中等查询
else:
return "gpt-4o" # 复杂查询,确保质量
def _estimate_complexity(self, query: str, context: dict) -> float:
"""估算查询复杂度(0-1)"""
score = 0
# 查询长度
if len(query) > 100:
score += 0.2
# 是否需要多步推理
reasoning_keywords = ["为什么", "如何", "分析", "对比"]
if any(kw in query for kw in reasoning_keywords):
score += 0.3
# 是否需要工具
if context.get("tools_required"):
score += 0.2
# 上下文长度
if len(context.get("history", [])) > 10:
score += 0.3
return min(score, 1.0)
# 使用示例
selector = AdaptiveModelSelector()
queries = [
"你好", # 简单 → gpt-3.5-turbo
"如何优化Python代码性能?", # 中等 → gpt-4o-mini
"分析这段代码的时间复杂度并提出优化方案", # 复杂 → gpt-4o
]
for query in queries:
model = selector.select_model(query, {})
print(f"查询: {query}\n选择模型: {model}\n")
# 成本节省:
# 假设100次查询:
# - 60%简单(gpt-3.5-turbo: $0.001/次) = $0.06
# - 30%中等(gpt-4o-mini: $0.005/次) = $0.15
# - 10%复杂(gpt-4o: $0.01/次) = $0.10
# 总成本 = $0.31
# vs 全用gpt-4o = $1.00
# 节省69%
3.6 批处理优化#
批量处理减少网络开销:
import asyncio
from typing import List, Dict
async def batch_process(
queries: List[str],
batch_size: int = 10
) -> List[Dict]:
"""批量处理查询(减少overhead)"""
results = []
for i in range(0, len(queries), batch_size):
batch = queries[i:i+batch_size]
# 并发处理batch内的查询
tasks = [
asyncio.create_task(agent.ainvoke({"messages": [("user", q)]}))
for q in batch
]
batch_results = await asyncio.gather(*tasks)
results.extend(batch_results)
return results
# 使用
queries = [f"查询{i}" for i in range(100)]
results = asyncio.run(batch_process(queries, batch_size=10))
# 优势:
# 1. 减少网络overhead(连接复用)
# 2. 可以利用批量定价(某些API提供商)
# 3. 提高整体吞吐量
3.7 成本监控与优化清单#
实时成本追踪系统:
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Dict
@dataclass
class CostMetrics:
"""成本指标"""
timestamp: datetime
model: str
input_tokens: int
output_tokens: int
cost_usd: float
class CostMonitor:
"""成本监控器"""
# 模型价格($/1M tokens,2025年11月)
PRICING = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gpt-3.5-turbo": {"input": 0.50, "output": 1.50},
}
def __init__(self):
self.metrics: List[CostMetrics] = []
def record(
self,
model: str,
input_tokens: int,
output_tokens: int
):
"""记录一次调用的成本"""
pricing = self.PRICING[model]
cost = (
input_tokens * pricing["input"] / 1_000_000 +
output_tokens * pricing["output"] / 1_000_000
)
self.metrics.append(CostMetrics(
timestamp=datetime.now(),
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost_usd=cost
))
def get_total_cost(self, hours: int = 24) -> Dict:
"""获取总成本统计"""
cutoff = datetime.now() - timedelta(hours=hours)
recent = [m for m in self.metrics if m.timestamp > cutoff]
return {
"total_cost_usd": sum(m.cost_usd for m in recent),
"total_requests": len(recent),
"avg_cost_per_request": sum(m.cost_usd for m in recent) / len(recent) if recent else 0,
"by_model": {
model: {
"requests": len([m for m in recent if m.model == model]),
"cost": sum(m.cost_usd for m in recent if m.model == model)
}
for model in self.PRICING.keys()
}
}
# 使用
monitor = CostMonitor()
# 记录调用
monitor.record("gpt-4o", input_tokens=2000, output_tokens=500)
monitor.record("gpt-4o-mini", input_tokens=1500, output_tokens=300)
# 查看成本
stats = monitor.get_total_cost(hours=24)
print(f"24小时总成本: ${stats['total_cost_usd']:.4f}")
print(f"平均每次请求: ${stats['avg_cost_per_request']:.6f}")
print(f"按模型分组: {stats['by_model']}")
成本优化清单:
Prompt优化
- 精简系统提示词(避免冗余描述)
- 移除无关信息
- 使用简洁表达
上下文管理
- 限制历史消息数量(trim_messages)
- 压缩旧消息为摘要
- 智能裁剪上下文窗口
缓存策略
- 精确缓存(相同查询)
- 语义缓存(相似查询)
- 设置合理TTL
模型选择
- 根据复杂度自适应选择
- 简单任务用gpt-3.5-turbo
- 工具调用优先用mini模型
批处理
- 合并相似请求
- 批量处理降低overhead
- 并发执行提高效率
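清单中"限制历史消息数量"一项,可以用一个按条数裁剪、同时保留首条 system 消息的小函数示意(纯 Python 草图;生产中可直接用 LangChain 的 trim_messages 按 token 裁剪):

```python
def trim_history(messages: list[dict], max_messages: int = 10) -> list[dict]:
    """保留首条 system 消息 + 最近 max_messages 条对话"""
    if not messages:
        return messages
    # system 消息(如有)始终保留
    system = [messages[0]] if messages[0]["role"] == "system" else []
    history = messages[len(system):]
    return system + history[-max_messages:]

msgs = [{"role": "system", "content": "你是助手"}] + [
    {"role": "user", "content": f"问题{i}"} for i in range(20)
]
trimmed = trim_history(msgs, max_messages=10)
print(len(trimmed))  # 11:1 条 system + 最近 10 条
print(trimmed[1]["content"])  # 问题10
```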
成本目标:
| 场景 | 优化前 | 优化后 | 节省 |
|---|---|---|---|
| 简单问答 | $0.01/次 | $0.001/次 | 90% |
| 复杂推理 | $0.05/次 | $0.02/次 | 60% |
| 多轮对话 | $0.10/次 | $0.04/次 | 60% |
监控指标:
- 平均每次请求成本
- 每日/每月总成本
- 各模型成本占比
- Token使用效率(输出/输入比)
第4章: 安全合规与防护#
关注点:掌握生产环境的安全防护和合规要求。
4.1 Guardrails 防护体系#
双层防护架构:
from abc import ABC, abstractmethod
from typing import List, Dict, Any
import re
import json
import hashlib
from langchain_openai import ChatOpenAI
class Guardrail(ABC):
"""防护栏基类"""
@abstractmethod
def check(self, content: str) -> Dict[str, Any]:
"""检查内容"""
pass
class DeterministicGuardrail(Guardrail):
"""确定性防护(基于规则)"""
def __init__(self):
self.rules = self._load_rules()
def _load_rules(self):
"""加载规则"""
return {
"sql_injection": r"(\bSELECT\b.*\bFROM\b|\bDROP\b|\bDELETE\b.*\bFROM\b)",
"xss": r"(<script|javascript:|onerror=|onclick=)",
"pii_email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
"pii_phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
"pii_ssn": r"\b\d{3}-\d{2}-\d{4}\b",
"profanity": "|".join(map(re.escape, self._load_profanity_list()))  # 词表拼成正则交替式,供 re.search 使用
}
def check(self, content: str) -> Dict[str, Any]:
"""规则检查"""
violations = []
for rule_name, pattern in self.rules.items():
if re.search(pattern, content, re.IGNORECASE):
violations.append({
"rule": rule_name,
"severity": self._get_severity(rule_name),
"action": self._get_action(rule_name)
})
return {
"passed": len(violations) == 0,
"violations": violations
}
def _get_severity(self, rule: str) -> str:
"""获取严重程度"""
severity_map = {
"sql_injection": "critical",
"xss": "critical",
"pii_ssn": "high",
"pii_email": "medium",
"profanity": "low"
}
return severity_map.get(rule, "medium")
def _get_action(self, rule: str) -> str:
"""获取处理动作"""
action_map = {
"sql_injection": "block",
"xss": "block",
"pii_ssn": "redact",
"pii_email": "mask",
"profanity": "warn"
}
return action_map.get(rule, "warn")
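上面的确定性规则可以脱离类独立验证——正则命中即记为违规(草图,规则取自正文 _load_rules 的子集):

```python
import re

RULES = {
    "sql_injection": r"(\bSELECT\b.*\bFROM\b|\bDROP\b|\bDELETE\b.*\bFROM\b)",
    "xss": r"(<script|javascript:|onerror=|onclick=)",
    "pii_email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
}

def check(content: str) -> list[str]:
    """返回命中的规则名列表,空列表表示通过"""
    return [
        name for name, pattern in RULES.items()
        if re.search(pattern, content, re.IGNORECASE)
    ]

print(check("select password from users"))  # ['sql_injection']
print(check("你好,请帮我总结这篇文章"))  # []
```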
class ModelBasedGuardrail(Guardrail):
"""基于模型的防护"""
def __init__(self, model: ChatOpenAI):
self.model = model
self.categories = [
"harmful_content",
"bias",
"misinformation",
"inappropriate",
"off_topic"
]
def check(self, content: str) -> Dict[str, Any]:
"""模型检查"""
prompt = f"""分析以下内容是否存在问题。
内容:{content}
检查类别:
{', '.join(self.categories)}
返回 JSON 格式:
{{
"passed": true/false,
"violations": [
{{"category": "...", "confidence": 0.0-1.0, "reason": "..."}}
]
}}"""
response = self.model.invoke(prompt)
# 解析响应
try:
result = json.loads(response.content)
return result
except (json.JSONDecodeError, TypeError):  # 不使用裸 except,避免吞掉无关异常
# 解析失败,保守处理
return {
"passed": False,
"violations": [{
"category": "parse_error",
"confidence": 1.0,
"reason": "无法解析检查结果"
}]
}
class HybridGuardrailSystem:
"""混合防护系统"""
def __init__(self):
self.deterministic = DeterministicGuardrail()
self.model_based = ModelBasedGuardrail(ChatOpenAI(model="gpt-4o"))
self.cache = {}
def check(self, content: str, use_cache: bool = True) -> Dict[str, Any]:
"""综合检查"""
# 检查缓存
if use_cache:
cache_key = hashlib.md5(content.encode()).hexdigest()
if cache_key in self.cache:
return self.cache[cache_key]
# 第一层:确定性检查(快速)
deterministic_result = self.deterministic.check(content)
# 如果确定性检查发现严重问题,直接返回
if not deterministic_result["passed"]:
critical_violations = [
v for v in deterministic_result["violations"]
if v["severity"] == "critical"
]
if critical_violations:
result = {
"passed": False,
"stage": "deterministic",
"violations": critical_violations,
"action": "block"
}
if use_cache:
self.cache[cache_key] = result
return result
# 第二层:模型检查(更全面但较慢)
model_result = self.model_based.check(content)
# 合并结果
all_violations = (
deterministic_result.get("violations", []) +
model_result.get("violations", [])
)
result = {
"passed": len(all_violations) == 0,
"stage": "hybrid",
"violations": all_violations,
"action": self._determine_action(all_violations)
}
if use_cache:
self.cache[cache_key] = result
return result
def _determine_action(self, violations: List[dict]) -> str:
"""确定处理动作"""
if not violations:
return "pass"
# 根据最严重的违规确定动作
severities = [v.get("severity", "medium") for v in violations]
if "critical" in severities:
return "block"
elif "high" in severities:
return "review"
else:
return "warn"
4.2 PII 检测策略#
PII 检测与处理:
import re
import hashlib
from enum import Enum
from typing import List, Tuple
class PIIType(Enum):
"""PII 类型"""
EMAIL = "email"
PHONE = "phone"
SSN = "ssn"
CREDIT_CARD = "credit_card"
IP_ADDRESS = "ip"
NAME = "name"
ADDRESS = "address"
class PIIStrategy(Enum):
"""PII 处理策略"""
REDACT = "redact" # 完全删除
MASK = "mask" # 部分遮蔽
HASH = "hash" # 哈希替换
ENCRYPT = "encrypt" # 加密
TOKENIZE = "tokenize" # 令牌化
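在接入 NER 模型之前,可以先用纯正则验证"检测 + 遮蔽"流程(独立草图,邮箱遮蔽规则与下文 _get_replacement 的 MASK 策略一致:保留本地部分首尾字符与域名):

```python
import re

EMAIL_RE = r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"

def mask_emails(text: str) -> str:
    """遮蔽邮箱本地部分,保留首尾字符与完整域名"""
    def mask(m: re.Match) -> str:
        local, domain = m.group().split("@")
        return local[0] + "*" * (len(local) - 2) + local[-1] + "@" + domain
    return re.sub(EMAIL_RE, mask, text)

print(mask_emails("联系 alice@example.com 获取帮助"))
# 联系 a***e@example.com 获取帮助
```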
class PIIDetector:
"""PII 检测器"""
def __init__(self):
self.patterns = {
PIIType.EMAIL: r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
PIIType.PHONE: r'(\+\d{1,3}[-.\s]?)?\(?\d{1,4}\)?[-.\s]?\d{1,4}[-.\s]?\d{1,9}',
PIIType.SSN: r'\b\d{3}-\d{2}-\d{4}\b',
PIIType.CREDIT_CARD: r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
PIIType.IP_ADDRESS: r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
}
# 名称检测需要 NER 模型
self.ner_model = self._load_ner_model()
def detect(self, text: str) -> List[Tuple[PIIType, str, int, int]]:
"""检测 PII"""
detections = []
# 正则检测
for pii_type, pattern in self.patterns.items():
for match in re.finditer(pattern, text):
detections.append((
pii_type,
match.group(),
match.start(),
match.end()
))
# NER 检测(名称、地址)
entities = self.ner_model.extract_entities(text)
for entity in entities:
if entity["type"] == "PERSON":
detections.append((
PIIType.NAME,
entity["text"],
entity["start"],
entity["end"]
))
elif entity["type"] == "LOCATION":
detections.append((
PIIType.ADDRESS,
entity["text"],
entity["start"],
entity["end"]
))
return detections
def process(self, text: str, strategy: PIIStrategy) -> str:
"""处理 PII"""
detections = self.detect(text)
# 按位置倒序处理(避免索引偏移)
detections.sort(key=lambda x: x[2], reverse=True)
processed_text = text
for pii_type, value, start, end in detections:
replacement = self._get_replacement(pii_type, value, strategy)
processed_text = processed_text[:start] + replacement + processed_text[end:]
return processed_text
def _get_replacement(self, pii_type: PIIType, value: str, strategy: PIIStrategy) -> str:
"""获取替换文本"""
if strategy == PIIStrategy.REDACT:
return f"[{pii_type.value.upper()}_REDACTED]"
elif strategy == PIIStrategy.MASK:
if pii_type == PIIType.EMAIL:
# 保留域名
parts = value.split('@')
if len(parts) == 2:
masked_local = parts[0][0] + '*' * (len(parts[0]) - 2) + parts[0][-1]
return f"{masked_local}@{parts[1]}"
elif pii_type == PIIType.PHONE:
# 保留前3位和后2位
if len(value) >= 5:
return value[:3] + '*' * (len(value) - 5) + value[-2:]
# 默认遮蔽
return '*' * len(value)
elif strategy == PIIStrategy.HASH:
return f"[HASH:{hashlib.sha256(value.encode()).hexdigest()[:8]}]"
elif strategy == PIIStrategy.ENCRYPT:
# 实际加密实现
encrypted = self._encrypt(value)
return f"[ENC:{encrypted}]"
elif strategy == PIIStrategy.TOKENIZE:
# 生成唯一令牌
token = self._generate_token(pii_type, value)
return f"[TOKEN:{token}]"
return value
4.3 分层防护组合#
多层防护架构:
from abc import ABC, abstractmethod
from datetime import datetime
import uuid
class LayeredDefenseSystem:
"""分层防护系统"""
def __init__(self):
self.layers = [
InputValidationLayer(),
RateLimitLayer(),
AuthenticationLayer(),
PIIProtectionLayer(),
ContentModerationLayer(),
OutputSanitizationLayer(),
AuditLoggingLayer()
]
async def process_request(self, request: dict) -> dict:
"""处理请求(通过所有防护层)"""
context = {
"request": request,
"user": request.get("user"),
"timestamp": datetime.now(),
"trace_id": str(uuid.uuid4())
}
# 逐层处理
for layer in self.layers:
try:
result = await layer.process(context)
if not result["passed"]:
# 某层未通过,记录并返回
self._log_rejection(layer, context, result)
return {
"status": "rejected",
"layer": layer.__class__.__name__,
"reason": result.get("reason"),
"trace_id": context["trace_id"]
}
# 更新上下文
context.update(result.get("context_updates", {}))
except Exception as e:
# 层处理异常
self._log_error(layer, context, e)
# 根据层的重要性决定是否继续
if layer.is_critical():
return {
"status": "error",
"layer": layer.__class__.__name__,
"error": str(e),
"trace_id": context["trace_id"]
}
# 所有层都通过
return {
"status": "approved",
"context": context,
"trace_id": context["trace_id"]
}
class DefenseLayer(ABC):
"""防护层基类"""
@abstractmethod
async def process(self, context: dict) -> dict:
"""处理请求"""
pass
def is_critical(self) -> bool:
"""是否关键层(失败时必须停止)"""
return False
class InputValidationLayer(DefenseLayer):
"""输入验证层"""
async def process(self, context: dict) -> dict:
request = context["request"]
# 验证必填字段
required_fields = ["user_id", "content", "timestamp"]
for field in required_fields:
if field not in request:
return {
"passed": False,
"reason": f"Missing required field: {field}"
}
# 验证输入长度
if len(request.get("content", "")) > 10000:
return {
"passed": False,
"reason": "Content too long (max 10000 chars)"
}
# 验证输入格式
if not self._validate_format(request):
return {
"passed": False,
"reason": "Invalid input format"
}
return {"passed": True}
def is_critical(self) -> bool:
return True
class RateLimitLayer(DefenseLayer):
"""速率限制层"""
def __init__(self):
self.limits = {} # user_id -> requests
async def process(self, context: dict) -> dict:
user_id = context["user"]["id"]
# 检查速率限制
current_count = self.limits.get(user_id, 0)
if current_count >= 100: # 每分钟 100 个请求
return {
"passed": False,
"reason": "Rate limit exceeded"
}
# 更新计数
self.limits[user_id] = current_count + 1
# 异步重置计数器(简化示意:每个请求都会调度一次延时重置,生产环境建议用 Redis 滑动窗口)
asyncio.create_task(self._reset_counter(user_id))
return {"passed": True}
async def _reset_counter(self, user_id: str):
"""60秒后重置计数器"""
await asyncio.sleep(60)
self.limits[user_id] = 0
4.4 自定义 Guardrails 开发#
自定义防护栏框架:
from typing import Callable, List, Optional
import inspect
class GuardrailFramework:
"""防护栏框架"""
def __init__(self):
self.pre_processors = []
self.validators = []
self.post_processors = []
def pre_process(self, func: Callable) -> Callable:
"""注册预处理器"""
self.pre_processors.append(func)
return func
def validate(self, func: Callable) -> Callable:
"""注册验证器"""
self.validators.append(func)
return func
def post_process(self, func: Callable) -> Callable:
"""注册后处理器"""
self.post_processors.append(func)
return func
async def execute(self, input_data: Any) -> Any:
"""执行防护流程"""
# 预处理
processed_data = input_data
for processor in self.pre_processors:
if inspect.iscoroutinefunction(processor):
processed_data = await processor(processed_data)
else:
processed_data = processor(processed_data)
# 验证
for validator in self.validators:
if inspect.iscoroutinefunction(validator):
is_valid = await validator(processed_data)
else:
is_valid = validator(processed_data)
if not is_valid:
raise ValidationError(f"Validation failed: {validator.__name__}")
# 后处理
for processor in self.post_processors:
if inspect.iscoroutinefunction(processor):
processed_data = await processor(processed_data)
else:
processed_data = processor(processed_data)
return processed_data
# 使用示例
guardrail = GuardrailFramework()
@guardrail.pre_process
def sanitize_input(data: dict) -> dict:
"""清理输入"""
data["content"] = data["content"].strip()
return data
@guardrail.validate
def check_content_length(data: dict) -> bool:
"""检查内容长度"""
return len(data["content"]) <= 1000
@guardrail.validate
async def check_toxicity(data: dict) -> bool:
"""检查毒性(使用外部 API)"""
# 调用毒性检测 API
result = await toxicity_api.check(data["content"])
return result["toxicity_score"] < 0.7
@guardrail.post_process
def add_metadata(data: dict) -> dict:
"""添加元数据"""
data["processed_at"] = datetime.now()
data["guardrail_version"] = "1.0"
return data
# 执行
result = await guardrail.execute({"content": "用户输入"})
4.5 输入输出安全#
安全处理框架:
import html
import re
from typing import Optional
class SecurityFramework:
"""安全框架"""
def __init__(self):
self.input_sanitizer = InputSanitizer()
self.output_validator = OutputValidator()
self.encryption = EncryptionService()
def secure_input(self, raw_input: str) -> str:
"""安全化输入"""
# 1. HTML 实体编码
sanitized = html.escape(raw_input)
# 2. SQL 注入防护
sanitized = self._prevent_sql_injection(sanitized)
# 3. 命令注入防护
sanitized = self._prevent_command_injection(sanitized)
# 4. 路径遍历防护
sanitized = self._prevent_path_traversal(sanitized)
return sanitized
def _prevent_sql_injection(self, text: str) -> str:
"""防止 SQL 注入"""
# 真正的防护应使用参数化查询;下面基于黑名单的清理仅作演示
dangerous_patterns = [
r"('\s*OR\s*')",
r"(;\s*DROP\s+TABLE)",
r"(UNION\s+SELECT)",
r"(INSERT\s+INTO)",
r"(UPDATE\s+.*\s+SET)"
]
for pattern in dangerous_patterns:
if re.search(pattern, text, re.IGNORECASE):
# 记录并清理
self._log_security_event("sql_injection_attempt", text)
text = re.sub(pattern, "", text, flags=re.IGNORECASE)
return text
def secure_output(self, output: str, context: dict) -> str:
"""安全化输出"""
# 1. 敏感信息脱敏
output = self._redact_sensitive_info(output)
# 2. XSS 防护
output = self._prevent_xss(output)
# 3. 信息泄露防护
output = self._prevent_info_disclosure(output)
return output
def _redact_sensitive_info(self, text: str) -> str:
"""脱敏敏感信息"""
# API 密钥
text = re.sub(r'(api[_-]?key[\s:=]+)[\w-]+', r'\1[REDACTED]', text, flags=re.IGNORECASE)
# 密码
text = re.sub(r'(password[\s:=]+)\S+', r'\1[REDACTED]', text, flags=re.IGNORECASE)
# Token
text = re.sub(r'(token[\s:=]+)[\w-]+', r'\1[REDACTED]', text, flags=re.IGNORECASE)
return text
4.6 数据合规#
GDPR 合规实现:
class GDPRCompliance:
"""GDPR 合规管理"""
def __init__(self):
self.consent_manager = ConsentManager()
self.data_processor = DataProcessor()
self.audit_logger = AuditLogger()
def process_personal_data(self, data: dict, user_id: str) -> dict:
"""处理个人数据"""
# 1. 检查同意
if not self.consent_manager.has_consent(user_id, "data_processing"):
raise PermissionError("No consent for data processing")
# 2. 数据最小化
minimal_data = self._minimize_data(data)
# 3. 假名化
pseudonymized = self._pseudonymize(minimal_data, user_id)
# 4. 记录处理活动
self.audit_logger.log_processing_activity(
user_id=user_id,
purpose="service_provision",
legal_basis="consent",
data_categories=self._get_data_categories(minimal_data)
)
return pseudonymized
def _minimize_data(self, data: dict) -> dict:
"""数据最小化"""
# 只保留必要字段
required_fields = ["name", "email", "query"]
minimal = {}
for field in required_fields:
if field in data:
minimal[field] = data[field]
return minimal
def _pseudonymize(self, data: dict, user_id: str) -> dict:
"""假名化处理"""
# 生成假名
pseudo_id = hashlib.sha256(f"{user_id}{SECRET_SALT}".encode()).hexdigest()[:16]
# 替换标识符
data["user_id"] = pseudo_id
# 加密敏感字段
if "email" in data:
data["email_encrypted"] = self.encryption.encrypt(data["email"])
del data["email"]
return data
def handle_data_request(self, request_type: str, user_id: str) -> dict:
"""处理数据请求(GDPR 权利)"""
if request_type == "access":
# 数据访问权
return self._export_user_data(user_id)
elif request_type == "portability":
# 数据可携带权
return self._export_portable_data(user_id)
elif request_type == "erasure":
# 被遗忘权
return self._erase_user_data(user_id)
elif request_type == "rectification":
# 数据更正权
return self._rectify_user_data(user_id)
else:
raise ValueError(f"Unknown request type: {request_type}")
4.7 审计日志#
完整审计系统:
class AuditSystem:
"""审计系统"""
def __init__(self):
self.storage = AuditStorage()
self.analyzer = AuditAnalyzer()
def log_event(
self,
event_type: str,
user_id: str,
action: str,
resource: str,
result: str,
metadata: Optional[dict] = None
):
"""记录审计事件"""
event = {
"id": str(uuid.uuid4()),
"timestamp": datetime.utcnow().isoformat(),
"event_type": event_type,
"user_id": user_id,
"action": action,
"resource": resource,
"result": result,
"metadata": metadata or {},
"ip_address": self._get_client_ip(),
"user_agent": self._get_user_agent(),
"session_id": self._get_session_id()
}
# 存储
self.storage.store(event)
# 实时分析
self.analyzer.analyze(event)
return event["id"]
def query_logs(
self,
filters: dict,
start_time: datetime,
end_time: datetime,
limit: int = 100
) -> List[dict]:
"""查询审计日志"""
return self.storage.query(filters, start_time, end_time, limit)
def generate_compliance_report(self, period: str) -> dict:
"""生成合规报告"""
report = {
"period": period,
"generated_at": datetime.utcnow().isoformat(),
"statistics": self._calculate_statistics(period),
"anomalies": self._detect_anomalies(period),
"compliance_status": self._check_compliance(period)
}
return report
class AuditStorage:
"""审计存储"""
def __init__(self):
# 使用不可变存储
self.immutable_store = ImmutableLogStore()
def store(self, event: dict):
"""存储审计事件"""
# 添加完整性校验
event["hash"] = self._calculate_hash(event)
# 签名
event["signature"] = self._sign_event(event)
# 存储到不可变存储
self.immutable_store.append(event)
# 异步复制到长期存储
asyncio.create_task(self._replicate_to_cold_storage(event))
def _calculate_hash(self, event: dict) -> str:
"""计算事件哈希"""
# 确保顺序一致
canonical = json.dumps(event, sort_keys=True)
return hashlib.sha256(canonical.encode()).hexdigest()
def _sign_event(self, event: dict) -> str:
"""数字签名"""
# 使用私钥签名
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
message = json.dumps(event, sort_keys=True).encode()
signature = PRIVATE_KEY.sign(
message,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return base64.b64encode(signature).decode()
第5章:部署与运维#
关注点:掌握生产环境的部署策略和运维实践。
5.1 部署策略#
容器化部署:
生产级Dockerfile:
# Dockerfile
FROM python:3.11-slim
# 设置环境变量
ENV PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PIP_NO_CACHE_DIR=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1
# 安全:非 root 用户
RUN useradd -m -u 1000 langchain && \
mkdir -p /app && \
chown -R langchain:langchain /app
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
&& rm -rf /var/lib/apt/lists/*
# 依赖(分层缓存优化)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 应用代码
COPY --chown=langchain:langchain . .
# 切换用户
USER langchain
# 暴露端口
EXPOSE 8000
# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=2)"
# 启动(使用生产级配置)
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
完整docker-compose.yml:
# docker-compose.yml
version: '3.8'
services:
langchain-app:
build:
context: .
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
# 从.env文件或环境变量加载
- LANGCHAIN_TRACING_V2=${LANGCHAIN_TRACING_V2:-false}
- LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY}
- OPENAI_API_KEY=${OPENAI_API_KEY}
- REDIS_HOST=redis
- REDIS_PORT=6379
volumes:
- ./data:/app/data
- ./logs:/app/logs
restart: unless-stopped
depends_on:
- redis
- prometheus
networks:
- langchain-network
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=2)"]
interval: 30s
timeout: 10s
retries: 3
start_period: 40s
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
restart: unless-stopped
networks:
- langchain-network
command: redis-server --appendonly yes
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- ./prometheus_alerts.yml:/etc/prometheus/prometheus_alerts.yml
- prometheus-data:/prometheus
restart: unless-stopped
networks:
- langchain-network
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD:-admin}
- GF_USERS_ALLOW_SIGN_UP=false
volumes:
- grafana-data:/var/lib/grafana
restart: unless-stopped
networks:
- langchain-network
depends_on:
- prometheus
volumes:
redis-data:
prometheus-data:
grafana-data:
networks:
langchain-network:
driver: bridge
requirements.txt示例:
# requirements.txt
langchain>=1.0.7
langchain-openai>=1.0.3
langchain-core>=1.0.7
langchain-community>=1.0.7
langgraph>=1.0.3
langsmith>=0.4.43
# Web框架
fastapi==0.115.6
uvicorn[standard]==0.34.0
# 监控
prometheus-client==0.21.0
# 缓存
redis==5.2.1
# 工具
python-dotenv==1.0.1
pydantic==2.10.4
pydantic-settings==2.7.0
.env示例:
# .env
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=your_langsmith_key
OPENAI_API_KEY=your_openai_key
GRAFANA_PASSWORD=your_grafana_password
.dockerignore:
# .dockerignore
__pycache__/
*.py[cod]
*$py.class
*.so
.env
.venv/
venv/
*.log
.git/
.gitignore
.pytest_cache/
.coverage
htmlcov/
dist/
build/
*.egg-info/
.DS_Store
部署命令:
# 构建镜像
docker-compose build
# 启动服务
docker-compose up -d
# 查看日志
docker-compose logs -f langchain-app
# 停止服务
docker-compose down
# 完全清理(包括数据卷)
docker-compose down -v
Kubernetes 部署配置:
# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: langchain-app
spec:
replicas: 3
selector:
matchLabels:
app: langchain
template:
metadata:
labels:
app: langchain
spec:
containers:
- name: app
image: langchain-app:latest
ports:
- containerPort: 8000  # 与 Dockerfile 中 EXPOSE 8000 保持一致
env:
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: langchain-secrets
key: openai-api-key
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "1Gi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
name: langchain-service
spec:
selector:
app: langchain
ports:
- port: 80
targetPort: 8000
type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: langchain-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: langchain-app
minReplicas: 2
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
Serverless 部署(AWS Lambda):
# handler.py
import json
from mangum import Mangum
from fastapi import FastAPI
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
app = FastAPI()
# 初始化(冷启动优化)
agent = None
def get_agent():
global agent
if agent is None:
agent = create_agent(
model=ChatOpenAI(model="gpt-4o-mini"),
tools=[],
system_prompt="你是一个助手"
)
return agent
@app.post("/chat")
async def chat(request: dict):
agent = get_agent()
response = agent.invoke(request)
return response
# Lambda handler
handler = Mangum(app)
# serverless.yml
service: langchain-service
provider:
name: aws
runtime: python3.11
region: us-east-1
timeout: 30
memorySize: 1024
environment:
OPENAI_API_KEY: ${ssm:/langchain/openai_api_key}
functions:
chat:
handler: handler.handler
events:
- http:
path: /chat
method: post
cors: true
reservedConcurrency: 10
provisionedConcurrency: 2 # 减少冷启动
plugins:
- serverless-python-requirements
custom:
pythonRequirements:
dockerizePip: true
slim: true
strip: false
5.2 监控告警#
5.2.1 Prometheus监控集成#
1. 安装依赖
pip install prometheus-client
2. 完整示例
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
from typing import Dict, Any
from langchain_core.callbacks import BaseCallbackHandler
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# 定义Prometheus指标
llm_requests = Counter(
'langchain_llm_requests_total',
'Total LLM requests',
['model', 'status']
)
llm_latency = Histogram(
'langchain_llm_latency_seconds',
'LLM request latency',
['model'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
)
llm_tokens = Counter(
'langchain_llm_tokens_total',
'Total tokens used',
['model', 'type'] # type: prompt/completion
)
llm_errors = Counter(
'langchain_llm_errors_total',
'Total LLM errors',
['model', 'error_type']
)
# 集成到LangChain回调
class PrometheusCallback(BaseCallbackHandler):
"""Prometheus监控回调"""
def on_llm_start(self, serialized, prompts, **kwargs):
"""LLM开始时记录"""
self.start_time = time.time()
def on_llm_end(self, response, **kwargs):
"""LLM结束时记录指标"""
duration = time.time() - self.start_time
# 获取模型信息
model = response.llm_output.get("model_name", "unknown") if response.llm_output else "unknown"
# 记录成功指标
llm_requests.labels(model=model, status="success").inc()
llm_latency.labels(model=model).observe(duration)
# 记录token使用
if response.llm_output:
token_usage = response.llm_output.get("token_usage", {})
llm_tokens.labels(model=model, type="prompt").inc(
token_usage.get("prompt_tokens", 0)
)
llm_tokens.labels(model=model, type="completion").inc(
token_usage.get("completion_tokens", 0)
)
def on_llm_error(self, error, **kwargs):
"""LLM错误时记录"""
error_type = type(error).__name__
llm_requests.labels(model="unknown", status="error").inc()
llm_errors.labels(model="unknown", error_type=error_type).inc()
# 启动Prometheus HTTP服务器(暴露指标)
start_http_server(8001) # 在独立端口暴露指标(避免与应用端口8000冲突)
# 使用示例
chain = ChatOpenAI(model="gpt-4o-mini") | StrOutputParser()
result = chain.invoke(
"你好,介绍一下LangChain",
config={"callbacks": [PrometheusCallback()]}
)
print(f"访问 http://localhost:8001/metrics 查看指标")
3. Grafana可视化配置
{
"dashboard": {
"title": "LangChain监控面板",
"panels": [
{
"title": "LLM请求速率(每秒)",
"targets": [{
"expr": "rate(langchain_llm_requests_total[5m])",
"legendFormat": "{{model}} - {{status}}"
}],
"type": "graph"
},
{
"title": "LLM延迟分布(P50/P95/P99)",
"targets": [
{
"expr": "histogram_quantile(0.50, rate(langchain_llm_latency_seconds_bucket[5m]))",
"legendFormat": "P50 - {{model}}"
},
{
"expr": "histogram_quantile(0.95, rate(langchain_llm_latency_seconds_bucket[5m]))",
"legendFormat": "P95 - {{model}}"
},
{
"expr": "histogram_quantile(0.99, rate(langchain_llm_latency_seconds_bucket[5m]))",
"legendFormat": "P99 - {{model}}"
}
],
"type": "graph"
},
{
"title": "Token使用量(每分钟)",
"targets": [{
"expr": "rate(langchain_llm_tokens_total[1m])",
"legendFormat": "{{model}} - {{type}}"
}],
"type": "graph"
},
{
"title": "错误率",
"targets": [{
"expr": "rate(langchain_llm_errors_total[5m])",
"legendFormat": "{{model}} - {{error_type}}"
}],
"type": "graph"
}
]
}
}
4. Prometheus告警规则
# prometheus_alerts.yml
groups:
- name: langchain_alerts
rules:
- alert: HighLLMLatency
expr: histogram_quantile(0.95, rate(langchain_llm_latency_seconds_bucket[5m])) > 5
for: 5m
labels:
severity: warning
annotations:
summary: "LLM延迟过高"
description: "P95延迟超过5秒: {{ $value }}s"
- alert: CriticalLLMLatency
expr: histogram_quantile(0.95, rate(langchain_llm_latency_seconds_bucket[5m])) > 10
for: 2m
labels:
severity: critical
annotations:
summary: "LLM延迟严重过高"
description: "P95延迟超过10秒: {{ $value }}s"
- alert: HighErrorRate
expr: rate(langchain_llm_requests_total{status="error"}[5m]) / rate(langchain_llm_requests_total[5m]) > 0.1
for: 5m
labels:
severity: critical
annotations:
summary: "LLM错误率过高"
description: "错误率超过10%: {{ $value | humanizePercentage }}"
- alert: TokenBudgetExceeded
expr: increase(langchain_llm_tokens_total[1h]) > 1000000
for: 1m
labels:
severity: warning
annotations:
summary: "Token预算超限"
description: "过去1小时使用了{{ $value }}个tokens"
- alert: NoRecentRequests
expr: rate(langchain_llm_requests_total[10m]) == 0
for: 15m
labels:
severity: info
annotations:
summary: "无LLM请求"
description: "过去15分钟无LLM请求,可能服务异常"
5. Prometheus配置文件
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']
rule_files:
- "prometheus_alerts.yml"
scrape_configs:
- job_name: 'langchain_app'
static_configs:
- targets: ['localhost:8001'] # LangChain应用的metrics端口
5.2.2 监控系统实现#
监控中间件:
from prometheus_client import Counter, Histogram, Gauge
import logging
# Prometheus 指标
request_count = Counter('langchain_requests_total', 'Total requests', ['endpoint', 'status'])
request_duration = Histogram('langchain_request_duration_seconds', 'Request duration', ['endpoint'])
active_requests = Gauge('langchain_active_requests', 'Active requests')
model_tokens = Counter('langchain_model_tokens_total', 'Total tokens used', ['model'])
error_count = Counter('langchain_errors_total', 'Total errors', ['error_type'])
class MonitoringMiddleware:
"""监控中间件"""
async def __call__(self, request, call_next):
endpoint = request.url.path
# 记录活跃请求
active_requests.inc()
# 记录请求时间
with request_duration.labels(endpoint=endpoint).time():
try:
response = await call_next(request)
# 记录请求数
request_count.labels(
endpoint=endpoint,
status=response.status_code
).inc()
return response
except Exception as e:
# 记录错误
error_count.labels(error_type=type(e).__name__).inc()
raise
finally:
active_requests.dec()
# 告警规则(Prometheus AlertManager)
ALERT_RULES = """
groups:
- name: langchain_alerts
interval: 30s
rules:
- alert: HighErrorRate
expr: rate(langchain_errors_total[5m]) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: High error rate detected
description: "Error rate is {{ $value }} errors per second"
- alert: HighLatency
expr: histogram_quantile(0.95, rate(langchain_request_duration_seconds_bucket[5m])) > 5
for: 5m
labels:
severity: warning
annotations:
summary: High latency detected
description: "95th percentile latency is {{ $value }} seconds"
- alert: TokenBudgetExceeded
expr: increase(langchain_model_tokens_total[1h]) > 1000000
labels:
severity: warning
annotations:
summary: Token budget exceeded
description: "Used {{ $value }} tokens in the last hour"
"""
5.3 故障处理#
故障恢复系统:
class FaultToleranceSystem:
"""故障容错系统"""
def __init__(self):
self.health_checker = HealthChecker()
self.circuit_breaker = CircuitBreaker()
self.fallback_handler = FallbackHandler()
async def handle_request(self, request: dict) -> dict:
"""处理请求(带故障容错)"""
# 健康检查
if not await self.health_checker.is_healthy():
return await self.fallback_handler.handle(request)
# 熔断器
if self.circuit_breaker.is_open():
return await self.fallback_handler.handle(request)
try:
# 正常处理
response = await self.process_request(request)
self.circuit_breaker.record_success()
return response
except Exception as e:
self.circuit_breaker.record_failure()
# 故障分类处理
if isinstance(e, RateLimitError):
# 速率限制:等待后重试
await asyncio.sleep(e.retry_after)
return await self.handle_request(request)
elif isinstance(e, ModelError):
# 模型错误:切换备用模型
return await self.fallback_to_backup_model(request)
elif isinstance(e, TimeoutError):
# 超时:返回缓存或默认响应
cached = await self.get_cached_response(request)
if cached:
return cached
return self.get_default_response(request)
else:
# 未知错误:记录并返回错误响应
await self.log_error(e)
return {
"status": "error",
"message": "服务暂时不可用,请稍后重试"
}
class DisasterRecovery:
"""灾难恢复"""
def __init__(self):
self.backup_regions = ["us-west-2", "eu-west-1", "ap-northeast-1"]
self.primary_region = "us-east-1"
async def check_region_health(self, region: str) -> bool:
"""检查区域健康状态"""
try:
response = await self.ping_region(region)
return response.status_code == 200
except:
return False
async def failover(self):
"""故障转移"""
# 检查主区域
if await self.check_region_health(self.primary_region):
return self.primary_region
# 主区域故障,切换到备用区域
for region in self.backup_regions:
if await self.check_region_health(region):
await self.switch_traffic(region)
await self.notify_ops_team(f"Failover to {region}")
return region
# 所有区域都故障
raise CriticalError("All regions are down")
5.4 生产部署检查清单#
5.4.1 安全检查清单#
API密钥与凭证管理:
- 环境变量管理:所有API密钥通过环境变量注入,禁止硬编码
- 密钥管理服务:使用AWS Secrets Manager、HashiCorp Vault等密钥管理服务
- 密钥轮换:定期轮换API密钥和访问令牌
- 最小权限原则:为不同环境配置不同权限级别的密钥
网络安全:
- 启用HTTPS/TLS:所有外部通信强制使用HTTPS
- TLS版本:使用TLS 1.2或更高版本
- 证书验证:验证SSL证书有效性
- API网关:通过API网关统一管理和保护API端点
速率限制与防护:
- 全局速率限制:限制每个用户/IP的请求频率
- 端点级限制:为不同端点设置不同速率限制
- DDoS防护:配置CloudFlare或AWS Shield等防护服务
- 请求大小限制:限制请求payload大小
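上面的速率限制要点可以用令牌桶(Token Bucket)来示意。以下是一个最小化的单机实现(速率、容量均为假设的示例值;分布式场景通常基于 Redis 实现):

```python
# 令牌桶限流器的最小示意(示例参数,非生产配置)
import time

class TokenBucket:
    """以固定速率补充令牌,请求消耗令牌,桶空则拒绝;容量即允许的突发量"""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate                  # 每秒补充的令牌数
        self.capacity = capacity          # 桶容量(突发上限)
        self.tokens = float(capacity)
        self.last_refill = time.monotonic()

    def allow(self, cost: int = 1) -> bool:
        now = time.monotonic()
        # 按经过的时间补充令牌,不超过桶容量
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

bucket = TokenBucket(rate=1, capacity=5)
results = [bucket.allow() for _ in range(6)]
# 前 5 个请求消耗完突发容量,第 6 个被拒绝
```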
输入验证与清洗:
- 输入长度验证:限制提示词和输入数据长度
- SQL注入防护:使用参数化查询,禁止拼接SQL
- XSS防护:对所有用户输入进行HTML实体编码
- 命令注入防护:禁止直接执行用户输入的命令
- 路径遍历防护:验证和清洗文件路径
敏感信息脱敏:
- PII检测:自动检测和脱敏个人身份信息
- 输出过滤:过滤API密钥、密码、Token等敏感信息
- 日志脱敏:确保日志中不包含敏感信息
- 错误信息处理:避免泄露系统内部信息
5.4.2 性能检查清单#
连接池配置:
- HTTP连接池:配置合理的连接池大小(建议:每个主机10-30个连接)
- 数据库连接池:配置数据库连接池(建议:最小5个,最大20个)
- Redis连接池:配置Redis连接池参数
- 连接超时:设置合理的连接超时时间(建议:10秒)
- 读取超时:设置读取超时时间(建议:30秒)
缓存策略:
- LLM响应缓存:启用语义缓存或精确匹配缓存
- 缓存层级:配置L1(内存)、L2(Redis)、L3(S3)多层缓存
- TTL配置:为不同数据类型设置合理的过期时间
- 缓存预热:在启动时预加载热点数据
- 缓存失效:实现缓存失效和更新策略
超时控制:
- LLM调用超时:设置LLM API调用超时(建议:30-60秒)
- 工具执行超时:为每个工具设置独立超时时间
- 总体超时:设置请求总超时时间
- 流式响应超时:配置流式响应的超时策略
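超时控制可以统一封装在一个辅助函数里,超时后返回降级响应而不是让请求挂起。下面是基于 asyncio.wait_for 的最小示意(超时阈值与降级文案均为示例):

```python
# 为 LLM/工具调用统一加超时的最小示意
import asyncio

async def call_with_timeout(coro, timeout: float, fallback):
    """在 timeout 秒内未完成则取消任务并返回降级结果"""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return fallback

async def slow_tool():
    await asyncio.sleep(10)   # 模拟一个耗时过长的调用
    return "真实结果"

result = asyncio.run(
    call_with_timeout(slow_tool(), timeout=0.05, fallback="服务繁忙,请稍后重试")
)
```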
批处理优化:
- 批量推理:支持批量处理多个请求
- 批次大小:根据模型和硬件优化批次大小
- 异步处理:使用异步I/O提升并发能力
- 并行执行:支持并行调用多个工具或LLM
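对 I/O 密集的工具调用,可用 asyncio.gather 并行执行以提升吞吐。下面的示意中三个模拟工具并发运行,总耗时约等于最慢的一个而非各耗时之和(工具函数为假设的示例):

```python
# 并行调用多个工具的 asyncio.gather 示意
import asyncio
import time

async def fake_tool(name: str, delay: float) -> str:
    await asyncio.sleep(delay)   # 模拟网络 I/O
    return f"{name}:done"

async def run_parallel():
    # gather 保持结果顺序与传入顺序一致
    return await asyncio.gather(
        fake_tool("search", 0.05),
        fake_tool("weather", 0.05),
        fake_tool("db", 0.05),
    )

start = time.monotonic()
results = asyncio.run(run_parallel())
elapsed = time.monotonic() - start   # 约 0.05s,而非 0.15s
```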
5.4.3 可靠性检查清单#
重试策略:
- 指数退避:实现指数退避重试策略
- 重试次数:设置合理的最大重试次数(建议:3次)
- 重试条件:仅对可恢复的错误(429、500、503)进行重试
- 幂等性:确保重试操作是幂等的
- 超时重试:对超时请求进行重试
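指数退避加抖动的重试逻辑可以封装为一个通用函数。下面是最小示意(可重试异常列表与延迟参数均为示例值;真实项目中也可直接使用 tenacity 等重试库):

```python
# 指数退避 + 随机抖动重试的最小示意
import random
import time

RETRYABLE = (TimeoutError, ConnectionError)   # 仅对可恢复的错误重试(示例)

def retry_with_backoff(func, max_retries: int = 3, base_delay: float = 0.01):
    for attempt in range(max_retries + 1):
        try:
            return func()
        except RETRYABLE:
            if attempt == max_retries:
                raise
            # 指数退避:0.01s、0.02s、0.04s ... 加抖动防止重试风暴
            delay = base_delay * (2 ** attempt) * (1 + random.random() * 0.1)
            time.sleep(delay)

calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("临时故障")
    return "ok"

result = retry_with_backoff(flaky)   # 第 3 次调用成功
```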
熔断器:
- 熔断阈值:设置失败率阈值(建议:50%)
- 恢复时间:配置熔断器自动恢复时间(建议:60秒)
- 半开状态:实现半开状态探测服务恢复
- 熔断告警:熔断器打开时发送告警
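5.3 节的 FaultToleranceSystem 引用了 CircuitBreaker 但未给出实现,下面按上述三态(closed/open/half_open)给出一个最小示意(阈值与恢复时间为示例值):

```python
# 最小熔断器示意:closed ->(连续失败达到阈值)-> open ->(冷却期后)-> half_open
import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, recovery_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = 0.0
        self.state = "closed"

    def is_open(self) -> bool:
        if self.state == "open":
            if time.monotonic() - self.opened_at >= self.recovery_timeout:
                self.state = "half_open"   # 冷却结束,放行探测请求
                return False
            return True
        return False

    def record_success(self):
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = time.monotonic()

cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05)
cb.record_failure(); cb.record_failure()   # 达到阈值,熔断打开
was_open = cb.is_open()
time.sleep(0.06)                           # 等待冷却期结束
half_open = (not cb.is_open()) and cb.state == "half_open"
```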
降级方案:
- 备用模型:配置备用LLM模型
- 缓存降级:服务不可用时返回缓存结果
- 默认响应:提供友好的默认响应
- 功能降级:关键路径失败时降级到简化功能
健康检查:
- 就绪检查:实现 /ready 端点检查依赖服务
- 存活检查:实现 /health 端点检查应用状态
- 深度检查:可选的深度健康检查(验证LLM连接等)
- 检查间隔:配置合理的健康检查间隔(建议:30秒)
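/health 与 /ready 端点背后通常是一个依赖检查的聚合逻辑。下面是一个与 Web 框架无关的最小示意(检查项为假设的示例,端点处理函数可直接返回该结果):

```python
# 健康检查聚合的最小示意:任一依赖失败则整体 unhealthy
from typing import Callable

def aggregate_health(checks: dict[str, Callable[[], bool]]) -> dict:
    details = {}
    for name, check in checks.items():
        try:
            details[name] = "ok" if check() else "fail"
        except Exception as e:
            details[name] = f"error: {e}"
    healthy = all(v == "ok" for v in details.values())
    return {"status": "healthy" if healthy else "unhealthy", "details": details}

report = aggregate_health({
    "redis": lambda: True,     # 示例:实际应 ping Redis
    "llm_api": lambda: True,   # 示例:实际应发一次轻量探测请求
})
```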
5.4.4 监控检查清单#
LangSmith追踪:
- 启用追踪:设置 LANGSMITH_TRACING=true(旧变量 LANGCHAIN_TRACING_V2 仍兼容)
- 项目配置:为不同环境配置不同项目
- 采样率:生产环境配置合理的采样率(建议:10%)
- 敏感信息过滤:过滤追踪数据中的敏感信息
Prometheus指标:
- 请求指标:记录请求总数、成功率、失败率
- 延迟指标:记录P50、P95、P99延迟
- Token指标:记录输入/输出token使用量
- 错误指标:记录不同类型的错误数量
- 自定义指标:根据业务需求添加自定义指标
日志聚合:
- 结构化日志:使用JSON格式记录结构化日志
- 日志级别:生产环境使用INFO级别,开发环境使用DEBUG
- 日志聚合:配置ELK、Loki或CloudWatch日志聚合
- 日志保留:设置日志保留策略(建议:30-90天)
- 请求ID:为每个请求生成唯一ID用于追踪
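结构化日志与请求 ID 可以通过自定义 logging.Formatter 实现。下面是输出单行 JSON 的最小示意(字段命名为示例,按日志平台约定调整):

```python
# 带请求 ID 的结构化 JSON 日志示意
import json
import logging
import uuid
from datetime import datetime, timezone

class JsonFormatter(logging.Formatter):
    """把日志记录序列化为单行 JSON,便于 ELK/Loki 等平台解析"""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "request_id": getattr(record, "request_id", None),  # 贯穿整条请求链路
        }, ensure_ascii=False)

# 直接构造一条记录演示格式化效果
record = logging.LogRecord("langchain_app", logging.INFO, __file__, 0, "处理用户请求", None, None)
record.request_id = str(uuid.uuid4())
log_line = JsonFormatter().format(record)
```

实际使用时通过 logger.addHandler() 挂载该 Formatter,并在记录日志时用 extra={"request_id": ...} 注入请求 ID。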
告警规则:
- 延迟告警:P95延迟超过阈值时告警
- 错误率告警:错误率超过阈值时告警
- 成本告警:Token使用量超过预算时告警
- 可用性告警:服务不可用时立即告警
- 告警通道:配置PagerDuty、Slack、邮件等告警通道
5.4.5 成本检查清单#
Token使用监控:
- 实时监控:实时监控token使用量
- 用户级统计:按用户统计token使用量
- 模型级统计:按模型统计成本
- 成本分析:定期分析成本构成
成本预算控制:
- 预算告警:设置每日/每月预算告警
- 用户配额:为每个用户设置token配额
- 速率限制:通过速率限制控制成本
- 自动熔断:超出预算时自动停止服务
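用户级 Token 配额可以先用一个简单的计数器示意(配额数值为示例;生产环境应把用量持久化到 Redis 或数据库,并按天重置):

```python
# 用户级 Token 配额控制的最小示意
class TokenBudget:
    def __init__(self, daily_quota: int):
        self.daily_quota = daily_quota
        self.usage: dict[str, int] = {}

    def try_consume(self, user_id: str, tokens: int) -> bool:
        used = self.usage.get(user_id, 0)
        if used + tokens > self.daily_quota:
            return False        # 超出配额:拒绝或降级到便宜模型
        self.usage[user_id] = used + tokens
        return True

budget = TokenBudget(daily_quota=1000)
ok1 = budget.try_consume("alice", 800)   # 放行
ok2 = budget.try_consume("alice", 300)   # 800 + 300 > 1000,拒绝
```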
缓存命中率优化:
- 缓存命中率监控:监控缓存命中率(目标:>80%)
- 缓存预热:预加载常用查询
- 语义缓存:使用语义相似度缓存提升命中率
- 缓存分析:定期分析缓存效果
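命中率监控可以直接内置在缓存实现中。下面是带 TTL 和命中率统计的精确匹配缓存最小示意(语义缓存需要另外接入 embedding 相似度检索):

```python
# 带命中率统计的精确匹配 LLM 响应缓存示意
import time

class ResponseCache:
    def __init__(self, ttl: float = 3600):
        self.ttl = ttl
        self.store: dict[str, tuple[float, str]] = {}
        self.hits = 0
        self.misses = 0

    def get(self, prompt: str):
        entry = self.store.get(prompt)
        if entry and time.monotonic() - entry[0] < self.ttl:
            self.hits += 1
            return entry[1]
        self.misses += 1
        return None

    def put(self, prompt: str, response: str):
        self.store[prompt] = (time.monotonic(), response)

    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0

cache = ResponseCache()
first = cache.get("如何退款?")                       # 未命中
cache.put("如何退款?", "请在订单详情页申请退款")
answer = cache.get("如何退款?")                      # 命中
```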
模型选择策略:
- 智能路由:根据任务复杂度选择合适模型
- 成本优化:优先使用便宜模型(如gpt-4o-mini)
- 质量平衡:在成本和质量间找到平衡点
- A/B测试:对比不同模型的成本效益
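按任务复杂度路由模型可以从简单的启发式规则起步。下面是一个最小示意(关键词与长度阈值均为假设的示例,实际可替换为分类器或规则引擎):

```python
# 按任务复杂度选择模型的启发式路由示意
def route_model(question: str) -> str:
    """长问题或含推理类关键词走强模型,其余走便宜模型"""
    complex_keywords = ["分析", "推理", "对比", "设计方案"]
    if len(question) > 200 or any(kw in question for kw in complex_keywords):
        return "gpt-4o"          # 复杂任务:质量优先
    return "gpt-4o-mini"         # 简单任务:成本优先

simple = route_model("今天天气如何")
complex_ = route_model("请分析这两种架构的优劣")
```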
5.4.6 数据合规检查清单#
GDPR合规:
- 用户同意:收集用户数据处理同意
- 数据最小化:仅收集必要的个人数据
- 访问权:实现用户数据访问接口
- 删除权:实现用户数据删除功能(被遗忘权)
- 可携带权:支持导出用户数据
- 数据假名化:对个人数据进行假名化处理
数据加密:
- 传输加密:使用TLS加密所有网络传输
- 存储加密:加密存储敏感数据
- 密钥管理:使用KMS管理加密密钥
- 加密算法:使用AES-256等强加密算法
审计日志:
- 完整记录:记录所有数据访问和修改操作
- 不可篡改:使用只追加存储或区块链技术
- 日志签名:对审计日志进行数字签名
- 定期审计:定期审查审计日志
- 合规报告:生成合规审计报告
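"不可篡改"的审计日志可以用哈希链来示意:每条记录包含前一条的哈希,篡改历史任意一条都会破坏整条链的校验。下面是 4.7 节 AuditStorage 思路的简化版(生产环境应结合只追加存储与数字签名):

```python
# 哈希链式审计日志的最小示意
import hashlib
import json

class HashChainLog:
    def __init__(self):
        self.entries: list[dict] = []
        self.last_hash = "0" * 64   # 创世哈希

    def append(self, event: dict):
        entry = {"event": event, "prev_hash": self.last_hash}
        entry["hash"] = hashlib.sha256(
            json.dumps(entry, sort_keys=True, ensure_ascii=False).encode()
        ).hexdigest()
        self.last_hash = entry["hash"]
        self.entries.append(entry)

    def verify(self) -> bool:
        """重算每条记录的哈希并校验链接关系"""
        prev = "0" * 64
        for entry in self.entries:
            payload = {"event": entry["event"], "prev_hash": entry["prev_hash"]}
            expected = hashlib.sha256(
                json.dumps(payload, sort_keys=True, ensure_ascii=False).encode()
            ).hexdigest()
            if entry["prev_hash"] != prev or entry["hash"] != expected:
                return False
            prev = entry["hash"]
        return True

log = HashChainLog()
log.append({"user": "alice", "action": "read", "resource": "order/123"})
log.append({"user": "bob", "action": "delete", "resource": "order/456"})
valid_before = log.verify()
log.entries[0]["event"]["action"] = "write"   # 模拟篡改历史记录
valid_after = log.verify()
```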
5.4.7 部署前最终检查#
代码检查:
- 单元测试:单元测试覆盖率>80%
- 集成测试:所有关键路径有集成测试
- 性能测试:通过负载测试和压力测试
- 安全扫描:通过安全漏洞扫描
配置检查:
- 环境变量:所有环境变量正确配置
- 资源限制:配置CPU、内存限制
- 副本数:配置合理的副本数量(建议:>=2)
- 自动扩缩容:配置HPA自动扩缩容策略
文档检查:
- 部署文档:完整的部署操作文档
- 运维手册:故障排查和应急响应手册
- API文档:完整的API接口文档
- 变更记录:记录所有重要变更
演练检查:
- 灰度发布:先在小范围用户测试
- 回滚计划:准备快速回滚方案
- 故障演练:进行故障注入和恢复演练
- 应急响应:明确应急响应流程和人员
本章小结#
- 架构设计:RAG、Multi-Agent、Workflow 三种核心架构模式
- 性能优化:延迟优化、吞吐量提升、Token 成本控制、多层缓存
- 安全防护:Guardrails 体系、PII 保护、分层防护
- 合规要求:输入输出安全、GDPR 合规、审计日志
- 部署运维:容器化、Serverless、监控告警、故障恢复
- 生产清单:全面的安全、性能、可靠性、监控、成本、合规检查清单
第6章:测试与质量保障#
关注点:构建Agent的完整测试体系
6.1 测试金字塔#
Agent测试不同于传统软件测试,需要处理LLM的不确定性:
/\
/ \ E2E评估(少量、慢、贵)
/____\ - LangSmith Evaluation
/ \ - 真实场景回归
/ \
/ \ 集成测试(中等)
/____________\ - Agent工作流测试
/ \ - 工具调用链路测试
/ \
/__________________\ 单元测试(大量、快、便宜)
- Tool函数测试
- Prompt模板测试
- 输出解析器测试
原则:70%单元测试 + 20%集成测试 + 10%E2E评估
6.2 单元测试实践#
6.2.1 Tool函数测试#
# test_tools.py
import pytest
from langchain_core.tools import tool
@tool
def extract_order_id(text: str) -> str:
"""从文本中提取订单号"""
import re
match = re.search(r'订单号[::]\s*(\w+)', text)
return match.group(1) if match else "未找到订单号"
class TestTools:
"""Tool函数单元测试"""
@pytest.mark.parametrize("text,expected", [
("您的订单号:ABC123已确认", "ABC123"),
("订单号:XYZ789", "XYZ789"),
("这是一段没有订单号的文本", "未找到订单号"),
])
def test_extract_order_id(self, text, expected):
"""参数化测试多种格式"""
result = extract_order_id.invoke({"text": text})
assert result == expected
def test_tool_metadata(self):
"""测试Tool元数据"""
assert extract_order_id.name == "extract_order_id"
assert "订单号" in extract_order_id.description
运行:
pytest test_tools.py -v
# ✅ test_extract_order_id[...] PASSED
# ✅ test_tool_metadata PASSED
6.2.2 Agent工作流集成测试#
# test_agent_workflow.py
import pytest
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from unittest.mock import Mock, patch
class TestAgentWorkflow:
"""Agent集成测试"""
@pytest.fixture
def agent(self):
"""创建测试用Agent"""
return create_agent(
ChatOpenAI(model="gpt-4o-mini", temperature=0),
tools=[search_tool, calculator_tool],
system_prompt="你是助手"
)
def test_tool_selection(self, agent):
"""测试工具选择正确性"""
result = agent.invoke({
"messages": [("user", "搜索Python")]
})
messages = result["messages"]
tool_calls = [m for m in messages if hasattr(m, 'name')]
# 验证调用了search_tool
assert any(m.name == "search_tool" for m in tool_calls)
@patch('langchain_openai.ChatOpenAI')
def test_with_mocked_llm(self, mock_llm_class):
"""Mock LLM避免真实API调用"""
mock_llm = Mock()
mock_llm_class.return_value = mock_llm
from langchain_core.messages import AIMessage
mock_llm.invoke.return_value = AIMessage(content="模拟回复")
agent = create_agent(mock_llm, tools=[], system_prompt="测试")
result = agent.invoke({"messages": [("user", "测试")]})
assert "模拟回复" in result["messages"][-1].content
assert mock_llm.invoke.called
6.3 LangSmith自动化评估#
6.3.1 创建评估Dataset#
from langsmith import Client
client = Client()
# 创建Dataset
dataset = client.create_dataset(
dataset_name="customer_support_qa",
description="客服问答评估数据集"
)
# 添加测试用例
test_cases = [
{
"inputs": {"question": "如何退款?"},
"outputs": {
"expected_keywords": ["订单详情", "申请退款"],
"relevance_score": 5
}
},
# ... 更多用例
]
for case in test_cases:
client.create_example(
dataset_id=dataset.id,
inputs=case["inputs"],
outputs=case["outputs"]
)
6.3.2 定义Evaluators#
from langsmith.evaluation import evaluator
@evaluator
def keyword_coverage_evaluator(run, example):
"""关键词覆盖率评估"""
answer = run.outputs.get("answer", "")
expected_keywords = example.outputs.get("expected_keywords", [])
found = [kw for kw in expected_keywords if kw in answer]
score = len(found) / len(expected_keywords) if expected_keywords else 0
return {
"key": "keyword_coverage",
"score": score,
"comment": f"覆盖 {len(found)}/{len(expected_keywords)} 个关键词"
}
# LLM-as-Judge评估器
from langsmith.evaluation import LangChainStringEvaluator
llm_evaluator = LangChainStringEvaluator(
"cot_qa",
prepare_data=lambda run, example: {
"prediction": run.outputs.get("answer", ""),
"reference": example.outputs.get("expected_answer", ""),
"input": example.inputs.get("question", "")
}
)
6.3.3 运行评估与A/B测试#
from langsmith.evaluation import evaluate
# 版本A
def predict_v_a(inputs):
agent = create_agent(
ChatOpenAI(model="gpt-4o"),
tools=[],
system_prompt="你是客服助手"
)
result = agent.invoke({"messages": [("user", inputs["question"])]})
return {"answer": result["messages"][-1].content}
# 版本B(改进的Prompt)
def predict_v_b(inputs):
agent = create_agent(
ChatOpenAI(model="gpt-4o"),
tools=[],
system_prompt="""你是专业的客服助手。
回答要求:
1. 准确理解用户问题
2. 提供清晰的步骤说明
3. 使用友好的语气"""
)
result = agent.invoke({"messages": [("user", inputs["question"])]})
return {"answer": result["messages"][-1].content}
# A/B测试
results_a = evaluate(
predict_v_a,
data="customer_support_qa",
evaluators=[keyword_coverage_evaluator, llm_evaluator],
experiment_prefix="Version_A"
)
results_b = evaluate(
predict_v_b,
data="customer_support_qa",
evaluators=[keyword_coverage_evaluator, llm_evaluator],
experiment_prefix="Version_B"
)
# 对比
print(f"版本A关键词覆盖率: {results_a['keyword_coverage_avg']:.2%}")
print(f"版本B关键词覆盖率: {results_b['keyword_coverage_avg']:.2%}")
improvement = ((results_b['keyword_coverage_avg'] - results_a['keyword_coverage_avg']) /
results_a['keyword_coverage_avg'] * 100)
print(f"改进幅度: {improvement:+.1f}%")
# 统计显著性检验
from scipy import stats
scores_a = [r.evaluation_results["keyword_coverage"] for r in results_a.results]
scores_b = [r.evaluation_results["keyword_coverage"] for r in results_b.results]
t_stat, p_value = stats.ttest_ind(scores_a, scores_b)
print(f"显著性: {'✅ 显著' if p_value < 0.05 else '⚠️ 不显著'} (p={p_value:.4f})")
6.4 CI/CD集成#
6.4.1 GitHub Actions配置#
# .github/workflows/test.yml
name: Test Suite
on:
push:
branches: [main, dev]
pull_request:
branches: [main]
jobs:
unit-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-cov
- name: Run Unit Tests
run: |
pytest tests/unit -v --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
integration-tests:
runs-on: ubuntu-latest
needs: unit-tests
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run Integration Tests
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: |
pytest tests/integration -v
langsmith-evaluation:
runs-on: ubuntu-latest
needs: integration-tests
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Run LangSmith Evaluation
env:
LANGSMITH_API_KEY: ${{ secrets.LANGSMITH_API_KEY }}
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: |
python scripts/run_evaluation.py
- name: Quality Gate
run: |
python scripts/quality_gate.py

6.4.2 质量门禁#
# scripts/quality_gate.py
from langsmith.evaluation import evaluate
import sys
def run_quality_gate():
"""质量门禁检查"""
print("🚦 运行质量门禁检查...\n")
results = evaluate(
predict,
data="customer_support_qa",
evaluators=[keyword_coverage_evaluator]
    )
    # ExperimentResults 不能按 dict 直接取指标,先从每条样本的评估结果汇总
    scores = [
        er.score
        for item in results
        for er in item["evaluation_results"]["results"]
        if er.key == "keyword_coverage"
    ]
    metrics = {
        "keyword_coverage_avg": sum(scores) / len(scores),
        # pass 定义为单条分数 >= 0.8(可按项目标准调整)
        "pass_rate": sum(1 for s in scores if s >= 0.8) / len(scores),
    }
    # 质量标准
    quality_standards = {
        "keyword_coverage_avg": 0.80,  # >= 80%
        "pass_rate": 0.90              # >= 90%
    }
    all_passed = True
    for metric, threshold in quality_standards.items():
        actual = metrics.get(metric, 0)
passed = actual >= threshold
status = "✅ PASS" if passed else "❌ FAIL"
print(f"{metric}: {actual:.2%} (阈值: {threshold:.2%}) - {status}")
if not passed:
all_passed = False
if all_passed:
print("\n✅ 质量门禁通过,允许部署")
return 0
else:
print("\n❌ 质量门禁失败,禁止部署")
return 1
if __name__ == "__main__":
    sys.exit(run_quality_gate())

6.5 测试最佳实践#
测试覆盖率目标:
| 测试类型 | 比例 | 速度 | 成本 | 何时运行 |
|---|---|---|---|---|
| 单元测试 | 70% | 快(秒级) | 低 | 每次commit |
| 集成测试 | 20% | 中(分钟级) | 中 | 每次PR |
| LangSmith评估 | 10% | 慢(小时级) | 高 | 发布前 |
关键原则:
- ✅ 单元测试覆盖所有Tool函数
- ✅ 集成测试覆盖关键工作流
- ✅ Mock外部依赖(LLM、API)降低成本
- ✅ LangSmith评估用于质量对比
- ✅ 质量门禁确保生产标准
- ✅ CI/CD自动化运行
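上面"Mock 外部依赖(LLM、API)降低成本"这条原则,可以用标准库 `unittest.mock` 打桩模型调用来落地。下面是一个最小示意(其中 `answer_with_llm` 是假设的业务函数,非某个库的真实 API):

```python
# 单元测试中 Mock LLM 依赖的最小示例
from unittest.mock import MagicMock

def answer_with_llm(llm, question: str) -> str:
    """业务函数(示意):调用模型并返回文本"""
    response = llm.invoke(question)
    return response.content

def test_answer_with_llm():
    # 打桩:不发起真实 API 调用,零 Token 成本、毫秒级完成
    fake_llm = MagicMock()
    fake_llm.invoke.return_value = MagicMock(content="模拟回答")

    result = answer_with_llm(fake_llm, "退货流程是什么?")

    assert result == "模拟回答"
    # 同时验证调用方式是否正确
    fake_llm.invoke.assert_called_once_with("退货流程是什么?")

test_answer_with_llm()
print("✅ 单元测试通过(未调用真实 LLM)")
```

这样单元测试可以在每次 commit 时免费、秒级运行,真实模型调用留给集成测试和 LangSmith 评估。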
第7章:错误处理与降级策略#
关注点: 生产环境的容错与高可用
7.1 错误处理层次#
用户请求
↓
┌─────────────────────────────┐
│ 1. 输入验证 │ ← 防止无效输入
└─────────┬───────────────────┘
↓
┌─────────────────────────────┐
│ 2. 重试机制 │ ← 处理临时故障
└─────────┬───────────────────┘
↓
┌─────────────────────────────┐
│ 3. 降级策略 │ ← 保证基本服务
└─────────┬───────────────────┘
↓
┌─────────────────────────────┐
│ 4. 兜底响应 │ ← 优雅失败
└─────────┬───────────────────┘
↓
返回结果

7.2 输入验证#
from pydantic import BaseModel, Field, field_validator
from typing import Optional

class AgentRequest(BaseModel):
    """Agent请求验证"""
    query: str = Field(..., min_length=1, max_length=1000)
    user_id: str = Field(..., pattern=r'^[a-zA-Z0-9_-]+$')
    context: Optional[dict] = None

    # Field(pattern=...) 是 Pydantic v2 写法,校验器也统一用 v2 的 field_validator
    @field_validator('query')
    @classmethod
    def validate_query(cls, v):
"""验证查询内容"""
# 检测恶意输入
malicious_patterns = ['<script>', 'DROP TABLE', 'rm -rf']
if any(pattern in v for pattern in malicious_patterns):
raise ValueError("检测到恶意输入")
# 检测敏感信息
import re
if re.search(r'\d{15,19}', v): # 信用卡号模式
raise ValueError("请勿输入敏感信息")
return v
# 使用:return 只能出现在函数内,因此封装为请求处理函数
def handle_request(query: str, user_id: str) -> dict:
    try:
        request = AgentRequest(query=query, user_id=user_id)
    except ValueError as e:  # Pydantic 的 ValidationError 是 ValueError 的子类
        return {"error": str(e), "code": "INVALID_INPUT"}
    return agent.invoke({"messages": [("user", request.query)]})

handle_request("帮我查询订单", "user_123")

7.3 智能重试机制#
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
from langchain_openai import ChatOpenAI
import logging
logger = logging.getLogger(__name__)
class RetryableAgent:
"""带重试的Agent"""
def __init__(self):
self.agent = create_agent(
ChatOpenAI(model="gpt-4o"),
tools=[],
system_prompt="你是助手"
)
@retry(
stop=stop_after_attempt(3), # 最多重试3次
wait=wait_exponential(multiplier=1, min=1, max=10), # 指数退避
retry=retry_if_exception_type((TimeoutError, ConnectionError)),
before_sleep=lambda retry_state: logger.warning(
f"重试 {retry_state.attempt_number}/3: {retry_state.outcome.exception()}"
)
)
def invoke_with_retry(self, inputs: dict) -> dict:
"""带重试的调用"""
try:
return self.agent.invoke(inputs)
except Exception as e:
logger.error(f"Agent调用失败: {e}")
raise
# 使用
agent = RetryableAgent()
try:
result = agent.invoke_with_retry({"messages": [("user", "测试")]})
except Exception:
# 重试3次后仍失败
    result = {"error": "服务暂时不可用,请稍后重试"}

7.4 多级降级策略#
from enum import Enum
from typing import Dict, Any
class FallbackLevel(Enum):
"""降级级别"""
PRIMARY = 1 # GPT-4o(最优)
SECONDARY = 2 # GPT-4o-mini(次优)
TERTIARY = 3 # GPT-3.5-turbo(保底)
CACHE = 4 # 缓存结果
STATIC = 5 # 静态回复
class FallbackAgent:
"""多级降级Agent"""
def __init__(self):
self.cache = {} # 简单缓存
self.fallback_responses = {
"default": "抱歉,服务暂时不可用,请稍后重试。",
"timeout": "请求超时,请稍后重试。",
"rate_limit": "请求过于频繁,请稍后重试。"
}
def invoke(self, inputs: dict) -> Dict[str, Any]:
"""多级降级调用"""
query = inputs["messages"][0][1]
# 级别1:GPT-4o
try:
return self._invoke_primary(inputs)
except Exception as e:
logger.warning(f"Primary失败: {e}")
# 级别2:GPT-4o-mini
try:
return self._invoke_secondary(inputs)
except Exception as e:
logger.warning(f"Secondary失败: {e}")
# 级别3:GPT-3.5-turbo
try:
return self._invoke_tertiary(inputs)
except Exception as e:
logger.warning(f"Tertiary失败: {e}")
# 级别4:缓存
cached = self.cache.get(query)
if cached:
logger.info("使用缓存结果")
return {"messages": [cached], "fallback_level": FallbackLevel.CACHE}
# 级别5:静态回复
logger.error("所有降级策略失败,使用静态回复")
return {
"messages": [("assistant", self.fallback_responses["default"])],
"fallback_level": FallbackLevel.STATIC
}
def _invoke_primary(self, inputs: dict) -> dict:
"""主要模型"""
agent = create_agent(ChatOpenAI(model="gpt-4o", timeout=5), tools=[], system_prompt="你是助手")
result = agent.invoke(inputs)
return {**result, "fallback_level": FallbackLevel.PRIMARY}
def _invoke_secondary(self, inputs: dict) -> dict:
"""次优模型"""
agent = create_agent(ChatOpenAI(model="gpt-4o-mini", timeout=3), tools=[], system_prompt="你是助手")
result = agent.invoke(inputs)
return {**result, "fallback_level": FallbackLevel.SECONDARY}
def _invoke_tertiary(self, inputs: dict) -> dict:
"""保底模型"""
agent = create_agent(ChatOpenAI(model="gpt-3.5-turbo", timeout=2), tools=[], system_prompt="你是助手")
result = agent.invoke(inputs)
        return {**result, "fallback_level": FallbackLevel.TERTIARY}

使用示例:
agent = FallbackAgent()
result = agent.invoke({"messages": [("user", "你好")]})
print(f"响应: {result['messages'][-1].content}")
print(f"降级级别: {result['fallback_level'].name}")
# 监控降级率
from collections import Counter
fallback_counter = Counter()
for _ in range(100):
result = agent.invoke({"messages": [("user", "测试")]})
fallback_counter[result['fallback_level'].name] += 1
print("\n降级统计:")
for level, count in fallback_counter.items():
print(f" {level}: {count}次 ({count/100:.1%})")
# 示例输出(数值仅为示意,实际分布取决于各级服务的可用性):
#   PRIMARY: 85次 (85.0%)
#   SECONDARY: 10次 (10.0%)
#   TERTIARY: 3次 (3.0%)
#   CACHE: 2次 (2.0%)

7.5 熔断器模式#
from datetime import datetime, timedelta
from enum import Enum
class CircuitState(Enum):
"""熔断器状态"""
CLOSED = "closed" # 正常
OPEN = "open" # 熔断(拒绝请求)
HALF_OPEN = "half_open" # 半开(尝试恢复)
class CircuitBreaker:
"""熔断器"""
def __init__(
self,
failure_threshold: int = 5, # 失败阈值
timeout: int = 60, # 熔断超时(秒)
half_open_max_calls: int = 3 # 半开状态最大尝试次数
):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.half_open_max_calls = half_open_max_calls
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = None
self.half_open_calls = 0
def call(self, func, *args, **kwargs):
"""执行调用"""
if self.state == CircuitState.OPEN:
# 检查是否可以转为半开状态
if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
logger.info("熔断器转为半开状态")
else:
raise Exception("熔断器开启,拒绝请求")
try:
result = func(*args, **kwargs)
# 成功
if self.state == CircuitState.HALF_OPEN:
self.half_open_calls += 1
if self.half_open_calls >= self.half_open_max_calls:
self.state = CircuitState.CLOSED
self.failure_count = 0
logger.info("熔断器恢复正常")
return result
except Exception as e:
# 失败
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
logger.error(f"熔断器开启(失败{self.failure_count}次)")
raise
# 使用
circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=60)
def invoke_agent(inputs):
"""调用Agent(可能失败)"""
return circuit_breaker.call(
lambda: agent.invoke(inputs)
)
# 测试
for i in range(10):
try:
result = invoke_agent({"messages": [("user", "测试")]})
print(f"请求{i}: ✅ 成功")
except Exception as e:
        print(f"请求{i}: ❌ {e}")

7.6 监控与告警#
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
@dataclass
class ErrorMetrics:
"""错误指标"""
timestamp: datetime
error_type: str
error_message: str
fallback_level: Optional[str] = None
class ErrorMonitor:
"""错误监控"""
def __init__(self):
self.errors: List[ErrorMetrics] = []
def record_error(
self,
error_type: str,
error_message: str,
fallback_level: Optional[str] = None
):
"""记录错误"""
self.errors.append(ErrorMetrics(
timestamp=datetime.now(),
error_type=error_type,
error_message=error_message,
fallback_level=fallback_level
))
def get_error_rate(self, window_minutes: int = 5) -> float:
"""获取错误率"""
cutoff_time = datetime.now() - timedelta(minutes=window_minutes)
recent_errors = [
e for e in self.errors
if e.timestamp > cutoff_time
]
# 假设总请求数
total_requests = 100
return len(recent_errors) / total_requests if total_requests else 0
def check_alerts(self) -> List[str]:
"""检查告警"""
alerts = []
# 错误率告警
error_rate = self.get_error_rate(window_minutes=5)
if error_rate > 0.1: # 10%
alerts.append(f"⚠️ 错误率过高: {error_rate:.1%}")
# 降级告警
recent_fallbacks = [
e for e in self.errors[-100:]
if e.fallback_level in ["TERTIARY", "CACHE", "STATIC"]
]
if len(recent_fallbacks) > 20:
alerts.append(f"⚠️ 频繁降级: {len(recent_fallbacks)}次")
        return alerts

7.7 错误处理最佳实践#
生产环境检查清单:
输入验证
- 长度限制
- 格式验证
- 恶意输入检测
- 敏感信息过滤
重试机制
- 指数退避
- 最大重试次数
- 可重试异常类型
降级策略
- 多级模型降级
- 缓存降级
- 静态响应兜底
熔断保护
- 失败阈值配置
- 熔断超时配置
- 半开状态尝试
监控告警
- 错误率监控
- 降级率监控
- 告警规则配置
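清单中的前几层可以组合为一个统一的入口函数。下面是一个简化示意(`safe_invoke`、`call_model` 均为示意函数,对应前文各节的完整实现;这里用内联的长度校验代替 Pydantic,用模型列表循环代替独立的重试/降级类):

```python
# 将"输入验证 → 重试/降级 → 兜底响应"组合为一个入口函数的示意
def safe_invoke(query: str, user_id: str) -> dict:
    # 1. 输入验证:无效输入直接拒绝,不进入后续环节
    if not query or len(query) > 1000:
        return {"error": "无效输入", "code": "INVALID_INPUT"}
    # 2+3. 重试与降级:主模型失败后逐级降级
    models = ["gpt-4o", "gpt-4o-mini", "gpt-3.5-turbo"]
    for model in models:
        try:
            return {"answer": call_model(model, query), "model": model}
        except Exception:
            continue  # 本级失败,降到下一级
    # 4. 兜底响应:所有模型失败时优雅失败
    return {"answer": "抱歉,服务暂时不可用,请稍后重试。", "model": None}

# 演示兜底路径:用一个总是失败的 call_model 模拟全链路故障
def call_model(model: str, query: str) -> str:
    raise TimeoutError("模拟故障")

result = safe_invoke("你好", "user_1")
print(result)  # 所有模型失败,走到兜底响应
```

真实项目中各层应替换为前文的 `AgentRequest`、`RetryableAgent` 和 `FallbackAgent`,并在每层记录监控指标。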
关键指标:
| 指标 | 目标值 | 告警阈值 |
|---|---|---|
| 错误率 | < 1% | > 5% |
| 降级率 | < 5% | > 20% |
| 重试成功率 | > 80% | < 50% |
| 熔断触发次数 | 0 | > 3次/小时 |
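上表的"告警阈值"列可以直接落成代码。下面是一个示意实现(指标名和 `check_thresholds` 函数均为假设命名,指标值应来自真实的监控系统):

```python
# 按上表"告警阈值"列做告警判断的示意
ALERT_THRESHOLDS = {
    "error_rate":             ("gt", 0.05),  # 错误率 > 5%
    "fallback_rate":          ("gt", 0.20),  # 降级率 > 20%
    "retry_success_rate":     ("lt", 0.50),  # 重试成功率 < 50%
    "circuit_trips_per_hour": ("gt", 3),     # 熔断触发 > 3次/小时
}

def check_thresholds(metrics: dict) -> list[str]:
    """对比当前指标与告警阈值,返回触发的告警列表"""
    alerts = []
    for name, (op, threshold) in ALERT_THRESHOLDS.items():
        value = metrics.get(name)
        if value is None:
            continue  # 未上报的指标跳过
        triggered = value > threshold if op == "gt" else value < threshold
        if triggered:
            alerts.append(f"⚠️ {name}={value} 越过告警阈值 {threshold}")
    return alerts

alerts = check_thresholds({
    "error_rate": 0.08,           # 超标
    "fallback_rate": 0.03,        # 正常
    "retry_success_rate": 0.85,   # 正常
    "circuit_trips_per_hour": 0,  # 正常
})
print(alerts)  # 仅 error_rate 触发告警
```

实际部署时,这类阈值判断通常交给 Prometheus/Grafana 等监控系统的告警规则完成,代码侧只负责上报指标。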