第十篇 生产实践与监控评估#

目标: 构建生产级LLM应用的完整体系

从监控追踪到架构设计,从性能优化到安全防护,从部署运维到故障排查,全面掌握生产环境的关键要素。


第1章:LangSmith Tracing 与 Evaluation#

关注点:掌握 Agent 执行的全链路可观测性,建立科学的评估框架。

1.1 追踪体系#

1.1.1 追踪原理与数据模型#

什么是追踪(Tracing)?

追踪是记录和分析 Agent 执行过程的完整链路,从用户输入开始,记录每一个中间步骤(模型调用、工具执行、状态变化),最终得到输出。LangSmith 追踪形成一棵执行树:

root_run (Agent 执行)
├── before_model_hook (Middleware)
├── model_call (模型调用)
│   ├── system_prompt
│   ├── messages
│   └── tools
├── tool_run (工具执行)
│   ├── search_tool
│   └── get_weather_tool
└── after_model_hook (后处理)

追踪的核心作用

  1. 调试:看到完整的执行链,快速定位问题
  2. 监控:追踪延迟、Token 成本、错误率等指标
  3. 优化:识别瓶颈,比较不同版本的性能差异
  4. 审计:记录谁做了什么,满足合规要求

数据模型

class Run:
    id: str                          # 唯一 ID
    name: str                        # 运行名称
    run_type: str                    # "agent", "model", "tool", "chain" 等
    parent_run_id: Optional[str]     # 父 Run ID(形成树关系)

    # 输入输出
    inputs: dict[str, Any]           # 输入参数
    outputs: dict[str, Any]          # 输出结果

    # 时间和成本
    start_time: datetime             # 开始时间
    end_time: datetime               # 结束时间
    duration: float                  # 执行耗时(秒)

    # Token 和成本
    token_usage: Optional[TokenUsage]
    cost: Optional[float]            # 美元成本

    # 状态和错误
    status: str                      # "success", "error"
    error: Optional[str]             # 错误信息

    # 元数据
    metadata: dict[str, Any]         # 自定义元数据
    tags: list[str]                  # 标签(用于筛选)

    # 反馈
    feedback_records: list[Feedback] # 用户反馈

1.1.2 自动追踪:环境变量配置#

最简单的开启方式

# 设置环境变量(LangSmith 1.0+ 最新命名)
export LANGSMITH_API_KEY="lsv2_..."
export LANGCHAIN_PROJECT="my_project"
export LANGSMITH_TRACING="true"

向后兼容说明:

  • 旧版环境变量 LANGCHAIN_API_KEY 和 LANGCHAIN_TRACING_V2 仍然支持,但建议迁移到新命名
  • API Key 格式从 sk-... 变更为 lsv2_...(LangSmith v2)
  • LANGCHAIN_PROJECT 保持不变(生态通用变量)

Python 代码中配置

import os
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent

# 配置 LangSmith(最新命名)
os.environ["LANGSMITH_API_KEY"] = "lsv2_..."
os.environ["LANGCHAIN_PROJECT"] = "my_project"
os.environ["LANGSMITH_TRACING"] = "true"

# 创建 Agent
model = ChatOpenAI(model="gpt-4o")
agent = create_agent(
    model=model,
    tools=[search, weather],
    system_prompt="你是一个助手"
)

# ✅ 自动追踪:所有调用都会被记录到 LangSmith
result = agent.invoke({"messages": [("user", "今天天气如何")]})

# 查看追踪:
# 1. 打开 https://smith.langchain.com
# 2. 选择项目 "my_project"
# 3. 查看实时追踪树

环境变量选项

| 变量 | 说明 | 示例 | 状态 |
| --- | --- | --- | --- |
| LANGSMITH_API_KEY | LangSmith API 密钥(必需) | lsv2_... | ✅ 推荐 |
| LANGCHAIN_PROJECT | 项目名称(用于分组) | my_agent | ✅ 通用 |
| LANGSMITH_TRACING | 启用追踪(必需) | true | ✅ 推荐 |
| LANGSMITH_WORKSPACE_ID | 工作区 ID(团队使用) | ws-... | ✅ 新增 |
| LANGCHAIN_ENDPOINT | LangSmith API 端点 | https://api.smith.langchain.com | ✅ 可选 |
| LANGCHAIN_SESSION | 会话名称(可选,用于子分组) | session-123 | ✅ 可选 |
| LANGCHAIN_CALLBACKS_BACKGROUND | 后台异步发送追踪数据 | true | ✅ 可选 |
| LANGCHAIN_TRACING_SAMPLING_RATE | 采样率(0.0-1.0,用于生产环境) | 0.1 | ✅ 可选 |
| LANGSMITH_TEST_CACHE | 缓存 API 调用(加速评估) | true | ✅ 可选 |
| LANGCHAIN_API_KEY | (旧)API 密钥 | sk-... | ⚠️ 已弃用 |
| LANGCHAIN_TRACING_V2 | (旧)启用追踪 | true | ⚠️ 已弃用 |

完整环境变量配置示例

import os

# 基础配置(必需)- LangSmith 1.0+ 最新命名
os.environ["LANGSMITH_API_KEY"] = "lsv2_..."
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "my_project"

# 团队协作配置(可选)
os.environ["LANGSMITH_WORKSPACE_ID"] = "ws-..."  # 工作区 ID(团队共享)

# 可选配置
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"  # 自定义端点
os.environ["LANGCHAIN_SESSION"] = "user-session-123"  # 会话分组
os.environ["LANGCHAIN_CALLBACKS_BACKGROUND"] = "true"  # 后台发送,不阻塞主程序
os.environ["LANGCHAIN_TRACING_SAMPLING_RATE"] = "0.1"  # 生产环境采样 10%

# 开发/测试环境配置
os.environ["LANGSMITH_TEST_CACHE"] = "true"  # 缓存 API 调用以加速测试

# 向后兼容(不推荐,仍然支持)
# os.environ["LANGCHAIN_API_KEY"] = "sk-..."  # 已弃用,使用 LANGSMITH_API_KEY
# os.environ["LANGCHAIN_TRACING_V2"] = "true"  # 已弃用,使用 LANGSMITH_TRACING

环境变量说明

  1. 必需变量(LangSmith 1.0+):

    • LANGSMITH_API_KEY:API 密钥(格式 lsv2_...)
    • LANGSMITH_TRACING:设为 "true" 启用追踪
  2. 项目组织

    • LANGCHAIN_PROJECT:项目名称,用于分组追踪(生态通用变量,保持不变)
    • LANGCHAIN_SESSION:会话名称,用于更细粒度的分组(如按用户或任务分组)
    • LANGSMITH_WORKSPACE_ID:工作区 ID,用于团队协作(格式为 ws-...)
  3. 性能优化

    • LANGCHAIN_CALLBACKS_BACKGROUND:设为 "true" 后,追踪数据异步发送,不阻塞主程序
    • LANGCHAIN_TRACING_SAMPLING_RATE:生产环境建议设置采样率(如 "0.1" 表示 10%),降低追踪成本
  4. 测试加速

    • LANGSMITH_TEST_CACHE:在测试/评估时缓存 API 响应,避免重复调用
  5. 向后兼容(已弃用):

    • LANGCHAIN_API_KEY(旧格式 sk-...)→ 迁移到 LANGSMITH_API_KEY(新格式 lsv2_...)
    • LANGCHAIN_TRACING_V2 → 迁移到 LANGSMITH_TRACING

1.1.3 手动追踪:@traceable 装饰器#

场景:追踪非 LangChain 代码(自定义函数、数据库操作等)。

基础用法

from langsmith.run_helpers import traceable

@traceable(name="my_function", run_type="chain")
def my_custom_function(input_text: str) -> str:
    """自定义函数会自动被追踪"""
    # 处理输入
    result = input_text.upper()
    return result

# 调用时自动记录到 LangSmith
output = my_custom_function("hello")

完整参数说明

from langsmith import traceable

@traceable(
    # 基础参数
    name="custom_processing",              # 运行名称(默认:函数名)
    run_type="tool",                       # 运行类型:llm/chain/tool/retriever/prompt(默认:chain)

    # 组织和标记
    metadata={                             # 元数据(任意键值对)
        "version": "1.0",
        "author": "alice",
        "module": "data_processing"
    },
    tags=["production", "v1"],             # 标签列表(用于筛选)
    project_name="my_project",             # 项目名称(覆盖环境变量)

    # 高级参数
    reduce_fn=None,                        # 聚合函数(用于生成器/流式输出)
    client=None,                           # 自定义 LangSmith Client
    process_inputs=None,                   # 输入预处理函数
    process_outputs=None,                  # 输出后处理函数
)
def process_data(data: dict) -> dict:
    """处理数据"""
    result = {**data, "processed": True}
    return result

参数详细说明

| 参数 | 类型 | 说明 | 默认值 |
| --- | --- | --- | --- |
| name | str | 追踪运行的显示名称 | 函数名 |
| run_type | str | 运行类型(llm/chain/tool/retriever/prompt) | "chain" |
| metadata | dict | 自定义元数据(键值对) | None |
| tags | list[str] | 标签列表,用于过滤和分组 | None |
| project_name | str | 项目名称(覆盖 LANGCHAIN_PROJECT 环境变量) | None |
| reduce_fn | Callable | 聚合函数,用于处理生成器/流式输出 | None |
| client | Client | 自定义 LangSmith Client 实例 | None |
| process_inputs | Callable | 输入序列化函数(用于自定义输入格式) | None |
| process_outputs | Callable | 输出序列化函数(用于自定义输出格式) | None |

run_type 类型说明

  • "llm":LLM 模型调用
  • "chain":链式调用(默认)
  • "tool":工具执行
  • "retriever":检索操作
  • "prompt":提示词模板
  • 也可以使用自定义类型(如 "database", "api_call")

高级用法示例

# 1. 处理生成器输出
@traceable(
    name="stream_processor",
    reduce_fn=lambda outputs: "".join(outputs)  # 将流式输出合并为字符串
)
def stream_data(n: int):
    """生成器函数"""
    for i in range(n):
        yield f"chunk-{i}"

# 2. 自定义输入/输出处理
def serialize_inputs(inputs):
    """自定义输入序列化"""
    return {k: str(v)[:100] for k, v in inputs.items()}  # 截断长文本

@traceable(
    process_inputs=serialize_inputs,
    process_outputs=lambda o: {"result": str(o)[:200]}
)
def process_large_data(data: dict) -> dict:
    # 处理大量数据,但只记录摘要
    return {"result": data}

嵌套追踪(自动形成树关系):

@traceable(name="parent_task")
def parent_task(query: str) -> str:
    # 调用子任务
    result1 = search_task(query)
    result2 = analyze_task(result1)
    return result2

@traceable(name="search_task")
def search_task(query: str) -> str:
    # 子任务 1
    return f"搜索结果: {query}"

@traceable(name="analyze_task")
def analyze_task(text: str) -> str:
    # 子任务 2
    return f"分析: {text}"

# 调用时自动形成树:
# parent_task
# ├── search_task
# └── analyze_task
parent_task("LangChain")

访问当前运行信息

from langsmith.run_helpers import traceable, get_current_run_tree

@traceable
def my_function(x: int) -> int:
    # 获取当前运行
    run = get_current_run_tree()

    if run:
        print(f"当前运行 ID: {run.id}")
        print(f"运行名称: {run.name}")

        # 添加自定义元数据(add_metadata 会合并而非覆盖已有元数据)
        run.add_metadata({"user_id": "user-123"})

    return x * 2

异步追踪示例

@traceable 装饰器完全支持异步函数,自动追踪异步操作:

import asyncio
from langsmith import traceable
import httpx

@traceable(name="fetch_data", run_type="retriever")
async def fetch_user_data(user_id: str) -> dict:
    """异步获取用户数据"""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}")
        return response.json()

@traceable(name="process_user", run_type="chain")
async def process_user(user_id: str) -> dict:
    """异步处理用户信息"""
    # 异步调用会自动追踪
    user_data = await fetch_user_data(user_id)

    # 处理数据
    processed = {
        "id": user_data["id"],
        "name": user_data["name"].upper(),
        "processed_at": "2025-01-15"
    }

    return processed

# 使用异步追踪
async def main():
    result = await process_user("user-123")
    print(result)

# 执行
asyncio.run(main())

# 追踪树会显示:
# process_user (async)
# └── fetch_user_data (async)

异步批量操作追踪

@traceable(name="batch_fetch", run_type="chain")
async def fetch_multiple_users(user_ids: list[str]) -> list[dict]:
    """并发获取多个用户数据"""
    # 所有异步调用都会被追踪
    tasks = [fetch_user_data(uid) for uid in user_ids]
    results = await asyncio.gather(*tasks)
    return results

# 使用
async def main():
    users = await fetch_multiple_users(["user-1", "user-2", "user-3"])

# 追踪树会显示:
# batch_fetch
# ├── fetch_user_data (user-1)
# ├── fetch_user_data (user-2)
# └── fetch_user_data (user-3)

错误处理与追踪

异常会自动记录到 LangSmith,无需额外配置:

@traceable(name="safe_operation", run_type="tool")
def safe_operation(value: int) -> float:
    """带错误处理的操作"""
    try:
        if value < 0:
            raise ValueError("Value must be non-negative")

        result = 100 / value
        return result

    except ValueError as e:
        # 异常会自动记录到 LangSmith
        print(f"Validation error: {e}")
        raise  # 重新抛出以标记为失败

    except ZeroDivisionError as e:
        # 除零错误也会被记录
        print(f"Division error: {e}")
        raise

# 调用失败的函数
try:
    safe_operation(0)  # 会在 LangSmith 中标记为 error
except ZeroDivisionError:
    pass

# LangSmith 中会显示:
# - 运行状态: error
# - 错误类型: ZeroDivisionError
# - 错误消息: division by zero
# - 完整堆栈跟踪

异步错误处理示例

@traceable(name="async_safe_fetch")
async def async_safe_fetch(url: str) -> dict:
    """带错误处理的异步请求"""
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=5.0)
            response.raise_for_status()
            return response.json()

    except httpx.TimeoutException as e:
        # 超时错误会被记录
        print(f"Request timeout: {e}")
        raise

    except httpx.HTTPStatusError as e:
        # HTTP 错误会被记录
        print(f"HTTP error: {e.response.status_code}")
        raise

    except Exception as e:
        # 所有其他错误也会被记录
        print(f"Unexpected error: {e}")
        raise

# 使用
async def main():
    try:
        data = await async_safe_fetch("https://invalid-url.example.com")
    except Exception:
        # LangSmith 会记录完整的错误信息
        pass

1.1.4 追踪信息分析#

在 LangSmith UI 中查看追踪

  1. Trace 树视图:查看完整的执行链路

    • 每个节点显示运行名称、状态、耗时
    • 点击节点查看详细信息(输入、输出、错误)
  2. 输入输出对比

    • 左侧:输入参数
    • 右侧:输出结果
    • 快速定位数据变化
  3. Token 成本分析

    • 显示每次模型调用的 Token 使用量
    • 计算累计成本(美元)
    • 识别成本最高的操作
  4. 延迟分析

    • 显示每个操作的耗时
    • 识别性能瓶颈
    • 比较不同版本的性能

编程方式分析追踪

from langsmith import Client

client = Client()

# 获取最近的追踪
runs = client.list_runs(
    project_name="my_project",
    limit=10  # 获取最近 10 条
)

for run in runs:
    print(f"运行: {run.name}")
    print(f"状态: {run.status}")
    print(f"耗时: {run.end_time - run.start_time}")

    if run.token_usage:
        print(f"Tokens: {run.token_usage.completion_tokens}")
        print(f"成本: ${run.cost}")

    # 获取子运行
    if run.child_runs:
        for child in run.child_runs:
            print(f"  └─ {child.name}: {child.status}")

筛选和统计

from datetime import datetime, timedelta

# 筛选条件
yesterday = datetime.now() - timedelta(days=1)

# 获取特定标签的运行(转换为列表,便于多次遍历;filter 语法以官方文档为准)
production_runs = list(client.list_runs(
    project_name="my_project",
    filter='has(tags, "production")',
    start_time=yesterday
))

# 统计
total_cost = sum(run.cost for run in production_runs if run.cost)
avg_duration = (
    sum(
        (run.end_time - run.start_time).total_seconds()
        for run in production_runs
    ) / len(production_runs)
    if production_runs else 0.0
)

error_count = sum(1 for run in production_runs if run.status == "error")

print(f"总成本: ${total_cost:.2f}")
print(f"平均耗时: {avg_duration:.2f} 秒")
print(f"错误数: {error_count}")

反馈收集

# 在应用中收集用户反馈
run_id = "run-123"  # 从追踪中获取

# 用户点赞
client.create_feedback(
    run_id=run_id,
    key="user_rating",
    score=1.0,  # 1.0 = 好, 0.0 = 不好
    comment="很有用!"
)

# 专家标注
client.create_feedback(
    run_id=run_id,
    key="expert_evaluation",
    score=0.8,
    comment="大部分正确,但有一处错误"
)

# 查看反馈
feedbacks = client.list_feedback(run_ids=[run_id])
for feedback in feedbacks:
    print(f"{feedback.key}: {feedback.score} - {feedback.comment}")

1.2 Evaluation 评估框架#

1.2.1 Dataset 管理与测试集设计#

什么是 Dataset?

Dataset 是评估所需的测试数据集合,每个 Example 包含:

  • input:输入参数
  • output(可选):参考输出(用于对比)
  • metadata(可选):补充信息

创建 Dataset

from langsmith import Client

client = Client()

# 方法 1:手动创建
dataset = client.create_dataset(
    dataset_name="qa_benchmark",
    description="QA 任务基准数据集"
)

# 添加示例
client.create_example(
    dataset_id=dataset.id,
    inputs={"question": "LangChain 是什么?"},
    outputs={"answer": "LangChain 是一个构建 LLM 应用的框架"}
)

client.create_example(
    dataset_id=dataset.id,
    inputs={"question": "Python 3.13 的主要特性?"},
    outputs={"answer": "Python 3.13 引入了 JIT 编译器..."}
)

# 方法 2:从 CSV 导入
import pandas as pd

df = pd.read_csv("qa_examples.csv")
# 假设 CSV 有 "question" 和 "answer" 列

dataset = client.create_dataset(
    dataset_name="qa_from_csv",
    description="从 CSV 导入的 QA 数据集"
)

for _, row in df.iterrows():
    client.create_example(
        dataset_id=dataset.id,
        inputs={"question": row["question"]},
        outputs={"answer": row["answer"]}
    )

# 方法 3:从生产追踪创建(最推荐)
# 在生产环境运行一段时间后,筛选高质量的追踪
client.create_dataset(
    dataset_name="production_examples",
    description="从生产环境采集的真实用户查询"
)

# 手动筛选好的追踪并添加
# (通常通过 LangSmith UI 完成,支持批量导入)

测试集设计最佳实践

1. 覆盖代表场景

# ✅ 好:覆盖多种场景
examples = [
    # 简单查询
    {"question": "天气如何?", "answer": "..."},

    # 复杂查询
    {"question": "比较 LangChain 和 LangGraph 的优缺点", "answer": "..."},

    # 边界情况
    {"question": "", "answer": "请输入有效问题"},
    {"question": "???.|||", "answer": "请输入有效问题"},

    # 多轮对话
    {"question": "什么是 RAG?", "answer": "..."},
    {"question": "能给个例子吗?", "answer": "..."},
]

2. 足够的数据量

  • 初期:10-20 个高质量样例(手工精选)
  • 中期:50-100 个样例(包括覆盖各种场景)
  • 成熟期:1000+ 个样例(从生产环境采集)

3. 包含元数据

client.create_example(
    dataset_id=dataset.id,
    inputs={"question": "LangChain 是什么?"},
    outputs={"answer": "LangChain 是..."},
    metadata={
        "difficulty": "easy",
        "category": "framework",
        "source": "user_feedback",
        "language": "chinese"
    }
)

1.2.2 评估指标:准确率、相关性、一致性、延迟、成本#

评估器返回格式详解

评估器必须返回一个字典,包含以下字段:

| 字段 | 必需 | 类型 | 说明 |
| --- | --- | --- | --- |
| key | 是 | str | 评估指标的名称(如 "accuracy", "relevance") |
| score | 是 | float/int/bool | 评估分数(通常 0-1,也可以是任意数值或布尔值) |
| value | 否 | Any | 实际值(通常与 score 相同,用于记录原始值) |
| comment | 否 | str | 评论或解释(用于调试和分析) |
| correction | 否 | dict | 修正建议(用于标注正确答案) |

完整返回格式示例

# 最简格式(只包含必需字段)
return {
    "key": "accuracy",
    "score": 1.0
}

# 完整格式(包含所有字段)
return {
    "key": "accuracy",           # 指标名称
    "score": 0.85,                # 分数(0-1 或任意数值)
    "value": 0.85,                # 实际值(可选,通常与 score 相同)
    "comment": "Good answer!",    # 评论(可选,用于解释)
    "correction": {               # 修正建议(可选)
        "expected": "Paris",
        "actual": "paris"
    }
}

# 布尔分数
return {
    "key": "is_correct",
    "score": True  # 布尔值也可以
}

# 多个评估结果(返回列表)
return [
    {"key": "accuracy", "score": 0.9},
    {"key": "latency", "score": 1.5, "comment": "1.5 seconds"}
]

评估器类型

  1. 启发式评估器(Heuristic Evaluator):确定性规则
def is_valid_code(run, example):
    """检查输出是否是有效的 Python 代码"""
    code = run.outputs.get("code", "")

    try:
        compile(code, "<string>", "exec")
        return {
            "key": "valid_code",
            "score": 1.0,
            "comment": "Valid Python code"
        }
    except SyntaxError as e:
        return {
            "key": "valid_code",
            "score": 0.0,
            "comment": f"Syntax error: {str(e)}",
            "correction": {"error": str(e)}
        }

def exact_match(run, example):
    """精确匹配评估器"""
    actual = run.outputs.get("answer")
    expected = example.outputs.get("answer")
    is_match = actual == expected

    return {
        "key": "exact_match",
        "score": 1.0 if is_match else 0.0,
        "value": is_match,
        "comment": "Match!" if is_match else f"Expected: {expected}, Got: {actual}",
        "correction": {"expected": expected} if not is_match else None
    }

def length_check(run, example):
    """检查输出长度"""
    answer = run.outputs.get("answer", "")
    length = len(answer)

    return {
        "key": "answer_length",
        "score": length,  # 分数可以是任意数值
        "value": length,
        "comment": f"Answer has {length} characters"
    }

2. LLM 作为评估器(LLM-as-Judge):使用 LLM 打分

from langchain_openai import ChatOpenAI
from langsmith.evaluation import LangChainStringEvaluator  # 官方提供的 LLM 评估器封装

# 自定义 LLM 评估器
def llm_evaluator(run, example):
    """使用 LLM 评估相关性"""
    model = ChatOpenAI(model="gpt-4o-mini")

    evaluation_prompt = f"""
    问题:{example.inputs.get("question")}
    参考答案:{example.outputs.get("answer")}
    实际答案:{run.outputs.get("answer")}

    请评估实际答案与参考答案的相似度(0-1)。
    """

    response = model.invoke(evaluation_prompt)

    # 解析响应(假设模型只在第一行返回 0-1 的数字;生产环境应做容错处理)
    score = float(response.content.split("\n")[0].strip())

    return {
        "score": score,
        "key": "relevance",
        "comment": response.content
    }

3. 人工评估:通过 UI 手动标注

# LangSmith UI 中可以:
# 1. 在 Annotation Queue 中标注
# 2. 为每条结果打分或写评论
# 3. 导出评估结果供统计

常见评估指标

| 指标 | 实现方式 | 适用场景 |
| --- | --- | --- |
| Accuracy | 精确匹配或 LLM | 分类任务 |
| BLEU/ROUGE | 文本相似度 | 文本生成 |
| Precision/Recall | 集合操作 | 检索任务 |
| Relevance | LLM 评估 | QA 任务 |
| Consistency | 多次运行对比 | 非确定性任务 |
| Latency | 时间戳计算 | 性能分析 |
| Cost | Token 使用量 | 成本控制 |
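上表中的 Latency 和 Consistency 两项在前文没有代码示例,下面给出一个最小示意(辅助函数为自拟;一致性这里简化为多次输出中最常见答案的占比):

```python
from collections import Counter

def latency_evaluator(run, example):
    """延迟指标:直接用 Run 的时间戳计算耗时(秒)。"""
    seconds = (run.end_time - run.start_time).total_seconds()
    return {
        "key": "latency_seconds",
        "score": seconds,
        "comment": f"耗时 {seconds:.2f} 秒",
    }

def consistency_score(answers: list[str]) -> float:
    """一致性指标:对同一输入的多次输出,取最常见答案的占比。"""
    if not answers:
        return 0.0
    most_common_count = Counter(answers).most_common(1)[0][1]
    return most_common_count / len(answers)
```

consistency_score 需要配合 num_repetitions > 1 使用:对同一样例重复运行,收集多次答案后再计算占比。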

1.2.3 evaluate() 批量测试#

注意:LangSmith 1.0+ 中 run_on_dataset() 已重命名为 evaluate(),推荐使用新 API

evaluate() 函数的 data 参数支持的类型

evaluate() 函数的 data 参数非常灵活,支持多种数据源:

| 类型 | 说明 | 示例 |
| --- | --- | --- |
| str(数据集名称) | LangSmith 平台上的数据集名称 | "qa_benchmark" |
| str(UUID) | LangSmith 数据集的 UUID | "a3d2f1b8-..." |
| list[dict] | 直接传入示例列表 | [{"inputs": {...}, "outputs": {...}}, ...] |
| Iterator[dict] | 示例迭代器(用于大数据集) | iter([{...}, {...}]) |
| Iterator[Example] | LangSmith Example 对象迭代器 | client.list_examples(dataset_name="...") |

使用不同类型的示例

from langsmith.evaluation import evaluate
from langsmith import Client

client = Client()

# 1. 使用数据集名称(最常用)
results = evaluate(
    predict,
    data="qa_benchmark",  # 数据集名称
    evaluators=[accuracy]
)

# 2. 使用数据集 UUID
results = evaluate(
    predict,
    data="a3d2f1b8-1234-5678-90ab-cdef12345678",  # UUID
    evaluators=[accuracy]
)

# 3. 直接传入示例列表(快速测试)
examples = [
    {
        "inputs": {"question": "What is LangChain?"},
        "outputs": {"answer": "LangChain is a framework..."}
    },
    {
        "inputs": {"question": "What is Python?"},
        "outputs": {"answer": "Python is a programming language..."}
    }
]
results = evaluate(
    predict,
    data=examples,  # 列表
    evaluators=[accuracy]
)

# 4. 使用迭代器(大数据集,节省内存)
def example_generator():
    """生成器函数,逐个产生示例"""
    for i in range(1000):
        yield {
            "inputs": {"question": f"Question {i}"},
            "outputs": {"answer": f"Answer {i}"}
        }

results = evaluate(
    predict,
    data=example_generator(),  # 迭代器
    evaluators=[accuracy]
)

# 5. 使用 LangSmith Example 对象迭代器
examples_iter = client.list_examples(
    dataset_name="qa_benchmark",
    limit=100  # 只测试前 100 个
)
results = evaluate(
    predict,
    data=examples_iter,
    evaluators=[accuracy]
)

基本用法

from langsmith.evaluation import evaluate
from langsmith import Client

client = Client()

# 定义预测函数
def predict(input_dict):
    """应用的预测函数"""
    question = input_dict.get("question")

    # 调用 Agent
    agent = create_agent(model=ChatOpenAI(model="gpt-4o-mini"), tools=[...])
    result = agent.invoke({"messages": [("user", question)]})

    return {"answer": result["messages"][-1].content}

# 定义评估器
def exact_match(run, example):
    """精确匹配评估"""
    return {
        "score": 1.0 if run.outputs["answer"] == example.outputs["answer"] else 0.0,
        "key": "exact_match"
    }

def relevance(run, example):
    """相关性评估"""
    model = ChatOpenAI(model="gpt-4o-mini")
    response = model.invoke(
        f"问题: {example.inputs['question']}\n答案: {run.outputs['answer']}\n"
        f"评估相关性(0-1):"
    )
    score = float(response.content.strip())  # 假设模型只返回一个 0-1 的数字
    return {"score": score, "key": "relevance"}

# 运行评估
experiment_results = evaluate(
    predict,                                    # 预测函数
    data="qa_benchmark",                       # 数据集名称
    evaluators=[exact_match, relevance],       # 行级评估器
    experiment_prefix="Model v1",              # 实验名称前缀
    description="测试新模型版本",
    num_repetitions=2,                         # 运行 2 次(处理 LLM 非确定性)
    max_concurrency=10                         # 并发数
)

# 查看结果
print(f"实验名: {experiment_results.experiment_name}")
print(f"总样例数: {len(experiment_results.results)}")

# 统计指标
scores = [r.evaluation_results[0].score for r in experiment_results.results]
print(f"平均准确率: {sum(scores) / len(scores):.2%}")

高级用法:Summary Evaluator

from langsmith.evaluation import evaluate

def accuracy_summary(runs, examples):
    """实验级评估器(在所有运行上计算)"""
    correct = 0
    for run, example in zip(runs, examples):
        if run.outputs["answer"] == example.outputs["answer"]:
            correct += 1

    accuracy = correct / len(runs)
    return {"score": accuracy, "key": "accuracy"}

def precision_recall_summary(runs, examples):
    """计算 Precision 和 Recall"""
    predictions = [run.outputs["answer"] for run in runs]
    references = [example.outputs["answer"] for example in examples]

    # 假设输出是分类标签列表
    tp = sum(1 for p, r in zip(predictions, references) if p == r and p == "positive")
    fp = sum(1 for p, r in zip(predictions, references) if p != r and p == "positive")
    fn = sum(1 for p, r in zip(predictions, references) if p != r and r == "positive")

    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0

    return [
        {"score": precision, "key": "precision"},
        {"score": recall, "key": "recall"}
    ]

# 使用 Summary Evaluator
evaluate(
    predict,
    data="classification_dataset",
    evaluators=[exact_match],
    summary_evaluators=[accuracy_summary, precision_recall_summary],
    experiment_prefix="Classification v1"
)

异步评估(处理大规模数据集)

import asyncio
from langsmith.evaluation import aevaluate

async def main():
    # 异步预测函数
    async def async_predict(input_dict):
        # 异步 Agent 调用
        result = await async_agent.ainvoke({"messages": [("user", input_dict["question"])]})
        return {"answer": result["messages"][-1].content}

    # 运行异步评估
    results = await aevaluate(
        async_predict,
        data="large_dataset",
        evaluators=[exact_match, relevance],
        experiment_prefix="Async Model v1",
        max_concurrency=50  # 并发 50 个请求
    )

    return results

# 执行
results = asyncio.run(main())

1.2.4 人工标注与反馈收集#

通过 LangSmith UI 标注

  1. 创建 Annotation Queue

    • 在项目中点击 “Create Annotation Queue”
    • 选择数据集和要标注的字段
  2. 标注流程

    • 逐条查看例子
    • 给每条结果评分或写评论
    • 导出标注结果

编程方式收集反馈

from langsmith import Client

client = Client()

# 应用中收集用户反馈
run_id = "run-123"

# 用户点赞/点踩
client.create_feedback(
    run_id=run_id,
    key="user_rating",
    score=1.0,  # 1.0 = 好, 0.0 = 不好
    comment="回答很准确!"
)

# 专家标注
client.create_feedback(
    run_id=run_id,
    key="expert_annotation",
    score=0.8,
    comment="大部分正确,但第二点不够准确"
)

# 导出所有反馈用于分析
feedbacks = client.list_feedback(
    run_ids=[run_id]
)

for feedback in feedbacks:
    print(f"{feedback.key}: {feedback.score}")
    print(f"  {feedback.comment}")

1.3 持续优化#

1.3.1 Prompt Hub 版本控制#

场景:系统提示词需要版本管理和对比。

使用 LangSmith Prompt Hub

from langsmith import Client
from langchain_core.prompts import ChatPromptTemplate

client = Client()

# 方法 1:将提示词推送到 Hub
system_prompt_v1 = """你是一个专业的编程助手。

能力:
1. 回答编程问题
2. 生成代码
3. 调试错误

约束:
- 只讨论编程相关话题
- 生成的代码要有注释
"""

# 创建提示词模板
prompt_template = ChatPromptTemplate.from_messages([
    ("system", system_prompt_v1),
    ("user", "{input}")
])

# 推送到 Hub(需要 API 权限)
client.push_prompt(
    prompt_identifier="programming_assistant_system_prompt",
    object=prompt_template,
    description="编程助手的系统提示词"
)

# 方法 2:从 Hub 拉取提示词
system_prompt = client.pull_prompt("programming_assistant_system_prompt")

# 方法 3:版本管理
# 每次推送自动创建新版本,可以:
# 1. 查看历史版本
# 2. 对比版本差异
# 3. 拉取特定版本

updated_prompt_v2 = """你是一个专业的编程助手。

增强能力:
1. 回答编程问题
2. 生成代码
3. 调试错误

约束:
- 只讨论编程相关话题
- 生成的代码要有注释
"""

# 推送新版本(自动创建版本 2)
updated_template = ChatPromptTemplate.from_messages([
    ("system", updated_prompt_v2),
    ("user", "{input}")
])

client.push_prompt(
    prompt_identifier="programming_assistant_system_prompt",
    object=updated_template,
    description="添加 '增强' 前缀以强调能力"
)

版本对比

# 在 LangSmith UI 中:
# 1. 打开 Prompt Hub
# 2. 选择提示词
# 3. 点击 "Compare Versions"
# 4. 查看变化和对比结果

1.3.2 A/B 测试#

完整的 A/B 测试流程

from langsmith.evaluation import evaluate
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent

# 版本 A:原始模型和提示词
def predict_v_a(input_dict):
    agent = create_agent(
        model=ChatOpenAI(model="gpt-4o-mini"),
        tools=[...],
        system_prompt="你是一个助手"
    )
    result = agent.invoke({"messages": [("user", input_dict["question"])]})
    return {"answer": result["messages"][-1].content}

# 版本 B:改进的提示词
def predict_v_b(input_dict):
    agent = create_agent(
        model=ChatOpenAI(model="gpt-4o-mini"),
        tools=[...],
        system_prompt="""你是一个专业的助手。

在回答前:
1. 理解问题的关键信息
2. 分解复杂问题为子问题
3. 使用工具获取必要信息

最后提供清晰、准确的答案。"""
    )
    result = agent.invoke({"messages": [("user", input_dict["question"])]})
    return {"answer": result["messages"][-1].content}

# 定义评估器
def exact_match(run, example):
    return {
        "score": 1.0 if run.outputs["answer"] == example.outputs["answer"] else 0.0,
        "key": "exact_match"
    }

# 运行版本 A
experiment_a = evaluate(
    predict_v_a,
    data="qa_benchmark",
    evaluators=[exact_match],
    experiment_prefix="Version A",
    description="基准版本"
)

# 运行版本 B
experiment_b = evaluate(
    predict_v_b,
    data="qa_benchmark",
    evaluators=[exact_match],
    experiment_prefix="Version B",
    description="改进的提示词"
)

# 对比结果
def calculate_accuracy(experiment):
    scores = [r.evaluation_results[0].score for r in experiment.results]
    return sum(scores) / len(scores)

print("=== 版本对比 ===")
print(f"版本 A 准确率: {calculate_accuracy(experiment_a):.2%}")
print(f"版本 B 准确率: {calculate_accuracy(experiment_b):.2%}")

统计显著性检验

from scipy import stats

# 获取两个版本的评分
scores_a = [r.evaluation_results[0].score for r in experiment_a.results]
scores_b = [r.evaluation_results[0].score for r in experiment_b.results]

# T 检验
t_stat, p_value = stats.ttest_ind(scores_a, scores_b)

print(f"T 统计量: {t_stat:.4f}")
print(f"P 值: {p_value:.4f}")

if p_value < 0.05:
    if sum(scores_b) > sum(scores_a):
        print("✅ 版本 B 显著更好(p < 0.05)")
    else:
        print("✅ 版本 A 显著更好(p < 0.05)")
else:
    print("⚠️  差异不显著(p >= 0.05)")

1.3.3 监控指标与问题发现#

关键监控指标

from langsmith import Client
from datetime import datetime, timedelta

client = Client()

# 监控指标收集
def monitor_agent_health():
    """Agent 健康监测"""

    # 获取最近 24 小时的运行
    yesterday = datetime.now() - timedelta(days=1)
    runs = list(client.list_runs(
        project_name="my_agent",
        start_time=yesterday
    ))

    # 计算关键指标
    total_runs = len(runs)
    if total_runs == 0:
        # 无数据时返回零值,避免除零
        return {
            "total_runs": 0, "success_rate": 0.0, "error_rate": 0.0,
            "avg_duration": 0.0, "total_cost": 0.0, "cost_per_success": 0.0,
        }

    successful_runs = sum(1 for run in runs if run.status == "success")
    error_runs = sum(1 for run in runs if run.status == "error")

    avg_duration = sum(
        (run.end_time - run.start_time).total_seconds()
        for run in runs
    ) / total_runs

    total_cost = sum(run.cost for run in runs if run.cost)

    # 计算错误率
    error_rate = error_runs / total_runs

    # 计算成本效率
    cost_per_success = total_cost / successful_runs if successful_runs > 0 else 0

    # 返回指标
    return {
        "total_runs": total_runs,
        "success_rate": successful_runs / total_runs,
        "error_rate": error_rate,
        "avg_duration": avg_duration,
        "total_cost": total_cost,
        "cost_per_success": cost_per_success
    }

metrics = monitor_agent_health()

print("=== Agent 健康监测 ===")
print(f"总请求: {metrics['total_runs']}")
print(f"成功率: {metrics['success_rate']:.2%}")
print(f"错误率: {metrics['error_rate']:.2%}")
print(f"平均耗时: {metrics['avg_duration']:.2f}s")
print(f"总成本: ${metrics['total_cost']:.2f}")
print(f"成本/成功: ${metrics['cost_per_success']:.4f}")

问题自动告警

def check_health_alerts():
    """检查是否有告警条件"""
    metrics = monitor_agent_health()

    alerts = []

    # 错误率过高
    if metrics["error_rate"] > 0.1:  # > 10%
        alerts.append(f"⚠️  错误率过高: {metrics['error_rate']:.2%}")

    # 响应时间过长
    if metrics["avg_duration"] > 30:  # > 30秒
        alerts.append(f"⚠️  平均响应时间过长: {metrics['avg_duration']:.2f}s")

    # 成本突增
    if metrics["cost_per_success"] > 0.01:  # > $0.01 per success
        alerts.append(f"⚠️  成本过高: ${metrics['cost_per_success']:.4f}")

    if alerts:
        print("=== 告警 ===")
        for alert in alerts:
            print(alert)
    else:
        print("✅ 系统健康")

反馈驱动的问题发现

# 查找用户标记为"不好"的运行
bad_runs = list(client.list_runs(
    project_name="my_agent",
    # filter 语法以 LangSmith 官方文档为准
    filter='and(eq(feedback_key, "user_rating"), eq(feedback_score, 0))'
))

print(f"找到 {len(bad_runs)} 条差评运行")

# 分析失败原因
failure_patterns = {}

for run in bad_runs:
    # 获取失败的工具调用
    if run.child_runs:
        for child in run.child_runs:
            if child.status == "error":
                tool_name = child.name
                if tool_name not in failure_patterns:
                    failure_patterns[tool_name] = 0
                failure_patterns[tool_name] += 1

# 排序并显示
for tool, count in sorted(failure_patterns.items(), key=lambda x: x[1], reverse=True):
    print(f"{tool}: {count} 次失败")

1.3.4 优化迭代流程#

完整的优化循环

1. 基准测试 → 获得当前性能基线
2. 假设提出 → 例如"改进提示词可以提高准确率 5%"
3. 制定对策 → 编写新的提示词或改进模型选择
4. 运行 A/B 测试 → 对比新旧版本
5. 分析结果 → 是否满足假设?
   ├─ 是 → 更新版本到生产
   └─ 否 → 返回步骤 2,尝试新假设
6. 监控上线 → 持续监控生产指标
7. 收集反馈 → 用户标注、问题报告
8. 返回步骤 1 → 开始新一轮优化

实现优化循环的代码框架

from langsmith import Client
from langsmith.evaluation import evaluate
from datetime import datetime

class ContinuousOptimization:
    def __init__(self, project_name: str):
        self.client = Client()
        self.project_name = project_name
        self.iteration = 0

    def baseline(self):
        """获得基准测试结果"""
        results = evaluate(
            predict,
            data="qa_benchmark",
            evaluators=[exact_match, relevance],
            experiment_prefix=f"Baseline_{datetime.now().isoformat()}",
            description="基准版本"
        )

        accuracy = self.calculate_accuracy(results)
        print(f"📊 基线准确率: {accuracy:.2%}")

        return results

    def propose_hypothesis(self, hypothesis: str):
        """提出假设"""
        self.hypothesis = hypothesis
        print(f"💡 假设: {hypothesis}")

    def implement_changes(self, changes: dict):
        """实现改进"""
        self.changes = changes
        print(f"⚙️  改进: {changes}")

    def run_experiment(self):
        """运行实验"""
        self.iteration += 1

        # 这里调用改进后的预测函数
        results = evaluate(
            predict_improved,  # 改进后的版本
            data="qa_benchmark",
            evaluators=[exact_match, relevance],
            experiment_prefix=f"Iteration_{self.iteration}_{datetime.now().isoformat()}",
            description=f"改进: {self.hypothesis}"
        )

        new_accuracy = self.calculate_accuracy(results)
        return new_accuracy

    def compare_with_baseline(self, baseline_acc, new_acc):
        """对比改进效果"""
        # 注意::.2% 格式化会自动乘以 100,这里不要再乘 100
        improvement = (new_acc - baseline_acc) / baseline_acc

        if improvement > 0:
            print(f"✅ 改进成功! 提升 {improvement:.2%}")
            return True
        else:
            print(f"❌ 未达到预期,下降 {abs(improvement):.2%}")
            return False

    def deploy_to_production(self):
        """部署到生产环境"""
        print("🚀 部署新版本到生产")
        # 更新生产 Agent 的配置

    def calculate_accuracy(self, results):
        scores = [r.evaluation_results[0].score for r in results.results]
        return sum(scores) / len(scores)

# 使用示例
optimization = ContinuousOptimization("my_agent")

# 第 1 轮:基准
baseline_results = optimization.baseline()  # 准确率: 75%

# 第 2 轮:改进提示词
optimization.propose_hypothesis("详细的系统提示词能提高准确率")
optimization.implement_changes({"system_prompt": "新提示词..."})
acc_v2 = optimization.run_experiment()  # 准确率: 78%

if optimization.compare_with_baseline(0.75, acc_v2):
    optimization.deploy_to_production()

# 第 3 轮:切换模型
optimization.propose_hypothesis("使用更强的模型能进一步提高准确率")
optimization.implement_changes({"model": "gpt-4o"})
acc_v3 = optimization.run_experiment()  # 准确率: 82%

if optimization.compare_with_baseline(0.75, acc_v3):
    optimization.deploy_to_production()

本章小结#

  1. 追踪体系:自动和手动追踪,形成完整的执行树,用于调试和分析
  2. Evaluation 框架:Dataset + Evaluator 的组合,支持启发式、LLM、人工评估
  3. 批量测试:evaluate() 和 aevaluate() 函数,支持行级和实验级评估
  4. 持续优化:A/B 测试、版本控制、监控告警、迭代流程

思考与练习#

  1. 思考:为什么在 evaluate() 中需要 num_repetitions > 1?

    答案

    LLM 的输出有随机性(由 temperature 参数控制)。运行多次可以:

    1. 评估模型的稳定性
    2. 获得更准确的平均指标
    3. 检测随机导致的偶然好/坏结果
  2. 练习:实现一个 f1_score_summary() 评估器,计算 F1 分数。

  3. 思考:如何设计一个反馈循环,自动识别失败模式并提出改进建议?


总结与展望#

通过 LangSmith 的追踪和评估体系,我们已经掌握了:

  • 可观测性:完整的执行链路可见
  • 可测试性:科学的评估框架
  • 可优化性:数据驱动的迭代

这些能力为后续的高级应用(多 Agent、MCP 集成)和生产实践提供了坚实的基础。



第2章: 架构设计模式#

关注点:掌握生产级 LangChain 应用的架构设计。

2.1 RAG 架构设计#

生产级 RAG 系统架构

┌─────────────────────────────────────────────────────────────┐
│                         用户接口层                             │
│                    (Web/API/Chat Interface)                   │
└─────────────┬───────────────────────────────────────────────┘
┌─────────────▼───────────────────────────────────────────────┐
│                        应用服务层                              │
│   ┌─────────────┐  ┌──────────────┐  ┌─────────────┐       │
│   │ 查询处理器   │  │  Agent 引擎   │  │ 结果后处理   │       │
│   └─────────────┘  └──────────────┘  └─────────────┘       │
└─────────────┬───────────────────────────────────────────────┘
┌─────────────▼───────────────────────────────────────────────┐
│                         核心层                                │
│   ┌────────────┐  ┌────────────┐  ┌─────────────┐          │
│   │ 向量检索器  │  │ 重排序器    │  │  LLM 网关    │          │
│   └────────────┘  └────────────┘  └─────────────┘          │
└─────────────┬───────────────────────────────────────────────┘
┌─────────────▼───────────────────────────────────────────────┐
│                        数据层                                 │
│   ┌────────────┐  ┌────────────┐  ┌─────────────┐          │
│   │ 向量数据库  │  │ 文档存储    │  │  缓存层     │          │
│   └────────────┘  └────────────┘  └─────────────┘          │
└─────────────────────────────────────────────────────────────┘

实现代码

from typing import List, Optional, Dict, Any
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore as Pinecone  # 新版包名(旧版在 langchain_community)
from langchain.retrievers import ContextualCompressionRetriever
from langchain_cohere import CohereRerank  # 重排序器已迁移到 langchain_cohere 包
from langchain_core.documents import Document
from langchain_core.documents import Document
import redis
import hashlib
import json

class ProductionRAGSystem:
    """生产级 RAG 系统"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
        self.vector_store = self._init_vector_store()
        self.cache = self._init_cache()
        self.llm = self._init_llm()
        self.retriever = self._init_retriever()

    def _init_vector_store(self):
        """初始化向量数据库"""
        return Pinecone.from_existing_index(
            index_name=self.config["pinecone_index"],
            embedding=self.embeddings,
            namespace=self.config.get("namespace", "default")
        )

    def _init_cache(self):
        """初始化缓存层"""
        return redis.Redis(
            host=self.config["redis_host"],
            port=self.config["redis_port"],
            db=0,
            decode_responses=True
        )

    def _init_llm(self):
        """初始化 LLM 网关"""
        return ChatOpenAI(
            model=self.config["llm_model"],
            temperature=0.7,
            max_retries=3,
            request_timeout=30
        )

    def _init_retriever(self):
        """初始化检索器(带重排序)"""
        base_retriever = self.vector_store.as_retriever(
            search_kwargs={"k": self.config.get("retrieve_k", 10)}
        )

        # 添加压缩/重排序(使用 CohereRerank)
        compressor = CohereRerank(
            model="rerank-multilingual-v3.0",
            top_n=self.config.get("top_k", 5)
        )

        return ContextualCompressionRetriever(
            base_compressor=compressor,
            base_retriever=base_retriever
        )

    def _get_cache_key(self, query: str) -> str:
        """生成缓存键"""
        return f"rag:{hashlib.md5(query.encode()).hexdigest()}"

    def retrieve(self, query: str) -> List[Document]:
        """检索相关文档"""
        # 查询改写
        rewritten_query = self._rewrite_query(query)

        # 检索
        docs = self.retriever.get_relevant_documents(rewritten_query)

        # 后处理
        docs = self._post_process_docs(docs)

        return docs

    def _rewrite_query(self, query: str) -> str:
        """查询改写(提高检索质量)"""
        prompt = f"""请改写以下查询,使其更适合向量检索:

原始查询:{query}

改写后的查询(保持语义,优化关键词):"""

        response = self.llm.invoke(prompt)
        return response.content

    def _post_process_docs(self, docs: List[Document]) -> List[Document]:
        """文档后处理"""
        # 去重
        seen = set()
        unique_docs = []

        for doc in docs:
            doc_hash = hashlib.md5(doc.page_content.encode()).hexdigest()
            if doc_hash not in seen:
                seen.add(doc_hash)
                unique_docs.append(doc)

        # 按相关性重新排序
        unique_docs.sort(key=lambda x: x.metadata.get("score", 0), reverse=True)

        return unique_docs[:self.config.get("top_k", 5)]

    def generate_answer(self, query: str, docs: List[Document]) -> str:
        """生成答案"""
        context = "\n\n".join([doc.page_content for doc in docs])

        prompt = f"""基于以下上下文回答问题:

上下文:
{context}

问题:{query}

答案:"""

        response = self.llm.invoke(prompt)
        return response.content

    def query(self, query: str, use_cache: bool = True) -> Dict[str, Any]:
        """完整查询流程"""
        # 检查缓存
        if use_cache:
            cache_key = self._get_cache_key(query)
            cached = self.cache.get(cache_key)

            if cached:
                return json.loads(cached)

        # 检索
        docs = self.retrieve(query)

        # 生成答案
        answer = self.generate_answer(query, docs)

        result = {
            "query": query,
            "answer": answer,
            "sources": [
                {
                    "content": doc.page_content[:200] + "...",
                    "metadata": doc.metadata
                }
                for doc in docs
            ]
        }

        # 写入缓存
        if use_cache:
            self.cache.setex(
                cache_key,
                self.config.get("cache_ttl", 3600),
                json.dumps(result)
            )

        return result
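ProductionRAGSystem 的所有外部依赖都通过 config 字典注入。下面是一份与上文各 _init_* 方法逐一对应的示意配置(键名取自上文代码,具体取值为假设):

```python
# 示意配置:键名对应 ProductionRAGSystem 读取的字段,取值为假设
rag_config = {
    "pinecone_index": "docs-index",   # _init_vector_store
    "namespace": "default",
    "redis_host": "localhost",        # _init_cache
    "redis_port": 6379,
    "llm_model": "gpt-4o-mini",       # _init_llm
    "retrieve_k": 10,                 # 初检召回数
    "top_k": 5,                       # 重排序后保留数
    "cache_ttl": 3600,                # 缓存过期时间(秒)
}
```

将配置集中在一个字典中,便于按环境(开发/生产)切换,也便于做配置校验。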

RAG Agent 实现(推荐)

使用 @tool(response_format="content_and_artifact") 是构建 RAG Agent 的官方推荐方式,允许工具同时返回内容和原始数据:

from langchain_core.tools import tool
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
from typing import List

# 创建向量存储
vectorstore = Chroma.from_documents(
    documents=[...],  # 你的文档
    embedding=OpenAIEmbeddings()
)

# 定义检索工具(使用 content_and_artifact 格式)
@tool(response_format="content_and_artifact")
def retrieve_docs(query: str) -> tuple[str, List]:
    """检索相关文档

    Args:
        query: 用户查询

    Returns:
        (content, artifact): 内容摘要和原始文档列表
    """
    # 检索文档
    docs = vectorstore.similarity_search(query, k=3)

    # 构造返回内容
    content = f"找到 {len(docs)} 个相关文档"
    artifact = [doc.page_content for doc in docs]  # 原始文档

    return content, artifact

# 创建 RAG Agent
rag_agent = create_agent(
    model=ChatOpenAI(model="gpt-4o"),
    tools=[retrieve_docs],
    system_prompt="""你是一个 RAG 助手。

使用 retrieve_docs 工具获取相关文档,然后基于文档内容回答问题。
如果文档中没有相关信息,明确告诉用户。
"""
)

# 使用
result = rag_agent.invoke({
    "messages": [("user", "LangChain 1.0 有哪些新特性?")]
})

print(result["messages"][-1].content)
# Agent 会自动:
# 1. 调用 retrieve_docs 工具
# 2. 访问 artifact 中的原始文档
# 3. 基于文档内容生成答案

使用 RunnablePassthrough.assign() 的 LCEL 方式

from langchain_core.runnables import RunnablePassthrough
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 定义 RAG 提示词
rag_prompt = ChatPromptTemplate.from_template("""
基于以下上下文回答问题:

上下文:
{context}

问题: {question}

回答:
""")

# 构建 RAG 链(使用 assign 添加检索结果)
rag_chain = (
    RunnablePassthrough.assign(
        context=lambda x: vectorstore.similarity_search(x["question"], k=3)
    )
    | RunnablePassthrough.assign(
        context=lambda x: "\n\n".join([doc.page_content for doc in x["context"]])
    )
    | rag_prompt
    | ChatOpenAI(model="gpt-4o")
    | StrOutputParser()
)

# 使用
answer = rag_chain.invoke({
    "question": "LangChain 1.0 有哪些新特性?"
})

使用 @dynamic_prompt 的 Middleware 方式(推荐)

from langchain.agents.middleware import dynamic_prompt, ModelRequest
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI

@dynamic_prompt
def prompt_with_context(request: ModelRequest) -> str:
    """动态注入检索上下文"""
    # 获取最后一条用户消息
    last_query = request.state["messages"][-1].text

    # 执行检索
    retrieved_docs = vectorstore.similarity_search(last_query, k=3)

    # 格式化文档内容
    docs_content = "\n\n".join([
        f"文档 {i+1}:\n{doc.page_content}"
        for i, doc in enumerate(retrieved_docs)
    ])

    # 返回系统消息(会自动注入到请求中)
    system_message = (
        "你是一个 RAG 助手。使用以下上下文回答用户问题:\n\n"
        f"{docs_content}\n\n"
        "如果上下文中没有相关信息,请明确告知用户。"
    )

    return system_message

# 创建 Agent(传入 middleware)
rag_agent = create_agent(
    model=ChatOpenAI(model="gpt-4o"),
    tools=[],  # 不需要工具,检索在 middleware 中完成
    middleware=[prompt_with_context]  # ✅ 关键:传入 dynamic_prompt
)

# 使用(自动在每次请求前检索)
result = rag_agent.invoke({
    "messages": [("user", "LangChain 1.0 有哪些新特性?")]
})

@dynamic_prompt 的优势

  1. 自动执行:每次模型调用前自动检索,无需手动调用工具
  2. 单次推理:只需 1 次 LLM 调用(vs Agent 工具模式需要 2 次)
  3. 透明注入:上下文自动添加到系统消息,模型无感知
  4. 适合简单场景:固定的检索-回答流程,性能最优

三种 RAG 实现方式对比

| 特性 | Agent + Tool | @dynamic_prompt | LCEL Chain |
|------|--------------|-----------------|------------|
| LLM 调用次数 | 2 次(决策+回答) | 1 次 | 1 次 |
| 灵活性 | 高(自主决策) | 中(固定检索) | 低(固定流程) |
| 复杂度 | 低(自动推理) | 低(Middleware) | 中(需设计链) |
| 成本 | 较高 | 中等 | 较低 |
| 可控性 | 低 | 中 | 高 |
| 适用场景 | 复杂查询、多步推理 | 简单 RAG、性能优化 | 完全自定义流程 |

推荐实践

  1. 简单 RAG(单次检索):使用 @dynamic_prompt(性能最优)
  2. 复杂查询(多次检索):使用 Agent + Tool(自动决策)
  3. 完全自定义流程:使用 LCEL Chain(最大控制)
  4. 混合方案:Agent + dynamic_prompt(灵活性与效率兼顾)

RAG 优化策略

class OptimizedRAGPipeline:
    """优化的 RAG 流水线"""

    def __init__(self):
        self.strategies = {
            "hybrid_search": self.hybrid_search,
            "multi_query": self.multi_query_retrieval,
            "iterative": self.iterative_retrieval,
            "ensemble": self.ensemble_retrieval
        }

    def hybrid_search(self, query: str) -> List[Document]:
        """混合搜索(向量 + 关键词)"""
        # 向量搜索
        vector_results = self.vector_store.similarity_search(query, k=10)

        # BM25 关键词搜索
        keyword_results = self.bm25_retriever.get_relevant_documents(query)

        # 合并结果
        all_docs = vector_results + keyword_results

        # 使用 RRF (Reciprocal Rank Fusion) 重排序
        return self._reciprocal_rank_fusion(all_docs)

    def multi_query_retrieval(self, query: str) -> List[Document]:
        """多查询检索"""
        # 生成多个查询变体
        queries = self._generate_query_variants(query)

        all_docs = []
        for q in queries:
            docs = self.retriever.get_relevant_documents(q)
            all_docs.extend(docs)

        # 去重并重排序
        return self._deduplicate_and_rank(all_docs)

    def iterative_retrieval(self, query: str, max_iterations: int = 3) -> List[Document]:
        """迭代检索"""
        docs = []
        current_query = query

        for i in range(max_iterations):
            # 检索
            new_docs = self.retriever.get_relevant_documents(current_query)
            docs.extend(new_docs)

            # 判断是否需要继续
            if self._is_sufficient(docs, query):
                break

            # 基于已有结果生成新查询
            current_query = self._generate_followup_query(query, docs)

        return docs

    def ensemble_retrieval(self, query: str) -> List[Document]:
        """集成检索"""
        retrievers = [
            self.dense_retriever,    # 密集向量
            self.sparse_retriever,   # 稀疏向量
            self.cross_encoder      # 交叉编码器
        ]

        all_results = []
        weights = [0.5, 0.3, 0.2]  # 权重

        for retriever, weight in zip(retrievers, weights):
            docs = retriever.get_relevant_documents(query)
            for doc in docs:
                doc.metadata["weight"] = weight
            all_results.extend(docs)

        return self._weighted_merge(all_results)
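上面 hybrid_search 引用的 _reciprocal_rank_fusion 没有给出实现。RRF 的思想是:对每一路召回,排名第 r 的文档得 1/(k+r) 分,跨路累加后按总分降序。下面是一个按文档 ID 融合的最小示意(函数名与 k=60 的默认值为常见约定,非上文原实现):

```python
def reciprocal_rank_fusion(ranked_lists: list[list[str]], k: int = 60) -> list[str]:
    """RRF 融合:每路召回中排名第 r 的文档得 1/(k+r) 分,累加后按总分降序"""
    scores: dict[str, float] = {}
    for docs in ranked_lists:
        for rank, doc_id in enumerate(docs, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# 向量召回与 BM25 召回各给一个排序,两路都靠前的 "b" 融合后排第一
fused = reciprocal_rank_fusion([["a", "b", "c"], ["b", "c", "d"]])
```

RRF 的优点是只依赖名次、不依赖各路检索器各自的分数量纲,因此适合融合向量检索与关键词检索这类分数不可比的结果。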

2.2 Multi-Agent 架构选择#

生产环境多 Agent 架构决策

from enum import Enum
from dataclasses import dataclass
from typing import Any, List, Optional

class AgentArchitecture(Enum):
    """Agent 架构类型"""
    MONOLITHIC = "monolithic"           # 单体 Agent
    SUPERVISOR_WORKER = "supervisor"     # 监督者-工作者
    PIPELINE = "pipeline"                # 流水线
    HIERARCHICAL = "hierarchical"       # 层级
    MESH = "mesh"                       # 网状

@dataclass
class ArchitectureDecision:
    """架构决策"""
    architecture: AgentArchitecture
    reason: str
    pros: List[str]
    cons: List[str]
    implementation_complexity: int  # 1-10

class ArchitectureSelector:
    """架构选择器"""

    def select(
        self,
        task_complexity: int,      # 1-10
        team_size: int,            # Agent 数量
        coordination_level: str,    # low/medium/high
        latency_requirement: str,   # low/medium/high
        scalability_need: str       # low/medium/high
    ) -> ArchitectureDecision:
        """选择合适的架构"""

        # 简单任务
        if task_complexity <= 3:
            if team_size <= 1:
                return ArchitectureDecision(
                    architecture=AgentArchitecture.MONOLITHIC,
                    reason="简单任务使用单体 Agent 即可",
                    pros=["简单", "低延迟", "易调试"],
                    cons=["扩展性差", "功能受限"],
                    implementation_complexity=2
                )

        # 中等复杂度
        if task_complexity <= 6:
            if coordination_level == "high":
                return ArchitectureDecision(
                    architecture=AgentArchitecture.SUPERVISOR_WORKER,
                    reason="需要高度协调,使用监督者模式",
                    pros=["控制清晰", "易于管理", "责任明确"],
                    cons=["中央瓶颈", "监督者复杂"],
                    implementation_complexity=5
                )
            else:
                return ArchitectureDecision(
                    architecture=AgentArchitecture.PIPELINE,
                    reason="顺序处理任务,使用流水线",
                    pros=["流程清晰", "易于扩展", "可并行"],
                    cons=["灵活性差", "错误传播"],
                    implementation_complexity=4
                )

        # 高复杂度
        if team_size > 10:
            return ArchitectureDecision(
                architecture=AgentArchitecture.HIERARCHICAL,
                reason="大规模团队需要层级管理",
                pros=["可扩展", "职责清晰", "易于管理大团队"],
                cons=["复杂", "调试困难", "层级延迟"],
                implementation_complexity=8
            )

        # 默认
        return ArchitectureDecision(
            architecture=AgentArchitecture.SUPERVISOR_WORKER,
            reason="通用架构,适合大多数场景",
            pros=["平衡", "成熟", "灵活"],
            cons=["可能过度设计"],
            implementation_complexity=5
        )

# 生产级多 Agent 实现
class ProductionMultiAgentSystem:
    """生产级多 Agent 系统"""

    def __init__(self, architecture: AgentArchitecture):
        self.architecture = architecture
        self.agents = {}
        self.metrics = {}
        self.circuit_breakers = {}

    def register_agent(self, name: str, agent: Any):
        """注册 Agent(带健康检查)"""
        self.agents[name] = agent
        self.circuit_breakers[name] = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=60
        )

    def execute_with_fallback(self, agent_name: str, task: dict) -> dict:
        """带降级的执行"""
        breaker = self.circuit_breakers[agent_name]

        if breaker.is_open():
            # 熔断器打开,使用降级策略
            return self._fallback_strategy(agent_name, task)

        try:
            # 正常执行
            result = self.agents[agent_name].invoke(task)
            breaker.record_success()
            return result

        except Exception as e:
            breaker.record_failure()

            if breaker.is_open():
                # 刚刚熔断
                self._alert_circuit_open(agent_name)

            # 使用降级策略
            return self._fallback_strategy(agent_name, task)

    def _fallback_strategy(self, agent_name: str, task: dict) -> dict:
        """降级策略"""
        # 策略 1:使用备用 Agent
        backup_agent = self.agents.get(f"{agent_name}_backup")
        if backup_agent:
            return backup_agent.invoke(task)

        # 策略 2:返回缓存结果
        cached = self.get_cached_result(task)
        if cached:
            return cached

        # 策略 3:返回默认响应
        return {
            "status": "degraded",
            "message": f"{agent_name} 暂时不可用,请稍后重试"
        }
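register_agent 用到的 CircuitBreaker 在上文没有定义。一个最小的计数式熔断器实现可以是(参数名沿用上文,具体实现为示意):

```python
import time

class CircuitBreaker:
    """计数式熔断器:连续失败达到阈值则打开,超过恢复时间后放行试探请求(半开)"""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.opened_at = None  # 熔断开始时间,None 表示关闭

    def is_open(self) -> bool:
        if self.opened_at is None:
            return False
        if time.time() - self.opened_at >= self.recovery_timeout:
            return False  # 恢复时间已过,进入半开状态,允许试探
        return True

    def record_success(self):
        self.failure_count = 0
        self.opened_at = None

    def record_failure(self):
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.opened_at = time.time()
```

熔断器的关键在"半开"状态:恢复时间一到就放行少量请求试探,成功则关闭熔断,失败则重新计时,避免故障服务被恢复流量再次压垮。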

2.3 Workflow 架构模式#

工作流编排架构

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator

class WorkflowOrchestrator:
    """工作流编排器"""

    def __init__(self):
        self.workflows = {}
        self.templates = self._load_templates()

    def _load_templates(self):
        """加载工作流模板"""
        return {
            "approval_flow": self._create_approval_workflow(),
            "data_pipeline": self._create_data_pipeline(),
            "customer_service": self._create_cs_workflow()
        }

    def _create_approval_workflow(self):
        """创建审批工作流"""

        class ApprovalState(TypedDict):
            request: dict
            approvals: Annotated[List[dict], operator.add]
            status: str
            level: int

        workflow = StateGraph(ApprovalState)

        # 节点
        workflow.add_node("validate", self.validate_request)
        workflow.add_node("level1_approval", self.level1_approval)
        workflow.add_node("level2_approval", self.level2_approval)
        workflow.add_node("execute", self.execute_request)
        workflow.add_node("notify", self.notify_result)

        # 边
        workflow.add_edge("validate", "level1_approval")

        # 条件边
        workflow.add_conditional_edges(
            "level1_approval",
            lambda x: "level2" if x["request"]["amount"] > 10000 else "execute",
            {
                "level2": "level2_approval",
                "execute": "execute"
            }
        )

        workflow.add_edge("level2_approval", "execute")
        workflow.add_edge("execute", "notify")
        workflow.add_edge("notify", END)

        workflow.set_entry_point("validate")

        return workflow.compile()

    def create_dynamic_workflow(self, config: dict):
        """动态创建工作流"""
        workflow = StateGraph(dict)

        # 动态添加节点
        for node_config in config["nodes"]:
            node_func = self._create_node_function(node_config)
            workflow.add_node(node_config["id"], node_func)

        # 动态添加边
        for edge_config in config["edges"]:
            if edge_config["type"] == "direct":
                workflow.add_edge(edge_config["from"], edge_config["to"])
            elif edge_config["type"] == "conditional":
                workflow.add_conditional_edges(
                    edge_config["from"],
                    self._create_condition_function(edge_config["condition"]),
                    edge_config["branches"]
                )

        workflow.set_entry_point(config["entry"])

        return workflow.compile()
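create_dynamic_workflow 期望的 config 结构可以用一份最小示例说明(字段名对应上文的解析逻辑,节点与条件内容为假设):

```python
# 示意配置:create_dynamic_workflow 会按 nodes/edges/entry 三个字段构图
workflow_config = {
    "entry": "extract",
    "nodes": [
        {"id": "extract", "action": "extract_fields"},
        {"id": "review", "action": "human_review"},
        {"id": "archive", "action": "save_result"},
    ],
    "edges": [
        {"type": "direct", "from": "extract", "to": "review"},
        {
            "type": "conditional",
            "from": "review",
            "condition": "is_approved",
            "branches": {"yes": "archive", "no": "extract"},
        },
    ],
}
```

这种声明式配置的好处是工作流拓扑可以存数据库、走审批,由非开发人员调整,而不必改代码重新部署。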

第3章: 性能与成本优化#

关注点:掌握生产环境的性能调优和成本控制。

3.1 延迟优化#

综合延迟优化策略

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
from functools import lru_cache
from typing import List, Dict, Any

class LatencyOptimizer:
    """延迟优化器"""

    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=10)
        self.model_latencies = {}  # 记录模型延迟

    def select_optimal_model(self, task_type: str, max_latency: float) -> str:
        """根据延迟要求选择模型"""
        model_configs = {
            "gpt-4o": {"latency": 2.5, "quality": 0.95, "cost": 0.03},
            "gpt-4o-mini": {"latency": 0.8, "quality": 0.85, "cost": 0.001},
            "gpt-3.5-turbo": {"latency": 0.5, "quality": 0.80, "cost": 0.0005},
            "claude-3-haiku": {"latency": 0.3, "quality": 0.82, "cost": 0.0003}
        }

        # 筛选满足延迟要求的模型
        eligible_models = [
            model for model, config in model_configs.items()
            if config["latency"] <= max_latency
        ]

        if not eligible_models:
            raise ValueError(f"没有模型满足 {max_latency}s 延迟要求")

        # 选择质量最高的
        return max(eligible_models, key=lambda x: model_configs[x]["quality"])

    async def parallel_execution(self, tasks: List[Dict[str, Any]]) -> List[Any]:
        """并行执行多个任务"""

        async def execute_task(task):
            """执行单个任务"""
            start = time.time()

            # 异步执行
            result = await self._async_invoke(task)

            # 记录延迟
            latency = time.time() - start
            self._record_latency(task["model"], latency)

            return result

        # 并行执行所有任务
        results = await asyncio.gather(*[execute_task(task) for task in tasks])

        return results

    @lru_cache(maxsize=1000)
    def cached_inference(self, prompt_hash: str) -> str:
        """缓存的推理"""
        # 缓存会自动处理
        return self._invoke_llm(prompt_hash)

    def optimize_prompt_batching(self, prompts: List[str]) -> List[str]:
        """优化提示词批处理"""
        # 批处理大小优化
        optimal_batch_size = self._calculate_optimal_batch_size(len(prompts))

        results = []
        for i in range(0, len(prompts), optimal_batch_size):
            batch = prompts[i:i + optimal_batch_size]

            # 批量处理
            batch_results = self._batch_inference(batch)
            results.extend(batch_results)

        return results

    def _calculate_optimal_batch_size(self, total_count: int) -> int:
        """计算最优批处理大小"""
        # 基于经验公式
        if total_count < 10:
            return total_count
        elif total_count < 100:
            return 10
        else:
            return min(50, total_count // 10)

    def pipeline_optimization(self):
        """流水线优化"""

        class Pipeline:
            def __init__(self):
                self.stages = []

            def add_stage(self, func, parallel=False):
                self.stages.append((func, parallel))

            async def execute(self, input_data):
                current = input_data

                for func, parallel in self.stages:
                    if parallel and isinstance(current, list):
                        # 并行处理列表
                        current = await asyncio.gather(*[func(item) for item in current])
                    else:
                        # 串行处理
                        current = await func(current)

                return current

        return Pipeline()
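parallel_execution 的收益可以用 asyncio.gather 直接演示(这里用 sleep 模拟模型调用,函数名为示意):

```python
import asyncio
import time

async def fake_llm_call(delay: float) -> str:
    """用 sleep 模拟一次模型调用的网络延迟"""
    await asyncio.sleep(delay)
    return f"done in {delay}s"

async def main():
    start = time.perf_counter()
    # 三个 0.1s 的调用并行执行,总耗时约 0.1s 而非串行的 0.3s
    results = await asyncio.gather(*(fake_llm_call(0.1) for _ in range(3)))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
```

LLM 调用是典型的 I/O 密集任务,延迟主要耗在等待网络上,因此并行化的收益近似线性(受限于 API 的并发配额)。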

3.2 吞吐量优化#

吞吐量优化实现

import asyncio
from asyncio import Queue, Semaphore
from typing import List, Optional
import aiohttp

class ThroughputOptimizer:
    """吞吐量优化器"""

    def __init__(self, max_concurrent: int = 100):
        self.semaphore = Semaphore(max_concurrent)
        self.request_queue = Queue()
        self.connection_pool = None

    async def init_connection_pool(self):
        """初始化连接池"""
        connector = aiohttp.TCPConnector(
            limit=100,  # 总连接数
            limit_per_host=30,  # 每个主机的连接数
            ttl_dns_cache=300  # DNS 缓存时间
        )

        timeout = aiohttp.ClientTimeout(
            total=300,
            connect=10,
            sock_read=30
        )

        self.connection_pool = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout
        )

    async def batch_processor(self, batch_size: int = 10):
        """批处理处理器"""
        batch = []

        while True:
            try:
                # 收集批次
                while len(batch) < batch_size:
                    item = await asyncio.wait_for(
                        self.request_queue.get(),
                        timeout=1.0  # 1秒超时
                    )
                    batch.append(item)

            except asyncio.TimeoutError:
                # 超时但有数据,处理现有批次
                if batch:
                    await self._process_batch(batch)
                    batch = []

            # 批次满,处理
            if len(batch) >= batch_size:
                await self._process_batch(batch)
                batch = []

    async def _process_batch(self, batch: List[dict]):
        """处理一个批次"""
        async with self.semaphore:
            # 批量 API 调用
            tasks = [self._make_request(item) for item in batch]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # 处理结果
            for item, result in zip(batch, results):
                if isinstance(result, Exception):
                    await self._handle_error(item, result)
                else:
                    await self._handle_success(item, result)

    async def adaptive_concurrency(self):
        """自适应并发控制"""

        class AdaptiveLimiter:
            def __init__(self):
                self.current_limit = 10
                self.min_limit = 5
                self.max_limit = 100
                self.success_rate = 1.0
                self.adjustment_interval = 10  # 秒

            async def adjust(self):
                """调整并发限制"""
                while True:
                    await asyncio.sleep(self.adjustment_interval)

                    if self.success_rate > 0.95:
                        # 成功率高,增加并发
                        self.current_limit = min(
                            self.current_limit * 1.2,
                            self.max_limit
                        )
                    elif self.success_rate < 0.8:
                        # 成功率低,减少并发
                        self.current_limit = max(
                            self.current_limit * 0.8,
                            self.min_limit
                        )

        return AdaptiveLimiter()
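AdaptiveLimiter 的调整规则(成功率 >95% 扩容 20%,<80% 缩容 20%,并限制在上下界内)可以抽成纯函数单独验证:

```python
def adjust_limit(current: float, success_rate: float,
                 min_limit: float = 5, max_limit: float = 100) -> float:
    """根据成功率在 [min_limit, max_limit] 范围内调整并发上限"""
    if success_rate > 0.95:
        return min(current * 1.2, max_limit)   # 成功率高,扩容 20%
    if success_rate < 0.8:
        return max(current * 0.8, min_limit)   # 成功率低,缩容 20%
    return current                             # 中间区间不动,避免震荡
```

把调整逻辑与定时循环分离后,既方便单测,也便于后续替换为更精细的算法(例如 AIMD:加性增、乘性减)。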

3.3 Token 管理与成本控制#

Token 成本优化系统

from dataclasses import dataclass
from typing import Dict, List
import tiktoken

@dataclass
class TokenBudget:
    """Token 预算"""
    total_budget: int
    used: int = 0

    @property
    def remaining(self) -> int:
        return self.total_budget - self.used

    def can_afford(self, tokens: int) -> bool:
        return self.remaining >= tokens

class TokenOptimizer:
    """Token 优化器"""

    def __init__(self):
        self.encoding = tiktoken.encoding_for_model("gpt-4")
        self.pricing = {
            "gpt-4o": {"input": 0.0025, "output": 0.01},  # per 1K tokens (2024年最新价格)
            "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
            "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015}
        }

    def count_tokens(self, text: str) -> int:
        """计算 Token 数量"""
        return len(self.encoding.encode(text))

    def optimize_prompt(self, prompt: str, max_tokens: int) -> str:
        """优化提示词长度"""
        tokens = self.count_tokens(prompt)

        if tokens <= max_tokens:
            return prompt

        # 策略 1:截断
        if tokens < max_tokens * 1.5:
            return self._truncate_prompt(prompt, max_tokens)

        # 策略 2:摘要
        return self._summarize_prompt(prompt, max_tokens)

    def _truncate_prompt(self, prompt: str, max_tokens: int) -> str:
        """截断提示词"""
        tokens = self.encoding.encode(prompt)

        # 保留开头和结尾
        if max_tokens > 200:
            start_tokens = tokens[:max_tokens//2]
            end_tokens = tokens[-(max_tokens//2):]

            truncated = start_tokens + end_tokens
            return self.encoding.decode(truncated)

        # 只保留开头
        return self.encoding.decode(tokens[:max_tokens])

    def _summarize_prompt(self, prompt: str, max_tokens: int) -> str:
        """摘要提示词"""
        # 使用便宜的模型进行摘要
        summary_prompt = f"将以下内容摘要到 {max_tokens} tokens 以内:\n\n{prompt}"

        # 调用 gpt-3.5-turbo 进行摘要(实现略)
        # 摘要失败时退化为截断,保证方法总有返回值
        return self._truncate_prompt(prompt, max_tokens)

    def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """计算成本"""
        if model not in self.pricing:
            raise ValueError(f"Unknown model: {model}")

        pricing = self.pricing[model]

        input_cost = (input_tokens / 1000) * pricing["input"]
        output_cost = (output_tokens / 1000) * pricing["output"]

        return input_cost + output_cost

    def cost_aware_routing(self, task: dict, budget: TokenBudget) -> str:
        """成本感知路由"""
        # 估算不同模型的成本
        estimates = {}

        for model in self.pricing.keys():
            estimated_tokens = self._estimate_tokens(task, model)
            estimated_cost = self.calculate_cost(
                model,
                estimated_tokens["input"],
                estimated_tokens["output"]
            )

            if budget.can_afford(estimated_tokens["input"] + estimated_tokens["output"]):  # 预算按 token 计
                estimates[model] = {
                    "cost": estimated_cost,
                    "quality": self._get_model_quality(model)
                }

        if not estimates:
            raise ValueError("预算不足")

        # 在预算内选择质量最好的
        best_model = max(estimates.items(), key=lambda x: x[1]["quality"])

        return best_model[0]
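calculate_cost 的计费公式(token 数 / 1000 × 每千 token 单价)可以脱离 tiktoken 独立验算,价格沿用上文示例值:

```python
PRICING = {"gpt-4o": {"input": 0.0025, "output": 0.01}}  # 美元 / 1K tokens,示例值

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """输入、输出分别按各自单价计费后求和"""
    p = PRICING[model]
    return input_tokens / 1000 * p["input"] + output_tokens / 1000 * p["output"]

# 2000 输入 + 500 输出:0.005 + 0.005 = 0.01 美元
cost = calculate_cost("gpt-4o", 2000, 500)
```

注意输入与输出单价通常相差数倍,控制输出长度(max_tokens、要求简洁回答)往往比压缩提示词更省钱。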

3.4 缓存复用策略#

多层缓存架构

import hashlib
import pickle
from typing import Any, Optional
from datetime import datetime, timedelta

import redis
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS

class MultiLevelCache:
    """多层缓存系统"""

    def __init__(self):
        self.l1_cache = {}  # 内存缓存
        self.l2_cache = redis.Redis()  # Redis 缓存
        self.l3_cache = S3Cache()  # S3 长期缓存(S3Cache 为自定义封装,实现略)

    def get(self, key: str) -> Optional[Any]:
        """获取缓存(逐层查找)"""
        # L1 缓存
        if key in self.l1_cache:
            self._promote_to_l1(key, self.l1_cache[key])
            return self.l1_cache[key]

        # L2 缓存
        value = self.l2_cache.get(key)
        if value:
            self._promote_to_l1(key, value)
            return value

        # L3 缓存
        value = self.l3_cache.get(key)
        if value:
            self._promote_to_l2(key, value)
            self._promote_to_l1(key, value)
            return value

        return None

    def set(
        self,
        key: str,
        value: Any,
        ttl_l1: int = 300,  # 5 分钟
        ttl_l2: int = 3600,  # 1 小时
        ttl_l3: int = 86400  # 1 天
    ):
        """设置缓存(写入所有层)"""
        # 写入 L1
        self.l1_cache[key] = value
        self._schedule_l1_eviction(key, ttl_l1)

        # 写入 L2
        self.l2_cache.setex(key, ttl_l2, pickle.dumps(value))

        # 写入 L3
        self.l3_cache.put(key, value, ttl_l3)

    def _promote_to_l1(self, key: str, value: Any):
        """提升到 L1 缓存"""
        self.l1_cache[key] = value

    def _promote_to_l2(self, key: str, value: Any):
        """提升到 L2 缓存"""
        self.l2_cache.setex(key, 3600, pickle.dumps(value))

class SemanticCache:
    """语义缓存"""

    def __init__(self, similarity_threshold: float = 0.95):
        self.embeddings = OpenAIEmbeddings()
        # FAISS 不能无参构造,这里用一条占位文本初始化索引
        self.vector_store = FAISS.from_texts(["__placeholder__"], self.embeddings)
        self.cache = {}
        self.similarity_threshold = similarity_threshold

    def get(self, query: str) -> Optional[str]:
        """基于语义相似度获取缓存"""
        # 搜索相似查询(relevance score 归一化到 0-1,越大越相似;
        # 注意 similarity_search_with_score 返回的是距离,越小越相似,不能直接比阈值)
        similar_docs = self.vector_store.similarity_search_with_relevance_scores(
            query,
            k=1
        )

        if similar_docs:
            doc, score = similar_docs[0]

            if score >= self.similarity_threshold:
                # 找到相似查询,返回缓存结果
                cache_key = doc.metadata.get("cache_key")
                return self.cache.get(cache_key)

        return None

    def set(self, query: str, result: str):
        """设置语义缓存"""
        # 生成缓存键
        cache_key = hashlib.md5(query.encode()).hexdigest()

        # 存储结果
        self.cache[cache_key] = result

        # 存储查询向量
        self.vector_store.add_texts(
            [query],
            metadatas=[{"cache_key": cache_key, "timestamp": datetime.now()}]
        )

3.5 自适应模型选择#

根据查询复杂度自动选择模型

class AdaptiveModelSelector:
    """自适应模型选择器"""

    def select_model(self, query: str, context: dict) -> str:
        """根据查询复杂度选择模型"""
        complexity = self._estimate_complexity(query, context)

        if complexity < 0.3:
            return "gpt-3.5-turbo"  # 简单查询,节省成本
        elif complexity < 0.7:
            return "gpt-4o-mini"    # 中等查询
        else:
            return "gpt-4o"         # 复杂查询,确保质量

    def _estimate_complexity(self, query: str, context: dict) -> float:
        """估算查询复杂度(0-1)"""
        score = 0

        # 查询长度
        if len(query) > 100:
            score += 0.2

        # 是否需要多步推理
        reasoning_keywords = ["为什么", "如何", "分析", "对比"]
        if any(kw in query for kw in reasoning_keywords):
            score += 0.3

        # 是否需要工具
        if context.get("tools_required"):
            score += 0.2

        # 上下文长度
        if len(context.get("history", [])) > 10:
            score += 0.3

        return min(score, 1.0)

# 使用示例
selector = AdaptiveModelSelector()

queries = [
    "你好",  # 简单 → gpt-3.5-turbo
    "如何优化Python代码性能?",  # 中等 → gpt-4o-mini
    "分析这段代码的时间复杂度并提出优化方案",  # 复杂 → gpt-4o
]

for query in queries:
    model = selector.select_model(query, {})
    print(f"查询: {query}\n选择模型: {model}\n")

# 成本节省:
# 假设100次查询:
# - 60%简单(gpt-3.5-turbo: $0.001/次) = $0.06
# - 30%中等(gpt-4o-mini: $0.005/次) = $0.15
# - 10%复杂(gpt-4o: $0.01/次) = $0.10
# 总成本 = $0.31
# vs 全用gpt-4o = $1.00
# 节省69%
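上面的估算可以直接用几行代码核对(单价沿用前文的假设值):

```python
def mixed_cost(mix: dict, total_queries: int = 100) -> float:
    """mix: {模型: (占比, 单次成本)},返回混合调用的总成本"""
    return sum(share * total_queries * unit for share, unit in mix.values())

mix = {
    "gpt-3.5-turbo": (0.6, 0.001),
    "gpt-4o-mini": (0.3, 0.005),
    "gpt-4o": (0.1, 0.01),
}

total = mixed_cost(mix)          # ≈ $0.31
baseline = 100 * 0.01            # 全部用 gpt-4o ≈ $1.00
saving = 1 - total / baseline    # ≈ 69%
```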

3.6 批处理优化#

批量处理减少网络开销

import asyncio
from typing import List, Dict

async def batch_process(
    queries: List[str],
    batch_size: int = 10
) -> List[Dict]:
    """批量处理查询(减少overhead)"""

    results = []

    for i in range(0, len(queries), batch_size):
        batch = queries[i:i+batch_size]

        # 并发处理batch内的查询(agent 为上文 create_agent 创建的实例)
        tasks = [
            asyncio.create_task(agent.ainvoke({"messages": [("user", q)]}))
            for q in batch
        ]

        batch_results = await asyncio.gather(*tasks)
        results.extend(batch_results)

    return results

# 使用
queries = [f"查询{i}" for i in range(100)]
results = asyncio.run(batch_process(queries, batch_size=10))

# 优势:
# 1. 减少网络overhead(连接复用)
# 2. 可以利用批量定价(某些API提供商)
# 3. 提高整体吞吐量
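固定批次的一个缺点是每批都要等最慢的请求返回;用信号量限制全局并发通常吞吐更平滑。下面是一个示意实现(其中 `handle` 是假设的处理协程,仅用于演示):

```python
import asyncio

async def bounded_gather(coros, limit: int = 10):
    """限制并发数的 gather:任意时刻最多 limit 个协程在执行"""
    sem = asyncio.Semaphore(limit)

    async def _run(coro):
        async with sem:
            return await coro

    # gather 保持与输入相同的顺序返回结果
    return await asyncio.gather(*(_run(c) for c in coros))

# 用假设的 handle 协程演示
async def handle(i: int) -> int:
    await asyncio.sleep(0.01)
    return i * 2

results = asyncio.run(bounded_gather((handle(i) for i in range(20)), limit=5))
```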

3.7 成本监控与优化清单#

实时成本追踪系统

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Dict

@dataclass
class CostMetrics:
    """成本指标"""
    timestamp: datetime
    model: str
    input_tokens: int
    output_tokens: int
    cost_usd: float

class CostMonitor:
    """成本监控器"""

    # 模型价格($/1M tokens,2025年11月)
    PRICING = {
        "gpt-4o": {"input": 2.50, "output": 10.00},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "gpt-3.5-turbo": {"input": 0.50, "output": 1.50},
    }

    def __init__(self):
        self.metrics: List[CostMetrics] = []

    def record(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int
    ):
        """记录一次调用的成本"""
        pricing = self.PRICING.get(model)
        if pricing is None:
            raise ValueError(f"未知模型,无法计价: {model}")
        cost = (
            input_tokens * pricing["input"] / 1_000_000 +
            output_tokens * pricing["output"] / 1_000_000
        )

        self.metrics.append(CostMetrics(
            timestamp=datetime.now(),
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost_usd=cost
        ))

    def get_total_cost(self, hours: int = 24) -> Dict:
        """获取总成本统计"""
        cutoff = datetime.now() - timedelta(hours=hours)
        recent = [m for m in self.metrics if m.timestamp > cutoff]

        return {
            "total_cost_usd": sum(m.cost_usd for m in recent),
            "total_requests": len(recent),
            "avg_cost_per_request": sum(m.cost_usd for m in recent) / len(recent) if recent else 0,
            "by_model": {
                model: {
                    "requests": len([m for m in recent if m.model == model]),
                    "cost": sum(m.cost_usd for m in recent if m.model == model)
                }
                for model in self.PRICING.keys()
            }
        }

# 使用
monitor = CostMonitor()

# 记录调用
monitor.record("gpt-4o", input_tokens=2000, output_tokens=500)
monitor.record("gpt-4o-mini", input_tokens=1500, output_tokens=300)

# 查看成本
stats = monitor.get_total_cost(hours=24)
print(f"24小时总成本: ${stats['total_cost_usd']:.4f}")
print(f"平均每次请求: ${stats['avg_cost_per_request']:.6f}")
print(f"按模型分组: {stats['by_model']}")

成本优化清单

  • Prompt优化

    • 精简系统提示词(避免冗余描述)
    • 移除无关信息
    • 使用简洁表达
  • 上下文管理

    • 限制历史消息数量(trim_messages)
    • 压缩旧消息为摘要
    • 智能裁剪上下文窗口
  • 缓存策略

    • 精确缓存(相同查询)
    • 语义缓存(相似查询)
    • 设置合理TTL
  • 模型选择

    • 根据复杂度自适应选择
    • 简单任务用gpt-3.5-turbo
    • 工具调用优先用mini模型
  • 批处理

    • 合并相似请求
    • 批量处理降低overhead
    • 并发执行提高效率

成本目标

| 场景 | 优化前 | 优化后 | 节省 |
| --- | --- | --- | --- |
| 简单问答 | $0.01/次 | $0.001/次 | 90% |
| 复杂推理 | $0.05/次 | $0.02/次 | 60% |
| 多轮对话 | $0.10/次 | $0.04/次 | 60% |

监控指标

  • 平均每次请求成本
  • 每日/每月总成本
  • 各模型成本占比
  • Token使用效率(输出/输入比)
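这几项指标都可以从调用记录直接聚合得到。下面是一个最小示意,记录格式用假设的四元组 `(model, input_tokens, output_tokens, cost_usd)`:

```python
from collections import defaultdict

def summarize_usage(records):
    """从调用记录聚合监控指标;records 为 (model, in_tok, out_tok, cost) 列表"""
    if not records:
        return {"total_cost": 0.0, "avg_cost": 0.0,
                "model_share": {}, "output_input_ratio": 0.0}

    total_cost = sum(r[3] for r in records)
    by_model = defaultdict(float)
    for model, _, _, cost in records:
        by_model[model] += cost

    in_tok = sum(r[1] for r in records)
    out_tok = sum(r[2] for r in records)
    return {
        "total_cost": total_cost,
        "avg_cost": total_cost / len(records),
        "model_share": {m: c / total_cost for m, c in by_model.items()} if total_cost else {},
        "output_input_ratio": out_tok / in_tok if in_tok else 0.0,
    }

stats = summarize_usage([
    ("gpt-4o", 2000, 500, 0.0100),
    ("gpt-4o-mini", 1500, 300, 0.0004),
])
```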

第4章: 安全合规与防护#

关注点:掌握生产环境的安全防护和合规要求。

4.1 Guardrails 防护体系#

双层防护架构

from abc import ABC, abstractmethod
from typing import List, Dict, Any

class Guardrail(ABC):
    """防护栏基类"""

    @abstractmethod
    def check(self, content: str) -> Dict[str, Any]:
        """检查内容"""
        pass

class DeterministicGuardrail(Guardrail):
    """确定性防护(基于规则)"""

    def __init__(self):
        self.rules = self._load_rules()

    def _load_rules(self):
        """加载规则(统一为正则模式)"""
        # 敏感词列表需编译成交替模式,才能与其他规则一样用 re.search 检查
        profanity_pattern = "|".join(map(re.escape, self._load_profanity_list()))
        return {
            "sql_injection": r"(\bSELECT\b.*\bFROM\b|\bDROP\b|\bDELETE\b.*\bFROM\b)",
            "xss": r"(<script|javascript:|onerror=|onclick=)",
            "pii_email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
            "pii_phone": r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
            "pii_ssn": r"\b\d{3}-\d{2}-\d{4}\b",
            "profanity": profanity_pattern
        }

    def check(self, content: str) -> Dict[str, Any]:
        """规则检查"""
        violations = []

        for rule_name, pattern in self.rules.items():
            if re.search(pattern, content, re.IGNORECASE):
                violations.append({
                    "rule": rule_name,
                    "severity": self._get_severity(rule_name),
                    "action": self._get_action(rule_name)
                })

        return {
            "passed": len(violations) == 0,
            "violations": violations
        }

    def _get_severity(self, rule: str) -> str:
        """获取严重程度"""
        severity_map = {
            "sql_injection": "critical",
            "xss": "critical",
            "pii_ssn": "high",
            "pii_email": "medium",
            "profanity": "low"
        }
        return severity_map.get(rule, "medium")

    def _get_action(self, rule: str) -> str:
        """获取处理动作"""
        action_map = {
            "sql_injection": "block",
            "xss": "block",
            "pii_ssn": "redact",
            "pii_email": "mask",
            "profanity": "warn"
        }
        return action_map.get(rule, "warn")
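确定性规则的效果可以脱离类独立验证。下面节选两条规则做演示:

```python
import re

# 节选自上文规则表的两条正则
DEMO_RULES = {
    "sql_injection": r"(\bSELECT\b.*\bFROM\b|\bDROP\b|\bDELETE\b.*\bFROM\b)",
    "pii_ssn": r"\b\d{3}-\d{2}-\d{4}\b",
}

def scan(text: str) -> list:
    """返回命中的规则名列表"""
    return [name for name, pattern in DEMO_RULES.items()
            if re.search(pattern, text, re.IGNORECASE)]

print(scan("select id from users"))    # ['sql_injection']
print(scan("SSN: 123-45-6789"))        # ['pii_ssn']
print(scan("今天天气不错"))              # []
```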

class ModelBasedGuardrail(Guardrail):
    """基于模型的防护"""

    def __init__(self, model: ChatOpenAI):
        self.model = model
        self.categories = [
            "harmful_content",
            "bias",
            "misinformation",
            "inappropriate",
            "off_topic"
        ]

    def check(self, content: str) -> Dict[str, Any]:
        """模型检查"""
        prompt = f"""分析以下内容是否存在问题。

内容:{content}

检查类别:
{', '.join(self.categories)}

返回 JSON 格式:
{{
    "passed": true/false,
    "violations": [
        {{"category": "...", "confidence": 0.0-1.0, "reason": "..."}}
    ]
}}"""

        response = self.model.invoke(prompt)

        # 解析响应
        try:
            result = json.loads(response.content)
            return result
        except (json.JSONDecodeError, TypeError):
            # 解析失败,保守处理
            return {
                "passed": False,
                "violations": [{
                    "category": "parse_error",
                    "confidence": 1.0,
                    "reason": "无法解析检查结果"
                }]
            }

class HybridGuardrailSystem:
    """混合防护系统"""

    def __init__(self):
        self.deterministic = DeterministicGuardrail()
        self.model_based = ModelBasedGuardrail(ChatOpenAI(model="gpt-4o"))
        self.cache = {}

    def check(self, content: str, use_cache: bool = True) -> Dict[str, Any]:
        """综合检查"""
        # 检查缓存
        if use_cache:
            cache_key = hashlib.md5(content.encode()).hexdigest()
            if cache_key in self.cache:
                return self.cache[cache_key]

        # 第一层:确定性检查(快速)
        deterministic_result = self.deterministic.check(content)

        # 如果确定性检查发现严重问题,直接返回
        if not deterministic_result["passed"]:
            critical_violations = [
                v for v in deterministic_result["violations"]
                if v["severity"] == "critical"
            ]

            if critical_violations:
                result = {
                    "passed": False,
                    "stage": "deterministic",
                    "violations": critical_violations,
                    "action": "block"
                }

                if use_cache:
                    self.cache[cache_key] = result

                return result

        # 第二层:模型检查(更全面但较慢)
        model_result = self.model_based.check(content)

        # 合并结果
        all_violations = (
            deterministic_result.get("violations", []) +
            model_result.get("violations", [])
        )

        result = {
            "passed": len(all_violations) == 0,
            "stage": "hybrid",
            "violations": all_violations,
            "action": self._determine_action(all_violations)
        }

        if use_cache:
            self.cache[cache_key] = result

        return result

    def _determine_action(self, violations: List[dict]) -> str:
        """确定处理动作"""
        if not violations:
            return "pass"

        # 根据最严重的违规确定动作
        severities = [v.get("severity", "medium") for v in violations]

        if "critical" in severities:
            return "block"
        elif "high" in severities:
            return "review"
        else:
            return "warn"

4.2 PII 检测策略#

PII 检测与处理

import re
from enum import Enum
from typing import List, Tuple

class PIIType(Enum):
    """PII 类型"""
    EMAIL = "email"
    PHONE = "phone"
    SSN = "ssn"
    CREDIT_CARD = "credit_card"
    IP_ADDRESS = "ip"
    NAME = "name"
    ADDRESS = "address"

class PIIStrategy(Enum):
    """PII 处理策略"""
    REDACT = "redact"      # 完全删除
    MASK = "mask"          # 部分遮蔽
    HASH = "hash"          # 哈希替换
    ENCRYPT = "encrypt"    # 加密
    TOKENIZE = "tokenize"  # 令牌化

class PIIDetector:
    """PII 检测器"""

    def __init__(self):
        self.patterns = {
            PIIType.EMAIL: r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            PIIType.PHONE: r'(\+\d{1,3}[-.\s]?)?\(?\d{1,4}\)?[-.\s]?\d{1,4}[-.\s]?\d{1,9}',
            PIIType.SSN: r'\b\d{3}-\d{2}-\d{4}\b',
            PIIType.CREDIT_CARD: r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b',
            PIIType.IP_ADDRESS: r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
        }

        # 名称检测需要 NER 模型
        self.ner_model = self._load_ner_model()

    def detect(self, text: str) -> List[Tuple[PIIType, str, int, int]]:
        """检测 PII"""
        detections = []

        # 正则检测
        for pii_type, pattern in self.patterns.items():
            for match in re.finditer(pattern, text):
                detections.append((
                    pii_type,
                    match.group(),
                    match.start(),
                    match.end()
                ))

        # NER 检测(名称、地址)
        entities = self.ner_model.extract_entities(text)
        for entity in entities:
            if entity["type"] == "PERSON":
                detections.append((
                    PIIType.NAME,
                    entity["text"],
                    entity["start"],
                    entity["end"]
                ))
            elif entity["type"] == "LOCATION":
                detections.append((
                    PIIType.ADDRESS,
                    entity["text"],
                    entity["start"],
                    entity["end"]
                ))

        return detections

    def process(self, text: str, strategy: PIIStrategy) -> str:
        """处理 PII"""
        detections = self.detect(text)

        # 按位置倒序处理(避免索引偏移)
        detections.sort(key=lambda x: x[2], reverse=True)

        processed_text = text

        for pii_type, value, start, end in detections:
            replacement = self._get_replacement(pii_type, value, strategy)
            processed_text = processed_text[:start] + replacement + processed_text[end:]

        return processed_text

    def _get_replacement(self, pii_type: PIIType, value: str, strategy: PIIStrategy) -> str:
        """获取替换文本"""
        if strategy == PIIStrategy.REDACT:
            return f"[{pii_type.value.upper()}_REDACTED]"

        elif strategy == PIIStrategy.MASK:
            if pii_type == PIIType.EMAIL:
                # 保留首尾字符和域名(本地部分过短时落到下方默认全遮蔽)
                parts = value.split('@')
                if len(parts) == 2 and len(parts[0]) > 2:
                    masked_local = parts[0][0] + '*' * (len(parts[0]) - 2) + parts[0][-1]
                    return f"{masked_local}@{parts[1]}"

            elif pii_type == PIIType.PHONE:
                # 保留前3位和后2位
                if len(value) >= 5:
                    return value[:3] + '*' * (len(value) - 5) + value[-2:]

            # 默认遮蔽
            return '*' * len(value)

        elif strategy == PIIStrategy.HASH:
            return f"[HASH:{hashlib.sha256(value.encode()).hexdigest()[:8]}]"

        elif strategy == PIIStrategy.ENCRYPT:
            # 实际加密实现
            encrypted = self._encrypt(value)
            return f"[ENC:{encrypted}]"

        elif strategy == PIIStrategy.TOKENIZE:
            # 生成唯一令牌
            token = self._generate_token(pii_type, value)
            return f"[TOKEN:{token}]"

        return value
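掩码策略的效果可以用一个独立的小函数演示,逻辑与上面 MASK 分支的邮箱处理一致:

```python
import re

EMAIL_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')

def mask_emails(text: str) -> str:
    """保留邮箱首尾字符和域名,中间用 * 遮蔽;本地部分过短则全遮蔽"""
    def _mask(m):
        local, domain = m.group().split('@', 1)
        if len(local) <= 2:
            return '*' * len(local) + '@' + domain
        return local[0] + '*' * (len(local) - 2) + local[-1] + '@' + domain
    return EMAIL_RE.sub(_mask, text)

print(mask_emails("请联系 alice@example.com 获取详情"))
# 请联系 a***e@example.com 获取详情
```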

4.3 分层防护组合#

多层防护架构

class LayeredDefenseSystem:
    """分层防护系统"""

    def __init__(self):
        self.layers = [
            InputValidationLayer(),
            RateLimitLayer(),
            AuthenticationLayer(),
            PIIProtectionLayer(),
            ContentModerationLayer(),
            OutputSanitizationLayer(),
            AuditLoggingLayer()
        ]

    async def process_request(self, request: dict) -> dict:
        """处理请求(通过所有防护层)"""
        context = {
            "request": request,
            "user": request.get("user"),
            "timestamp": datetime.now(),
            "trace_id": str(uuid.uuid4())
        }

        # 逐层处理
        for layer in self.layers:
            try:
                result = await layer.process(context)

                if not result["passed"]:
                    # 某层未通过,记录并返回
                    self._log_rejection(layer, context, result)

                    return {
                        "status": "rejected",
                        "layer": layer.__class__.__name__,
                        "reason": result.get("reason"),
                        "trace_id": context["trace_id"]
                    }

                # 更新上下文
                context.update(result.get("context_updates", {}))

            except Exception as e:
                # 层处理异常
                self._log_error(layer, context, e)

                # 根据层的重要性决定是否继续
                if layer.is_critical():
                    return {
                        "status": "error",
                        "layer": layer.__class__.__name__,
                        "error": str(e),
                        "trace_id": context["trace_id"]
                    }

        # 所有层都通过
        return {
            "status": "approved",
            "context": context,
            "trace_id": context["trace_id"]
        }

class DefenseLayer(ABC):
    """防护层基类"""

    @abstractmethod
    async def process(self, context: dict) -> dict:
        """处理请求"""
        pass

    def is_critical(self) -> bool:
        """是否关键层(失败时必须停止)"""
        return False

class InputValidationLayer(DefenseLayer):
    """输入验证层"""

    async def process(self, context: dict) -> dict:
        request = context["request"]

        # 验证必填字段
        required_fields = ["user_id", "content", "timestamp"]
        for field in required_fields:
            if field not in request:
                return {
                    "passed": False,
                    "reason": f"Missing required field: {field}"
                }

        # 验证输入长度
        if len(request.get("content", "")) > 10000:
            return {
                "passed": False,
                "reason": "Content too long (max 10000 chars)"
            }

        # 验证输入格式
        if not self._validate_format(request):
            return {
                "passed": False,
                "reason": "Invalid input format"
            }

        return {"passed": True}

    def is_critical(self) -> bool:
        return True

class RateLimitLayer(DefenseLayer):
    """速率限制层"""

    def __init__(self):
        self.limits = {}  # user_id -> requests

    async def process(self, context: dict) -> dict:
        user_id = context["user"]["id"]

        # 检查速率限制
        current_count = self.limits.get(user_id, 0)

        if current_count >= 100:  # 每分钟 100 个请求
            return {
                "passed": False,
                "reason": "Rate limit exceeded"
            }

        # 更新计数
        self.limits[user_id] = current_count + 1

        # 60 秒后释放本次请求占用的配额
        # (若直接清零,一个旧请求的定时器会抹掉全部计数,限流形同虚设)
        asyncio.create_task(self._release_slot(user_id))

        return {"passed": True}

    async def _release_slot(self, user_id: str):
        """60秒后释放一个配额"""
        await asyncio.sleep(60)
        self.limits[user_id] = max(0, self.limits.get(user_id, 0) - 1)
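按请求挂定时器的计数方式实现简单,但生产中更常用令牌桶:按固定速率补充令牌,既能限制平均速率又允许有限突发。一个最小示意(`clock` 参数便于测试时注入假时钟):

```python
import time

class TokenBucket:
    """令牌桶限流器:每秒补充 rate 个令牌,桶容量为 capacity"""

    def __init__(self, rate: float, capacity: float, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.clock = clock
        self.last = clock()

    def allow(self) -> bool:
        now = self.clock()
        # 按流逝时间补充令牌,但不超过桶容量
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

例如 `TokenBucket(rate=100/60, capacity=100)` 近似于"每分钟 100 次",且不会出现固定窗口边界处的流量突刺。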

4.4 自定义 Guardrails 开发#

自定义防护栏框架

from typing import Any, Callable, List, Optional
import inspect

class ValidationError(Exception):
    """验证失败异常"""

class GuardrailFramework:
    """防护栏框架"""

    def __init__(self):
        self.pre_processors = []
        self.validators = []
        self.post_processors = []

    def pre_process(self, func: Callable) -> Callable:
        """注册预处理器"""
        self.pre_processors.append(func)
        return func

    def validate(self, func: Callable) -> Callable:
        """注册验证器"""
        self.validators.append(func)
        return func

    def post_process(self, func: Callable) -> Callable:
        """注册后处理器"""
        self.post_processors.append(func)
        return func

    async def execute(self, input_data: Any) -> Any:
        """执行防护流程"""
        # 预处理
        processed_data = input_data
        for processor in self.pre_processors:
            if inspect.iscoroutinefunction(processor):
                processed_data = await processor(processed_data)
            else:
                processed_data = processor(processed_data)

        # 验证
        for validator in self.validators:
            if inspect.iscoroutinefunction(validator):
                is_valid = await validator(processed_data)
            else:
                is_valid = validator(processed_data)

            if not is_valid:
                raise ValidationError(f"Validation failed: {validator.__name__}")

        # 后处理
        for processor in self.post_processors:
            if inspect.iscoroutinefunction(processor):
                processed_data = await processor(processed_data)
            else:
                processed_data = processor(processed_data)

        return processed_data

# 使用示例
guardrail = GuardrailFramework()

@guardrail.pre_process
def sanitize_input(data: dict) -> dict:
    """清理输入"""
    data["content"] = data["content"].strip()
    return data

@guardrail.validate
def check_content_length(data: dict) -> bool:
    """检查内容长度"""
    return len(data["content"]) <= 1000

@guardrail.validate
async def check_toxicity(data: dict) -> bool:
    """检查毒性(使用外部 API)"""
    # 调用毒性检测 API(toxicity_api 为假设的外部服务客户端)
    result = await toxicity_api.check(data["content"])
    return result["toxicity_score"] < 0.7

@guardrail.post_process
def add_metadata(data: dict) -> dict:
    """添加元数据"""
    data["processed_at"] = datetime.now()
    data["guardrail_version"] = "1.0"
    return data

# 执行(execute 是协程,需在事件循环中运行)
result = asyncio.run(guardrail.execute({"content": "用户输入"}))

4.5 输入输出安全#

安全处理框架

import html
import re
from typing import Optional

class SecurityFramework:
    """安全框架"""

    def __init__(self):
        self.input_sanitizer = InputSanitizer()
        self.output_validator = OutputValidator()
        self.encryption = EncryptionService()

    def secure_input(self, raw_input: str) -> str:
        """安全化输入"""
        # 1. HTML 实体编码
        sanitized = html.escape(raw_input)

        # 2. SQL 注入防护
        sanitized = self._prevent_sql_injection(sanitized)

        # 3. 命令注入防护
        sanitized = self._prevent_command_injection(sanitized)

        # 4. 路径遍历防护
        sanitized = self._prevent_path_traversal(sanitized)

        return sanitized

    def _prevent_sql_injection(self, text: str) -> str:
        """防止 SQL 注入"""
        # 使用参数化查询,这里只是示例
        dangerous_patterns = [
            r"('\s*OR\s*')",
            r"(;\s*DROP\s+TABLE)",
            r"(UNION\s+SELECT)",
            r"(INSERT\s+INTO)",
            r"(UPDATE\s+.*\s+SET)"
        ]

        for pattern in dangerous_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                # 记录并清理
                self._log_security_event("sql_injection_attempt", text)
                text = re.sub(pattern, "", text, flags=re.IGNORECASE)

        return text

    def secure_output(self, output: str, context: dict) -> str:
        """安全化输出"""
        # 1. 敏感信息脱敏
        output = self._redact_sensitive_info(output)

        # 2. XSS 防护
        output = self._prevent_xss(output)

        # 3. 信息泄露防护
        output = self._prevent_info_disclosure(output)

        return output

    def _redact_sensitive_info(self, text: str) -> str:
        """脱敏敏感信息"""
        # API 密钥
        text = re.sub(r'(api[_-]?key[\s:=]+)[\w-]+', r'\1[REDACTED]', text, flags=re.IGNORECASE)

        # 密码
        text = re.sub(r'(password[\s:=]+)\S+', r'\1[REDACTED]', text, flags=re.IGNORECASE)

        # Token
        text = re.sub(r'(token[\s:=]+)[\w-]+', r'\1[REDACTED]', text, flags=re.IGNORECASE)

        return text
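上面三条脱敏正则的实际效果可以独立验证(模式与 _redact_sensitive_info 中的一致):

```python
import re

def redact_secrets(text: str) -> str:
    """脱敏 API 密钥、密码和 Token"""
    text = re.sub(r'(api[_-]?key[\s:=]+)[\w-]+', r'\1[REDACTED]', text, flags=re.IGNORECASE)
    text = re.sub(r'(password[\s:=]+)\S+', r'\1[REDACTED]', text, flags=re.IGNORECASE)
    text = re.sub(r'(token[\s:=]+)[\w-]+', r'\1[REDACTED]', text, flags=re.IGNORECASE)
    return text

print(redact_secrets("api_key=sk-abc123 password: hunter2"))
# api_key=[REDACTED] password: [REDACTED]
```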

4.6 数据合规#

GDPR 合规实现

class GDPRCompliance:
    """GDPR 合规管理"""

    def __init__(self):
        self.consent_manager = ConsentManager()
        self.data_processor = DataProcessor()
        self.audit_logger = AuditLogger()

    def process_personal_data(self, data: dict, user_id: str) -> dict:
        """处理个人数据"""
        # 1. 检查同意
        if not self.consent_manager.has_consent(user_id, "data_processing"):
            raise PermissionError("No consent for data processing")

        # 2. 数据最小化
        minimal_data = self._minimize_data(data)

        # 3. 假名化
        pseudonymized = self._pseudonymize(minimal_data, user_id)

        # 4. 记录处理活动
        self.audit_logger.log_processing_activity(
            user_id=user_id,
            purpose="service_provision",
            legal_basis="consent",
            data_categories=self._get_data_categories(minimal_data)
        )

        return pseudonymized

    def _minimize_data(self, data: dict) -> dict:
        """数据最小化"""
        # 只保留必要字段
        required_fields = ["name", "email", "query"]

        minimal = {}
        for field in required_fields:
            if field in data:
                minimal[field] = data[field]

        return minimal

    def _pseudonymize(self, data: dict, user_id: str) -> dict:
        """假名化处理"""
        # 生成假名
        pseudo_id = hashlib.sha256(f"{user_id}{SECRET_SALT}".encode()).hexdigest()[:16]

        # 替换标识符
        data["user_id"] = pseudo_id

        # 加密敏感字段
        if "email" in data:
            data["email_encrypted"] = self.encryption.encrypt(data["email"])
            del data["email"]

        return data

    def handle_data_request(self, request_type: str, user_id: str) -> dict:
        """处理数据请求(GDPR 权利)"""
        if request_type == "access":
            # 数据访问权
            return self._export_user_data(user_id)

        elif request_type == "portability":
            # 数据可携带权
            return self._export_portable_data(user_id)

        elif request_type == "erasure":
            # 被遗忘权
            return self._erase_user_data(user_id)

        elif request_type == "rectification":
            # 数据更正权
            return self._rectify_user_data(user_id)

        else:
            raise ValueError(f"Unknown request type: {request_type}")

4.7 审计日志#

完整审计系统

class AuditSystem:
    """审计系统"""

    def __init__(self):
        self.storage = AuditStorage()
        self.analyzer = AuditAnalyzer()

    def log_event(
        self,
        event_type: str,
        user_id: str,
        action: str,
        resource: str,
        result: str,
        metadata: Optional[dict] = None
    ):
        """记录审计事件"""
        event = {
            "id": str(uuid.uuid4()),
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "user_id": user_id,
            "action": action,
            "resource": resource,
            "result": result,
            "metadata": metadata or {},
            "ip_address": self._get_client_ip(),
            "user_agent": self._get_user_agent(),
            "session_id": self._get_session_id()
        }

        # 存储
        self.storage.store(event)

        # 实时分析
        self.analyzer.analyze(event)

        return event["id"]

    def query_logs(
        self,
        filters: dict,
        start_time: datetime,
        end_time: datetime,
        limit: int = 100
    ) -> List[dict]:
        """查询审计日志"""
        return self.storage.query(filters, start_time, end_time, limit)

    def generate_compliance_report(self, period: str) -> dict:
        """生成合规报告"""
        report = {
            "period": period,
            "generated_at": datetime.utcnow().isoformat(),
            "statistics": self._calculate_statistics(period),
            "anomalies": self._detect_anomalies(period),
            "compliance_status": self._check_compliance(period)
        }

        return report

class AuditStorage:
    """审计存储"""

    def __init__(self):
        # 使用不可变存储
        self.immutable_store = ImmutableLogStore()

    def store(self, event: dict):
        """存储审计事件"""
        # 添加完整性校验
        event["hash"] = self._calculate_hash(event)

        # 签名
        event["signature"] = self._sign_event(event)

        # 存储到不可变存储
        self.immutable_store.append(event)

        # 异步复制到长期存储
        asyncio.create_task(self._replicate_to_cold_storage(event))

    def _calculate_hash(self, event: dict) -> str:
        """计算事件哈希"""
        # 确保顺序一致
        canonical = json.dumps(event, sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()

    def _sign_event(self, event: dict) -> str:
        """数字签名"""
        # 使用私钥签名(PRIVATE_KEY 应由密钥管理系统注入)
        import base64
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import padding

        message = json.dumps(event, sort_keys=True).encode()
        signature = PRIVATE_KEY.sign(
            message,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )

        return base64.b64encode(signature).decode()
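单条事件的哈希只能发现该条被篡改,发现不了整条被删除;把前一条的哈希链入下一条,任何删除、插入或篡改都会破坏链的可验证性。一个示意实现:

```python
import hashlib
import json

GENESIS = "0" * 64  # 链头占位哈希

def chain_events(events):
    """把事件串成哈希链,返回带 prev_hash/hash 字段的记录列表"""
    prev, chain = GENESIS, []
    for event in events:
        record = dict(event, prev_hash=prev)
        payload = json.dumps(record, sort_keys=True).encode()
        record["hash"] = hashlib.sha256(payload).hexdigest()
        prev = record["hash"]
        chain.append(record)
    return chain

def verify_chain(chain) -> bool:
    """重算每条哈希并核对链接关系"""
    prev = GENESIS
    for record in chain:
        body = {k: v for k, v in record.items() if k != "hash"}
        if body.get("prev_hash") != prev:
            return False
        payload = json.dumps(body, sort_keys=True).encode()
        if hashlib.sha256(payload).hexdigest() != record["hash"]:
            return False
        prev = record["hash"]
    return True
```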

第5章: 部署与运维#

关注点:掌握生产环境的部署策略和运维实践。

5.1 部署策略#

容器化部署

生产级Dockerfile

# Dockerfile
FROM python:3.11-slim

# 设置环境变量
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1

# 安全:非 root 用户
RUN useradd -m -u 1000 langchain && \
    mkdir -p /app && \
    chown -R langchain:langchain /app

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# 依赖(分层缓存优化)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 应用代码
COPY --chown=langchain:langchain . .

# 切换用户
USER langchain

# 暴露端口
EXPOSE 8000

# 健康检查
HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
  CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health', timeout=2)"

# 启动(使用生产级配置)
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

完整docker-compose.yml

# docker-compose.yml
version: '3.8'

services:
  langchain-app:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      # 从.env文件或环境变量加载
      - LANGSMITH_TRACING=${LANGSMITH_TRACING:-false}
      - LANGSMITH_API_KEY=${LANGSMITH_API_KEY}
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_HOST=redis
      - REDIS_PORT=6379
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    restart: unless-stopped
    depends_on:
      - redis
      - prometheus
    networks:
      - langchain-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    restart: unless-stopped
    networks:
      - langchain-network
    command: redis-server --appendonly yes

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - ./prometheus_alerts.yml:/etc/prometheus/prometheus_alerts.yml
      - prometheus-data:/prometheus
    restart: unless-stopped
    networks:
      - langchain-network
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_PASSWORD:-admin}
      - GF_USERS_ALLOW_SIGN_UP=false
    volumes:
      - grafana-data:/var/lib/grafana
    restart: unless-stopped
    networks:
      - langchain-network
    depends_on:
      - prometheus

volumes:
  redis-data:
  prometheus-data:
  grafana-data:

networks:
  langchain-network:
    driver: bridge

requirements.txt示例

# requirements.txt
langchain>=1.0.7
langchain-openai>=1.0.3
langchain-core>=1.0.7
langchain-community>=1.0.7
langgraph>=1.0.3
langsmith>=0.4.43

# Web框架
fastapi==0.115.6
uvicorn[standard]==0.34.0

# 监控
prometheus-client==0.21.0

# 缓存
redis==5.2.1

# 工具
python-dotenv==1.0.1
pydantic==2.10.4
pydantic-settings==2.7.0

.env示例

# .env
LANGSMITH_TRACING=true
LANGSMITH_API_KEY=your_langsmith_key
OPENAI_API_KEY=your_openai_key
GRAFANA_PASSWORD=your_grafana_password

.dockerignore

# .dockerignore
__pycache__/
*.py[cod]
*$py.class
*.so
.env
.venv/
venv/
*.log
.git/
.gitignore
.pytest_cache/
.coverage
htmlcov/
dist/
build/
*.egg-info/
.DS_Store

部署命令

# 构建镜像
docker-compose build

# 启动服务
docker-compose up -d

# 查看日志
docker-compose logs -f langchain-app

# 停止服务
docker-compose down

# 完全清理(包括数据卷)
docker-compose down -v

Kubernetes 部署配置

# deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: langchain-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langchain
  template:
    metadata:
      labels:
        app: langchain
    spec:
      containers:
      - name: app
        image: langchain-app:latest
        ports:
        - containerPort: 8000  # 与 Dockerfile 暴露的 8000 端口保持一致
        env:
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: langchain-secrets
              key: openai-api-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: langchain-service
spec:
  selector:
    app: langchain
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: langchain-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: langchain-app
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80

Serverless 部署(AWS Lambda)

# handler.py
from mangum import Mangum
from fastapi import FastAPI
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent

app = FastAPI()

# 初始化(冷启动优化)
agent = None

def get_agent():
    global agent
    if agent is None:
        agent = create_agent(
            model=ChatOpenAI(model="gpt-4o-mini"),
            tools=[],
            system_prompt="你是一个助手"
        )
    return agent

@app.post("/chat")
async def chat(request: dict):
    agent = get_agent()
    response = agent.invoke(request)
    return response

# Lambda handler
handler = Mangum(app)
# serverless.yml
service: langchain-service

provider:
  name: aws
  runtime: python3.11
  region: us-east-1
  timeout: 30
  memorySize: 1024
  environment:
    OPENAI_API_KEY: ${ssm:/langchain/openai_api_key}

functions:
  chat:
    handler: handler.handler
    events:
      - http:
          path: /chat
          method: post
          cors: true
    reservedConcurrency: 10
    provisionedConcurrency: 2  # 减少冷启动

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    dockerizePip: true
    slim: true
    strip: false

5.2 监控告警#

5.2.1 Prometheus监控集成#

1. 安装依赖

pip install prometheus-client

2. 完整示例

from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
from typing import Dict, Any
from langchain_core.callbacks import BaseCallbackHandler
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# 定义Prometheus指标
llm_requests = Counter(
    'langchain_llm_requests_total',
    'Total LLM requests',
    ['model', 'status']
)

llm_latency = Histogram(
    'langchain_llm_latency_seconds',
    'LLM request latency',
    ['model'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
)

llm_tokens = Counter(
    'langchain_llm_tokens_total',
    'Total tokens used',
    ['model', 'type']  # type: prompt/completion
)

llm_errors = Counter(
    'langchain_llm_errors_total',
    'Total LLM errors',
    ['model', 'error_type']
)

# 集成到LangChain回调
class PrometheusCallback(BaseCallbackHandler):
    """Prometheus监控回调"""

    def on_llm_start(self, serialized, prompts, **kwargs):
        """LLM开始时记录"""
        self.start_time = time.time()

    def on_llm_end(self, response, **kwargs):
        """LLM结束时记录指标"""
        duration = time.time() - self.start_time

        # 获取模型信息
        model = response.llm_output.get("model_name", "unknown") if response.llm_output else "unknown"

        # 记录成功指标
        llm_requests.labels(model=model, status="success").inc()
        llm_latency.labels(model=model).observe(duration)

        # 记录token使用
        if response.llm_output:
            token_usage = response.llm_output.get("token_usage", {})
            llm_tokens.labels(model=model, type="prompt").inc(
                token_usage.get("prompt_tokens", 0)
            )
            llm_tokens.labels(model=model, type="completion").inc(
                token_usage.get("completion_tokens", 0)
            )

    def on_llm_error(self, error, **kwargs):
        """LLM错误时记录"""
        error_type = type(error).__name__
        llm_requests.labels(model="unknown", status="error").inc()
        llm_errors.labels(model="unknown", error_type=error_type).inc()

# 启动Prometheus HTTP服务器(暴露指标)
start_http_server(8001)  # 在独立端口暴露指标(避免与应用端口8000冲突)

# 使用示例
chain = ChatOpenAI(model="gpt-4o-mini") | StrOutputParser()
result = chain.invoke(
    "你好,介绍一下LangChain",
    config={"callbacks": [PrometheusCallback()]}
)

print(f"访问 http://localhost:8001/metrics 查看指标")

3. Grafana可视化配置

{
  "dashboard": {
    "title": "LangChain监控面板",
    "panels": [
      {
        "title": "LLM请求速率(每秒)",
        "targets": [{
          "expr": "rate(langchain_llm_requests_total[5m])",
          "legendFormat": "{{model}} - {{status}}"
        }],
        "type": "graph"
      },
      {
        "title": "LLM延迟分布(P50/P95/P99)",
        "targets": [
          {
            "expr": "histogram_quantile(0.50, rate(langchain_llm_latency_seconds_bucket[5m]))",
            "legendFormat": "P50 - {{model}}"
          },
          {
            "expr": "histogram_quantile(0.95, rate(langchain_llm_latency_seconds_bucket[5m]))",
            "legendFormat": "P95 - {{model}}"
          },
          {
            "expr": "histogram_quantile(0.99, rate(langchain_llm_latency_seconds_bucket[5m]))",
            "legendFormat": "P99 - {{model}}"
          }
        ],
        "type": "graph"
      },
      {
        "title": "Token使用量(每分钟)",
        "targets": [{
          "expr": "rate(langchain_llm_tokens_total[1m])",
          "legendFormat": "{{model}} - {{type}}"
        }],
        "type": "graph"
      },
      {
        "title": "错误率",
        "targets": [{
          "expr": "rate(langchain_llm_errors_total[5m])",
          "legendFormat": "{{model}} - {{error_type}}"
        }],
        "type": "graph"
      }
    ]
  }
}

4. Prometheus告警规则

# prometheus_alerts.yml
groups:
  - name: langchain_alerts
    rules:
      - alert: HighLLMLatency
        expr: histogram_quantile(0.95, rate(langchain_llm_latency_seconds_bucket[5m])) > 5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM延迟过高"
          description: "P95延迟超过5秒: {{ $value }}s"

      - alert: CriticalLLMLatency
        expr: histogram_quantile(0.95, rate(langchain_llm_latency_seconds_bucket[5m])) > 10
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "LLM延迟严重过高"
          description: "P95延迟超过10秒: {{ $value }}s"

      - alert: HighErrorRate
        expr: rate(langchain_llm_requests_total{status="error"}[5m]) / rate(langchain_llm_requests_total[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "LLM错误率过高"
          description: "错误率超过10%: {{ $value | humanizePercentage }}"

      - alert: TokenBudgetExceeded
        expr: increase(langchain_llm_tokens_total[1h]) > 1000000
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Token预算超限"
          description: "过去1小时使用了{{ $value }}个tokens"

      - alert: NoRecentRequests
        expr: rate(langchain_llm_requests_total[10m]) == 0
        for: 15m
        labels:
          severity: info
        annotations:
          summary: "无LLM请求"
          description: "过去15分钟无LLM请求,可能服务异常"

5. Prometheus配置文件

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

rule_files:
  - "prometheus_alerts.yml"

scrape_configs:
  - job_name: 'langchain_app'
    static_configs:
      - targets: ['localhost:8001']  # LangChain应用的metrics端口

5.2.2 监控系统实现#

监控中间件

from prometheus_client import Counter, Histogram, Gauge
import logging

# Prometheus 指标
request_count = Counter('langchain_requests_total', 'Total requests', ['endpoint', 'status'])
request_duration = Histogram('langchain_request_duration_seconds', 'Request duration', ['endpoint'])
active_requests = Gauge('langchain_active_requests', 'Active requests')
model_tokens = Counter('langchain_model_tokens_total', 'Total tokens used', ['model'])
error_count = Counter('langchain_errors_total', 'Total errors', ['error_type'])

class MonitoringMiddleware:
    """监控中间件"""

    async def __call__(self, request, call_next):
        endpoint = request.url.path

        # 记录活跃请求
        active_requests.inc()

        # 记录请求时间
        with request_duration.labels(endpoint=endpoint).time():
            try:
                response = await call_next(request)

                # 记录请求数
                request_count.labels(
                    endpoint=endpoint,
                    status=response.status_code
                ).inc()

                return response

            except Exception as e:
                # 记录错误
                error_count.labels(error_type=type(e).__name__).inc()
                raise

            finally:
                active_requests.dec()

# 告警规则(Prometheus AlertManager)
ALERT_RULES = """
groups:
- name: langchain_alerts
  interval: 30s
  rules:

  - alert: HighErrorRate
    expr: rate(langchain_errors_total[5m]) > 0.05
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: High error rate detected
      description: "Error rate is {{ $value }} errors per second"

  - alert: HighLatency
    expr: histogram_quantile(0.95, langchain_request_duration_seconds) > 5
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: High latency detected
      description: "95th percentile latency is {{ $value }} seconds"

  - alert: TokenBudgetExceeded
    expr: increase(langchain_model_tokens_total[1h]) > 1000000
    labels:
      severity: warning
    annotations:
      summary: Token budget exceeded
      description: "Used {{ $value }} tokens in the last hour"
"""

5.3 故障处理#

故障恢复系统

class FaultToleranceSystem:
    """故障容错系统"""

    def __init__(self):
        self.health_checker = HealthChecker()
        self.circuit_breaker = CircuitBreaker()
        self.fallback_handler = FallbackHandler()

    async def handle_request(self, request: dict) -> dict:
        """处理请求(带故障容错)"""

        # 健康检查
        if not await self.health_checker.is_healthy():
            return await self.fallback_handler.handle(request)

        # 熔断器
        if self.circuit_breaker.is_open():
            return await self.fallback_handler.handle(request)

        try:
            # 正常处理
            response = await self.process_request(request)
            self.circuit_breaker.record_success()
            return response

        except Exception as e:
            self.circuit_breaker.record_failure()

            # 故障分类处理
            if isinstance(e, RateLimitError):
                # 速率限制:等待后重试
                await asyncio.sleep(e.retry_after)
                return await self.handle_request(request)

            elif isinstance(e, ModelError):
                # 模型错误:切换备用模型
                return await self.fallback_to_backup_model(request)

            elif isinstance(e, TimeoutError):
                # 超时:返回缓存或默认响应
                cached = await self.get_cached_response(request)
                if cached:
                    return cached

                return self.get_default_response(request)

            else:
                # 未知错误:记录并返回错误响应
                await self.log_error(e)
                return {
                    "status": "error",
                    "message": "服务暂时不可用,请稍后重试"
                }

class DisasterRecovery:
    """灾难恢复"""

    def __init__(self):
        self.backup_regions = ["us-west-2", "eu-west-1", "ap-northeast-1"]
        self.primary_region = "us-east-1"

    async def check_region_health(self, region: str) -> bool:
        """检查区域健康状态"""
        try:
            response = await self.ping_region(region)
            return response.status_code == 200
        except:
            return False

    async def failover(self):
        """故障转移"""
        # 检查主区域
        if await self.check_region_health(self.primary_region):
            return self.primary_region

        # 主区域故障,切换到备用区域
        for region in self.backup_regions:
            if await self.check_region_health(region):
                await self.switch_traffic(region)
                await self.notify_ops_team(f"Failover to {region}")
                return region

        # 所有区域都故障
        raise CriticalError("All regions are down")
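上面的 FaultToleranceSystem 直接引用了 HealthChecker 与 FallbackHandler,但未给出实现。下面是一个仅用标准库的最小示意(类名与接口为假设,CircuitBreaker 的实现见第7.5节):

```python
import asyncio

class HealthChecker:
    """最小健康检查器:所有依赖探测通过才算健康(接口为示意)"""

    def __init__(self, probes=None):
        # probes: 名称 -> 返回 bool 的异步探测函数
        self.probes = probes or {}

    async def is_healthy(self) -> bool:
        for name, probe in self.probes.items():
            try:
                if not await probe():
                    return False
            except Exception:
                return False
        return True

class FallbackHandler:
    """最小降级处理器:返回友好的静态响应"""

    async def handle(self, request: dict) -> dict:
        return {
            "status": "degraded",
            "message": "服务暂时繁忙,已返回降级响应",
            "echo": request.get("query"),
        }

async def _demo():
    async def redis_ok():
        return True  # 真实实现中应 ping Redis / 数据库等

    checker = HealthChecker({"redis": redis_ok})
    handler = FallbackHandler()
    return await checker.is_healthy(), await handler.handle({"query": "hi"})

healthy, fallback = asyncio.run(_demo())
```

生产实现中,探测函数应带超时并缓存结果,避免健康检查本身成为瓶颈。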


5.4 生产部署检查清单#

5.4.1 安全检查清单#

API密钥与凭证管理

  • 环境变量管理:所有API密钥通过环境变量注入,禁止硬编码
  • 密钥管理服务:使用AWS Secrets Manager、HashiCorp Vault等密钥管理服务
  • 密钥轮换:定期轮换API密钥和访问令牌
  • 最小权限原则:为不同环境配置不同权限级别的密钥
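"环境变量注入、禁止硬编码"这一条可以在应用启动时做一次 fail-fast 校验。下面是一个仅用标准库的示意(变量名列表需按项目实际调整):

```python
import os

REQUIRED_ENV_VARS = ["OPENAI_API_KEY", "LANGSMITH_API_KEY"]  # 按项目实际需要调整

def check_required_env(required=REQUIRED_ENV_VARS) -> list:
    """返回缺失的环境变量名列表;启动时列表为空才放行"""
    return [name for name in required if not os.environ.get(name)]

missing = check_required_env()
if missing:
    # 生产中应直接抛异常终止启动,这里仅演示
    print(f"缺少环境变量: {missing}")
```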

网络安全

  • 启用HTTPS/TLS:所有外部通信强制使用HTTPS
  • TLS版本:使用TLS 1.2或更高版本
  • 证书验证:验证SSL证书有效性
  • API网关:通过API网关统一管理和保护API端点

速率限制与防护

  • 全局速率限制:限制每个用户/IP的请求频率
  • 端点级限制:为不同端点设置不同速率限制
  • DDoS防护:配置CloudFlare或AWS Shield等防护服务
  • 请求大小限制:限制请求payload大小
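全局速率限制的经典实现是令牌桶:按时间匀速补充令牌,请求消耗令牌,桶空即拒绝。下面是一个仅用标准库的单机示意(分布式场景应改用 Redis 等共享存储):

```python
import time

class TokenBucket:
    """简单令牌桶限流器:capacity 为桶容量,rate 为每秒补充的令牌数(示意实现)"""

    def __init__(self, capacity: int, rate: float):
        self.capacity = capacity
        self.rate = rate
        self.tokens = float(capacity)
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # 按流逝时间补充令牌,不超过容量
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# 每个用户/IP一个桶:容量5,每秒补1个令牌
bucket = TokenBucket(capacity=5, rate=1.0)
results = [bucket.allow() for _ in range(7)]  # 连续7次快速请求
```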

输入验证与清洗

  • 输入长度验证:限制提示词和输入数据长度
  • SQL注入防护:使用参数化查询,禁止拼接SQL
  • XSS防护:对所有用户输入进行HTML实体编码
  • 命令注入防护:禁止直接执行用户输入的命令
  • 路径遍历防护:验证和清洗文件路径

敏感信息脱敏

  • PII检测:自动检测和脱敏个人身份信息
  • 输出过滤:过滤API密钥、密码、Token等敏感信息
  • 日志脱敏:确保日志中不包含敏感信息
  • 错误信息处理:避免泄露系统内部信息
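输出过滤与日志脱敏可以共用一个正则替换函数。下面是一个最小示意(正则仅覆盖常见格式,生产中建议结合专门的 PII 检测库):

```python
import re

# 模式为示意,按业务补充(API密钥、身份证号等)
PII_PATTERNS = [
    (re.compile(r'\b\d{15,19}\b'), '[CARD]'),                 # 疑似银行卡号
    (re.compile(r'\b[\w.+-]+@[\w-]+\.[\w.]+\b'), '[EMAIL]'),  # 邮箱
    (re.compile(r'(?<!\d)1[3-9]\d{9}(?!\d)'), '[PHONE]'),     # 中国大陆手机号
]

def mask_pii(text: str) -> str:
    """对输出/日志中的常见敏感信息做脱敏"""
    for pattern, placeholder in PII_PATTERNS:
        text = pattern.sub(placeholder, text)
    return text
```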

5.4.2 性能检查清单#

连接池配置

  • HTTP连接池:配置合理的连接池大小(建议:每个主机10-30个连接)
  • 数据库连接池:配置数据库连接池(建议:最小5个,最大20个)
  • Redis连接池:配置Redis连接池参数
  • 连接超时:设置合理的连接超时时间(建议:10秒)
  • 读取超时:设置读取超时时间(建议:30秒)

缓存策略

  • LLM响应缓存:启用语义缓存或精确匹配缓存
  • 缓存层级:配置L1(内存)、L2(Redis)、L3(S3)多层缓存
  • TTL配置:为不同数据类型设置合理的过期时间
  • 缓存预热:在启动时预加载热点数据
  • 缓存失效:实现缓存失效和更新策略
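L1 内存层的精确匹配缓存可以用"prompt 哈希 + TTL"实现,下面是一个仅用标准库的示意(语义缓存需要额外的向量相似度计算,此处未覆盖):

```python
import hashlib
import time

class TTLCache:
    """精确匹配的L1内存缓存示意:key 为 prompt 的哈希,过期自动失效"""

    def __init__(self, ttl_seconds: float = 300):
        self.ttl = ttl_seconds
        self._store = {}  # key -> (expire_at, value)

    def _key(self, prompt: str) -> str:
        return hashlib.sha256(prompt.encode("utf-8")).hexdigest()

    def get(self, prompt: str):
        entry = self._store.get(self._key(prompt))
        if entry is None:
            return None
        expire_at, value = entry
        if time.monotonic() > expire_at:
            del self._store[self._key(prompt)]  # 惰性失效
            return None
        return value

    def set(self, prompt: str, value):
        self._store[self._key(prompt)] = (time.monotonic() + self.ttl, value)

cache = TTLCache(ttl_seconds=60)
cache.set("什么是LangChain?", "LangChain 是一个LLM应用框架")
hit = cache.get("什么是LangChain?")
miss = cache.get("另一个问题")
```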

超时控制

  • LLM调用超时:设置LLM API调用超时(建议:30-60秒)
  • 工具执行超时:为每个工具设置独立超时时间
  • 总体超时:设置请求总超时时间
  • 流式响应超时:配置流式响应的超时策略
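"总体超时"可以在调用层用 asyncio.wait_for 统一兜底:超时则返回降级结果,而不是让请求无限等待。一个最小示意:

```python
import asyncio

async def call_with_timeout(coro_factory, timeout_s: float, fallback):
    """为一次调用加总超时:超时返回降级结果而非一直等待"""
    try:
        return await asyncio.wait_for(coro_factory(), timeout=timeout_s)
    except asyncio.TimeoutError:
        return fallback

async def slow_llm_call():
    await asyncio.sleep(1.0)  # 模拟一次慢的LLM调用
    return "正常响应"

result = asyncio.run(
    call_with_timeout(slow_llm_call, timeout_s=0.05, fallback="超时降级响应")
)
```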

批处理优化

  • 批量推理:支持批量处理多个请求
  • 批次大小:根据模型和硬件优化批次大小
  • 异步处理:使用异步I/O提升并发能力
  • 并行执行:支持并行调用多个工具或LLM
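批量异步处理时通常用信号量限制并发数,避免瞬间打满下游的速率限制。下面用一个模拟的 LLM 调用演示这种模式:

```python
import asyncio

async def bounded_gather(tasks_args, worker, max_concurrency: int = 5):
    """用信号量限制并发数地批量执行异步任务"""
    sem = asyncio.Semaphore(max_concurrency)

    async def run_one(arg):
        async with sem:
            return await worker(arg)

    return await asyncio.gather(*(run_one(a) for a in tasks_args))

async def fake_llm(prompt: str) -> str:
    await asyncio.sleep(0.01)  # 模拟一次LLM调用
    return f"answer:{prompt}"

answers = asyncio.run(bounded_gather(["q1", "q2", "q3"], fake_llm, max_concurrency=2))
```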

5.4.3 可靠性检查清单#

重试策略

  • 指数退避:实现指数退避重试策略
  • 重试次数:设置合理的最大重试次数(建议:3次)
  • 重试条件:仅对可恢复的错误(429、500、503)进行重试
  • 幂等性:确保重试操作是幂等的
  • 超时重试:对超时请求进行重试

熔断器

  • 熔断阈值:设置失败率阈值(建议:50%)
  • 恢复时间:配置熔断器自动恢复时间(建议:60秒)
  • 半开状态:实现半开状态探测服务恢复
  • 熔断告警:熔断器打开时发送告警

降级方案

  • 备用模型:配置备用LLM模型
  • 缓存降级:服务不可用时返回缓存结果
  • 默认响应:提供友好的默认响应
  • 功能降级:关键路径失败时降级到简化功能

健康检查

  • 就绪检查:实现/ready端点检查依赖服务
  • 存活检查:实现/health端点检查应用状态
  • 深度检查:可选的深度健康检查(验证LLM连接等)
  • 检查间隔:配置合理的健康检查间隔(建议:30秒)
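存活检查与就绪检查的关键区别:/health 只确认进程活着,/ready 要逐个探测依赖。下面是与框架无关的逻辑示意(接到 FastAPI 时分别挂到两个路由即可):

```python
def _safe(probe) -> bool:
    """探测函数抛异常也视为失败"""
    try:
        return bool(probe())
    except Exception:
        return False

def liveness() -> dict:
    """/health:进程在跑就返回 ok,不检查外部依赖"""
    return {"status": "ok"}

def readiness(dependency_probes: dict) -> dict:
    """/ready:逐个探测依赖(探测函数返回 bool),任一失败则 not_ready"""
    failed = [name for name, probe in dependency_probes.items() if not _safe(probe)]
    return {"status": "ready" if not failed else "not_ready", "failed": failed}

status = readiness({"redis": lambda: True, "llm": lambda: True})
```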

5.4.4 监控检查清单#

LangSmith追踪

  • 启用追踪:设置LANGSMITH_TRACING=true(旧版LANGCHAIN_TRACING_V2仍兼容)
  • 项目配置:为不同环境配置不同项目
  • 采样率:生产环境配置合理的采样率(建议:10%)
  • 敏感信息过滤:过滤追踪数据中的敏感信息

Prometheus指标

  • 请求指标:记录请求总数、成功率、失败率
  • 延迟指标:记录P50、P95、P99延迟
  • Token指标:记录输入/输出token使用量
  • 错误指标:记录不同类型的错误数量
  • 自定义指标:根据业务需求添加自定义指标

日志聚合

  • 结构化日志:使用JSON格式记录结构化日志
  • 日志级别:生产环境使用INFO级别,开发环境使用DEBUG
  • 日志聚合:配置ELK、Loki或CloudWatch日志聚合
  • 日志保留:设置日志保留策略(建议:30-90天)
  • 请求ID:为每个请求生成唯一ID用于追踪
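结构化日志加请求 ID 可以用一个自定义的 logging.Formatter 实现,下面是一个标准库示意:

```python
import json
import logging
import uuid

class JsonFormatter(logging.Formatter):
    """将日志输出为JSON,附带 request_id 便于链路追踪"""

    def format(self, record):
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "request_id": getattr(record, "request_id", None),
        }
        return json.dumps(payload, ensure_ascii=False)

logger = logging.getLogger("langchain_app")
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# 每个请求生成唯一ID,通过 extra 注入日志记录
request_id = str(uuid.uuid4())
logger.info("agent调用完成", extra={"request_id": request_id})
```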

告警规则

  • 延迟告警:P95延迟超过阈值时告警
  • 错误率告警:错误率超过阈值时告警
  • 成本告警:Token使用量超过预算时告警
  • 可用性告警:服务不可用时立即告警
  • 告警通道:配置PagerDuty、Slack、邮件等告警通道

5.4.5 成本检查清单#

Token使用监控

  • 实时监控:实时监控token使用量
  • 用户级统计:按用户统计token使用量
  • 模型级统计:按模型统计成本
  • 成本分析:定期分析成本构成

成本预算控制

  • 预算告警:设置每日/每月预算告警
  • 用户配额:为每个用户设置token配额
  • 速率限制:通过速率限制控制成本
  • 自动熔断:超出预算时自动停止服务
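"自动熔断"的最小形态是一个预算计数器:请求前先尝试记账,超出当日预算直接拒绝。一个标准库示意(生产中计数应放在 Redis 并按天滚动重置):

```python
class TokenBudget:
    """简单的 token 预算控制:超出当日预算则拒绝后续请求(示意)"""

    def __init__(self, daily_limit: int):
        self.daily_limit = daily_limit
        self.used = 0

    def try_consume(self, tokens: int) -> bool:
        """剩余预算足够则记账并放行,否则拒绝"""
        if self.used + tokens > self.daily_limit:
            return False
        self.used += tokens
        return True

    @property
    def remaining(self) -> int:
        return self.daily_limit - self.used

budget = TokenBudget(daily_limit=1000)
ok1 = budget.try_consume(800)
ok2 = budget.try_consume(300)  # 会超预算,被拒绝
```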

缓存命中率优化

  • 缓存命中率监控:监控缓存命中率(目标:>80%)
  • 缓存预热:预加载常用查询
  • 语义缓存:使用语义相似度缓存提升命中率
  • 缓存分析:定期分析缓存效果

模型选择策略

  • 智能路由:根据任务复杂度选择合适模型
  • 成本优化:优先使用便宜模型(如gpt-4o-mini)
  • 质量平衡:在成本和质量间找到平衡点
  • A/B测试:对比不同模型的成本效益
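智能路由的入门版本是按启发式规则估计任务复杂度,简单问题走便宜模型。下面的规则与阈值纯属示意,实际应结合业务数据调参或用小模型做分类:

```python
def route_model(query: str) -> str:
    """按任务复杂度的启发式路由(规则为示意,阈值需按业务调整)"""
    complex_markers = ["分析", "推理", "对比", "总结", "代码", "为什么"]
    if len(query) > 200 or any(m in query for m in complex_markers):
        return "gpt-4o"        # 复杂任务:质量优先
    return "gpt-4o-mini"       # 简单任务:成本优先

m1 = route_model("今天天气怎么样")
m2 = route_model("请分析这段代码的时间复杂度")
```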

5.4.6 数据合规检查清单#

GDPR合规

  • 用户同意:收集用户数据处理同意
  • 数据最小化:仅收集必要的个人数据
  • 访问权:实现用户数据访问接口
  • 删除权:实现用户数据删除功能(被遗忘权)
  • 可携带权:支持导出用户数据
  • 数据假名化:对个人数据进行假名化处理

数据加密

  • 传输加密:使用TLS加密所有网络传输
  • 存储加密:加密存储敏感数据
  • 密钥管理:使用KMS管理加密密钥
  • 加密算法:使用AES-256等强加密算法

审计日志

  • 完整记录:记录所有数据访问和修改操作
  • 不可篡改:使用只追加存储或区块链技术
  • 日志签名:对审计日志进行数字签名
  • 定期审计:定期审查审计日志
  • 合规报告:生成合规审计报告
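"日志签名"可以用 HMAC 实现:写入时对条目签名,审计时重算并比对,即可发现篡改。标准库示意(签名密钥应来自 KMS,而非硬编码):

```python
import hashlib
import hmac
import json

SECRET = b"audit-signing-key"  # 示意;生产中应来自KMS/密钥管理服务

def sign_entry(entry: dict) -> dict:
    """对审计日志条目做HMAC签名,写入后可验证是否被篡改"""
    body = json.dumps(entry, sort_keys=True, ensure_ascii=False).encode("utf-8")
    signature = hmac.new(SECRET, body, hashlib.sha256).hexdigest()
    return {**entry, "_sig": signature}

def verify_entry(signed: dict) -> bool:
    sig = signed.get("_sig", "")
    entry = {k: v for k, v in signed.items() if k != "_sig"}
    body = json.dumps(entry, sort_keys=True, ensure_ascii=False).encode("utf-8")
    expected = hmac.new(SECRET, body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(sig, expected)

signed = sign_entry({"user": "u1", "action": "read", "resource": "order/1"})
```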

5.4.7 部署前最终检查#

代码检查

  • 单元测试:单元测试覆盖率>80%
  • 集成测试:所有关键路径有集成测试
  • 性能测试:通过负载测试和压力测试
  • 安全扫描:通过安全漏洞扫描

配置检查

  • 环境变量:所有环境变量正确配置
  • 资源限制:配置CPU、内存限制
  • 副本数:配置合理的副本数量(建议:>=2)
  • 自动扩缩容:配置HPA自动扩缩容策略

文档检查

  • 部署文档:完整的部署操作文档
  • 运维手册:故障排查和应急响应手册
  • API文档:完整的API接口文档
  • 变更记录:记录所有重要变更

演练检查

  • 灰度发布:先在小范围用户测试
  • 回滚计划:准备快速回滚方案
  • 故障演练:进行故障注入和恢复演练
  • 应急响应:明确应急响应流程和人员
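灰度发布中"先在小范围用户测试"常用确定性哈希分桶实现:同一用户稳定落在同一桶,放量只需调大百分比。一个标准库示意:

```python
import hashlib

def in_canary(user_id: str, percent: int) -> bool:
    """确定性灰度分桶:同一用户始终落在同一桶,percent 为放量百分比(0-100)"""
    digest = hashlib.md5(user_id.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 100
    return bucket < percent

# 5% 放量时,1000个用户中大约50人命中
hits = sum(in_canary(f"user_{i}", 5) for i in range(1000))
```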

本章小结#

  1. 架构设计:RAG、Multi-Agent、Workflow 三种核心架构模式
  2. 性能优化:延迟优化、吞吐量提升、Token 成本控制、多层缓存
  3. 安全防护:Guardrails 体系、PII 保护、分层防护
  4. 合规要求:输入输出安全、GDPR 合规、审计日志
  5. 部署运维:容器化、Serverless、监控告警、故障恢复
  6. 生产清单:全面的安全、性能、可靠性、监控、成本、合规检查清单

第6章:测试与质量保障#

关注点: 构建Agent的完整测试体系

6.1 测试金字塔#

Agent测试不同于传统软件测试,需要处理LLM的不确定性:

         /\
        /  \     E2E评估(少量、慢、贵)
       /____\    - LangSmith Evaluation
      /      \   - 真实场景回归
     /        \
    /          \ 集成测试(中等)
   /____________\  - Agent工作流测试
  /              \ - 工具调用链路测试
 /                \
/__________________\ 单元测试(大量、快、便宜)
                     - Tool函数测试
                     - Prompt模板测试
                     - 输出解析器测试

原则:70%单元测试 + 20%集成测试 + 10%E2E评估


6.2 单元测试实践#

6.2.1 Tool函数测试#

# test_tools.py
import pytest
from langchain_core.tools import tool

@tool
def extract_order_id(text: str) -> str:
    """从文本中提取订单号"""
    import re
    # \w 会匹配中文字符,这里限定为字母数字,避免把订单号后面的中文一并捕获
    match = re.search(r'订单号[::]\s*([A-Za-z0-9]+)', text)
    return match.group(1) if match else "未找到订单号"

class TestTools:
    """Tool函数单元测试"""

    @pytest.mark.parametrize("text,expected", [
        ("您的订单号:ABC123已确认", "ABC123"),
        ("订单号:XYZ789", "XYZ789"),
        ("这是一段没有订单号的文本", "未找到订单号"),
    ])
    def test_extract_order_id(self, text, expected):
        """参数化测试多种格式"""
        result = extract_order_id.invoke({"text": text})
        assert result == expected

    def test_tool_metadata(self):
        """测试Tool元数据"""
        assert extract_order_id.name == "extract_order_id"
        assert "订单号" in extract_order_id.description

运行

pytest test_tools.py -v
# ✅ test_extract_order_id[...] PASSED
# ✅ test_tool_metadata PASSED

6.2.2 Agent工作流集成测试#

# test_agent_workflow.py
import pytest
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from unittest.mock import Mock, patch

class TestAgentWorkflow:
    """Agent集成测试"""

    @pytest.fixture
    def agent(self):
        """创建测试用Agent"""
        return create_agent(
            ChatOpenAI(model="gpt-4o-mini", temperature=0),
            tools=[search_tool, calculator_tool],  # 假设这两个工具已在别处定义
            system_prompt="你是助手"
        )

    def test_tool_selection(self, agent):
        """测试工具选择正确性"""
        result = agent.invoke({
            "messages": [("user", "搜索Python")]
        })

        messages = result["messages"]
        tool_calls = [m for m in messages if getattr(m, "type", "") == "tool"]  # 只保留ToolMessage

        # 验证调用了search_tool
        assert any(m.name == "search_tool" for m in tool_calls)

    @patch('langchain_openai.ChatOpenAI')
    def test_with_mocked_llm(self, mock_llm_class):
        """Mock LLM避免真实API调用"""
        mock_llm = Mock()
        mock_llm_class.return_value = mock_llm

        from langchain_core.messages import AIMessage
        mock_llm.invoke.return_value = AIMessage(content="模拟回复")

        agent = create_agent(mock_llm, tools=[], system_prompt="测试")
        result = agent.invoke({"messages": [("user", "测试")]})

        assert "模拟回复" in result["messages"][-1].content
        assert mock_llm.invoke.called

6.3 LangSmith自动化评估#

6.3.1 创建评估Dataset#

from langsmith import Client

client = Client()

# 创建Dataset
dataset = client.create_dataset(
    dataset_name="customer_support_qa",
    description="客服问答评估数据集"
)

# 添加测试用例
test_cases = [
    {
        "inputs": {"question": "如何退款?"},
        "outputs": {
            "expected_keywords": ["订单详情", "申请退款"],
            "relevance_score": 5
        }
    },
    # ... 更多用例
]

for case in test_cases:
    client.create_example(
        dataset_id=dataset.id,
        inputs=case["inputs"],
        outputs=case["outputs"]
    )

6.3.2 定义Evaluators#

from langsmith.evaluation import evaluator

@evaluator
def keyword_coverage_evaluator(run, example):
    """关键词覆盖率评估"""
    answer = run.outputs.get("answer", "")
    expected_keywords = example.outputs.get("expected_keywords", [])

    found = [kw for kw in expected_keywords if kw in answer]
    score = len(found) / len(expected_keywords) if expected_keywords else 0

    return {
        "key": "keyword_coverage",
        "score": score,
        "comment": f"覆盖 {len(found)}/{len(expected_keywords)} 个关键词"
    }

# LLM-as-Judge评估器
from langsmith.evaluation import LangChainStringEvaluator

llm_evaluator = LangChainStringEvaluator(
    "cot_qa",
    prepare_data=lambda run, example: {
        "prediction": run.outputs.get("answer", ""),
        "reference": example.outputs.get("expected_answer", ""),
        "input": example.inputs.get("question", "")
    }
)

6.3.3 运行评估与A/B测试#

from langsmith.evaluation import evaluate

# 版本A
def predict_v_a(inputs):
    agent = create_agent(
        ChatOpenAI(model="gpt-4o"),
        tools=[],
        system_prompt="你是客服助手"
    )
    result = agent.invoke({"messages": [("user", inputs["question"])]})
    return {"answer": result["messages"][-1].content}

# 版本B(改进的Prompt)
def predict_v_b(inputs):
    agent = create_agent(
        ChatOpenAI(model="gpt-4o"),
        tools=[],
        system_prompt="""你是专业的客服助手。

回答要求:
1. 准确理解用户问题
2. 提供清晰的步骤说明
3. 使用友好的语气"""
    )
    result = agent.invoke({"messages": [("user", inputs["question"])]})
    return {"answer": result["messages"][-1].content}

# A/B测试
results_a = evaluate(
    predict_v_a,
    data="customer_support_qa",
    evaluators=[keyword_coverage_evaluator, llm_evaluator],
    experiment_prefix="Version_A"
)

results_b = evaluate(
    predict_v_b,
    data="customer_support_qa",
    evaluators=[keyword_coverage_evaluator, llm_evaluator],
    experiment_prefix="Version_B"
)

# 对比(注:聚合指标的读取方式随 langsmith 版本而异,此处为示意)
print(f"版本A关键词覆盖率: {results_a['keyword_coverage_avg']:.2%}")
print(f"版本B关键词覆盖率: {results_b['keyword_coverage_avg']:.2%}")

improvement = ((results_b['keyword_coverage_avg'] - results_a['keyword_coverage_avg']) /
               results_a['keyword_coverage_avg'] * 100)
print(f"改进幅度: {improvement:+.1f}%")

# 统计显著性检验
from scipy import stats
scores_a = [r.evaluation_results["keyword_coverage"] for r in results_a.results]
scores_b = [r.evaluation_results["keyword_coverage"] for r in results_b.results]

t_stat, p_value = stats.ttest_ind(scores_a, scores_b)
print(f"显著性: {'✅ 显著' if p_value < 0.05 else '⚠️ 不显著'} (p={p_value:.4f})")

6.4 CI/CD集成#

6.4.1 GitHub Actions配置#

# .github/workflows/test.yml
name: Test Suite

on:
  push:
    branches: [main, dev]
  pull_request:
    branches: [main]

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov

      - name: Run Unit Tests
        run: |
          pytest tests/unit -v --cov=src --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml

  integration-tests:
    runs-on: ubuntu-latest
    needs: unit-tests
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Run Integration Tests
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          pytest tests/integration -v

  langsmith-evaluation:
    runs-on: ubuntu-latest
    needs: integration-tests
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Run LangSmith Evaluation
        env:
          LANGSMITH_API_KEY: ${{ secrets.LANGSMITH_API_KEY }}
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          python scripts/run_evaluation.py

      - name: Quality Gate
        run: |
          python scripts/quality_gate.py

6.4.2 质量门禁#

# scripts/quality_gate.py
from langsmith.evaluation import evaluate
import sys

def run_quality_gate():
    """质量门禁检查"""
    print("🚦 运行质量门禁检查...\n")

    # predict 为待评估的预测函数(定义方式见 6.3.3)
    results = evaluate(
        predict,
        data="customer_support_qa",
        evaluators=[keyword_coverage_evaluator]
    )

    # 质量标准
    quality_standards = {
        "keyword_coverage_avg": 0.80,  # >= 80%
        "pass_rate": 0.90  # >= 90%
    }

    all_passed = True
    for metric, threshold in quality_standards.items():
        actual = results.get(metric, 0)
        passed = actual >= threshold
        status = "✅ PASS" if passed else "❌ FAIL"
        print(f"{metric}: {actual:.2%} (阈值: {threshold:.2%}) - {status}")
        if not passed:
            all_passed = False

    if all_passed:
        print("\n✅ 质量门禁通过,允许部署")
        return 0
    else:
        print("\n❌ 质量门禁失败,禁止部署")
        return 1

if __name__ == "__main__":
    sys.exit(run_quality_gate())

6.5 测试最佳实践#

测试覆盖率目标

| 测试类型 | 比例 | 速度 | 成本 | 何时运行 |
| --- | --- | --- | --- | --- |
| 单元测试 | 70% | 快(秒级) | 低 | 每次commit |
| 集成测试 | 20% | 中(分钟级) | 中 | 每次PR |
| LangSmith评估 | 10% | 慢(小时级) | 高 | 发布前 |

关键原则

  1. ✅ 单元测试覆盖所有Tool函数
  2. ✅ 集成测试覆盖关键工作流
  3. ✅ Mock外部依赖(LLM、API)降低成本
  4. ✅ LangSmith评估用于质量对比
  5. ✅ 质量门禁确保生产标准
  6. ✅ CI/CD自动化运行

第7章:错误处理与降级策略#

关注点: 生产环境的容错与高可用

7.1 错误处理层次#

用户请求
┌─────────────────────────────┐
│ 1. 输入验证                  │ ← 防止无效输入
└─────────┬───────────────────┘
┌─────────────────────────────┐
│ 2. 重试机制                  │ ← 处理临时故障
└─────────┬───────────────────┘
┌─────────────────────────────┐
│ 3. 降级策略                  │ ← 保证基本服务
└─────────┬───────────────────┘
┌─────────────────────────────┐
│ 4. 兜底响应                  │ ← 优雅失败
└─────────┬───────────────────┘
    返回结果

7.2 输入验证#

from pydantic import BaseModel, Field, field_validator  # Pydantic v2 写法
from typing import Optional

class AgentRequest(BaseModel):
    """Agent请求验证"""
    query: str = Field(..., min_length=1, max_length=1000)
    user_id: str = Field(..., pattern=r'^[a-zA-Z0-9_-]+$')
    context: Optional[dict] = None

    @field_validator('query')
    @classmethod
    def validate_query(cls, v):
        """验证查询内容"""
        # 检测恶意输入
        malicious_patterns = ['<script>', 'DROP TABLE', 'rm -rf']
        if any(pattern in v for pattern in malicious_patterns):
            raise ValueError("检测到恶意输入")

        # 检测敏感信息
        import re
        if re.search(r'\d{15,19}', v):  # 信用卡号模式
            raise ValueError("请勿输入敏感信息")

        return v

# 使用
try:
    request = AgentRequest(
        query="帮我查询订单",
        user_id="user_123"
    )
    result = agent.invoke({"messages": [("user", request.query)]})
except ValueError as e:
    return {"error": str(e), "code": "INVALID_INPUT"}

7.3 智能重试机制#

from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
from langchain_openai import ChatOpenAI
import logging

logger = logging.getLogger(__name__)

class RetryableAgent:
    """带重试的Agent"""

    def __init__(self):
        self.agent = create_agent(
            ChatOpenAI(model="gpt-4o"),
            tools=[],
            system_prompt="你是助手"
        )

    @retry(
        stop=stop_after_attempt(3),  # 最多重试3次
        wait=wait_exponential(multiplier=1, min=1, max=10),  # 指数退避
        retry=retry_if_exception_type((TimeoutError, ConnectionError)),
        before_sleep=lambda retry_state: logger.warning(
            f"重试 {retry_state.attempt_number}/3: {retry_state.outcome.exception()}"
        )
    )
    def invoke_with_retry(self, inputs: dict) -> dict:
        """带重试的调用"""
        try:
            return self.agent.invoke(inputs)
        except Exception as e:
            logger.error(f"Agent调用失败: {e}")
            raise

# 使用
agent = RetryableAgent()
try:
    result = agent.invoke_with_retry({"messages": [("user", "测试")]})
except Exception:
    # 重试3次后仍失败
    result = {"error": "服务暂时不可用,请稍后重试"}

7.4 多级降级策略#

from enum import Enum
from typing import Dict, Any

class FallbackLevel(Enum):
    """降级级别"""
    PRIMARY = 1      # GPT-4o(最优)
    SECONDARY = 2    # GPT-4o-mini(次优)
    TERTIARY = 3     # GPT-3.5-turbo(保底)
    CACHE = 4        # 缓存结果
    STATIC = 5       # 静态回复

class FallbackAgent:
    """多级降级Agent"""

    def __init__(self):
        self.cache = {}  # 简单缓存
        self.fallback_responses = {
            "default": "抱歉,服务暂时不可用,请稍后重试。",
            "timeout": "请求超时,请稍后重试。",
            "rate_limit": "请求过于频繁,请稍后重试。"
        }

    def invoke(self, inputs: dict) -> Dict[str, Any]:
        """多级降级调用"""
        query = inputs["messages"][0][1]

        # 级别1:GPT-4o
        try:
            return self._invoke_primary(inputs)
        except Exception as e:
            logger.warning(f"Primary失败: {e}")

        # 级别2:GPT-4o-mini
        try:
            return self._invoke_secondary(inputs)
        except Exception as e:
            logger.warning(f"Secondary失败: {e}")

        # 级别3:GPT-3.5-turbo
        try:
            return self._invoke_tertiary(inputs)
        except Exception as e:
            logger.warning(f"Tertiary失败: {e}")

        # 级别4:缓存
        cached = self.cache.get(query)
        if cached:
            logger.info("使用缓存结果")
            return {"messages": [cached], "fallback_level": FallbackLevel.CACHE}

        # 级别5:静态回复
        logger.error("所有降级策略失败,使用静态回复")
        return {
            "messages": [("assistant", self.fallback_responses["default"])],
            "fallback_level": FallbackLevel.STATIC
        }

    def _invoke_primary(self, inputs: dict) -> dict:
        """主要模型"""
        agent = create_agent(ChatOpenAI(model="gpt-4o", timeout=5), tools=[], system_prompt="你是助手")
        result = agent.invoke(inputs)
        return {**result, "fallback_level": FallbackLevel.PRIMARY}

    def _invoke_secondary(self, inputs: dict) -> dict:
        """次优模型"""
        agent = create_agent(ChatOpenAI(model="gpt-4o-mini", timeout=3), tools=[], system_prompt="你是助手")
        result = agent.invoke(inputs)
        return {**result, "fallback_level": FallbackLevel.SECONDARY}

    def _invoke_tertiary(self, inputs: dict) -> dict:
        """保底模型"""
        agent = create_agent(ChatOpenAI(model="gpt-3.5-turbo", timeout=2), tools=[], system_prompt="你是助手")
        result = agent.invoke(inputs)
        return {**result, "fallback_level": FallbackLevel.TERTIARY}

使用示例

agent = FallbackAgent()
result = agent.invoke({"messages": [("user", "你好")]})

print(f"响应: {result['messages'][-1].content}")
print(f"降级级别: {result['fallback_level'].name}")

# 监控降级率
from collections import Counter
fallback_counter = Counter()

for _ in range(100):
    result = agent.invoke({"messages": [("user", "测试")]})
    fallback_counter[result['fallback_level'].name] += 1

print("\n降级统计:")
for level, count in fallback_counter.items():
    print(f"  {level}: {count}次 ({count/100:.1%})")

# 示例输出(比例随实际故障情况而异):
# PRIMARY: 85次 (85.0%)
# SECONDARY: 10次 (10.0%)
# TERTIARY: 3次 (3.0%)
# CACHE: 2次 (2.0%)

7.5 熔断器模式#

from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    """熔断器状态"""
    CLOSED = "closed"      # 正常
    OPEN = "open"          # 熔断(拒绝请求)
    HALF_OPEN = "half_open"  # 半开(尝试恢复)

class CircuitBreaker:
    """熔断器"""

    def __init__(
        self,
        failure_threshold: int = 5,  # 失败阈值
        timeout: int = 60,  # 熔断超时(秒)
        half_open_max_calls: int = 3  # 半开状态最大尝试次数
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.half_open_max_calls = half_open_max_calls

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.half_open_calls = 0

    def call(self, func, *args, **kwargs):
        """执行调用"""
        if self.state == CircuitState.OPEN:
            # 检查是否可以转为半开状态
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                logger.info("熔断器转为半开状态")
            else:
                raise Exception("熔断器开启,拒绝请求")

        try:
            result = func(*args, **kwargs)

            # 成功
            if self.state == CircuitState.HALF_OPEN:
                self.half_open_calls += 1
                if self.half_open_calls >= self.half_open_max_calls:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
                    logger.info("熔断器恢复正常")

            return result

        except Exception:
            # 失败
            self.failure_count += 1
            self.last_failure_time = datetime.now()

            if self.state == CircuitState.HALF_OPEN:
                # 半开状态下试探失败,立即重新熔断
                self.state = CircuitState.OPEN
                logger.error("半开状态调用失败,重新熔断")
            elif self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                logger.error(f"熔断器开启(失败{self.failure_count}次)")

            raise

# 使用
circuit_breaker = CircuitBreaker(failure_threshold=3, timeout=60)

def invoke_agent(inputs):
    """调用Agent(可能失败)"""
    return circuit_breaker.call(
        lambda: agent.invoke(inputs)
    )

# 测试
for i in range(10):
    try:
        result = invoke_agent({"messages": [("user", "测试")]})
        print(f"请求{i}: ✅ 成功")
    except Exception as e:
        print(f"请求{i}: ❌ {e}")
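上面的 CircuitBreaker 依赖外部的 agent 与 logger,不便单独运行。下面给出一个可独立执行的最小示意(MiniBreaker 为示意命名,阈值与超时刻意取小,以便快速观察三种状态的完整切换过程):

```python
import time

class MiniBreaker:
    """最小熔断器示意:仅演示状态切换逻辑(阈值2次,熔断超时0.1秒)"""

    def __init__(self, threshold=2, timeout=0.1):
        self.threshold = threshold
        self.timeout = timeout
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0

    def call(self, func):
        if self.state == "open":
            if time.time() - self.opened_at > self.timeout:
                self.state = "half_open"   # 超时后允许试探
            else:
                raise RuntimeError("circuit open")
        try:
            result = func()
        except Exception:
            self.failures += 1
            # 半开状态失败立即重新熔断;关闭状态达到阈值才熔断
            if self.state == "half_open" or self.failures >= self.threshold:
                self.state = "open"
                self.opened_at = time.time()
            raise
        if self.state == "half_open":
            self.state = "closed"          # 试探成功,恢复正常
            self.failures = 0
        return result

breaker = MiniBreaker()

def failing():
    raise ValueError("boom")

# 连续失败2次 → 熔断开启
for _ in range(2):
    try:
        breaker.call(failing)
    except ValueError:
        pass
print(breaker.state)  # open

# 熔断期间请求被直接拒绝
try:
    breaker.call(lambda: "ok")
except RuntimeError as e:
    print(e)  # circuit open

time.sleep(0.15)                           # 等待超时,进入半开
print(breaker.call(lambda: "ok"))          # ok(试探成功)
print(breaker.state)                       # closed
```

可以看到闭合→熔断→拒绝→半开→恢复的完整闭环,与上文 CircuitBreaker 的状态机同构。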

7.6 监控与告警#

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional

@dataclass
class ErrorMetrics:
    """错误指标"""
    timestamp: datetime
    error_type: str
    error_message: str
    fallback_level: Optional[str] = None

class ErrorMonitor:
    """错误监控"""

    def __init__(self):
        self.errors: List[ErrorMetrics] = []
        self.requests: List[datetime] = []

    def record_request(self):
        """记录一次请求(作为错误率的分母)"""
        self.requests.append(datetime.now())

    def record_error(
        self,
        error_type: str,
        error_message: str,
        fallback_level: Optional[str] = None
    ):
        """记录错误"""
        self.errors.append(ErrorMetrics(
            timestamp=datetime.now(),
            error_type=error_type,
            error_message=error_message,
            fallback_level=fallback_level
        ))

    def get_error_rate(self, window_minutes: int = 5) -> float:
        """获取时间窗口内的错误率"""
        cutoff_time = datetime.now() - timedelta(minutes=window_minutes)
        recent_errors = [
            e for e in self.errors
            if e.timestamp > cutoff_time
        ]
        total_requests = len([
            t for t in self.requests
            if t > cutoff_time
        ])
        return len(recent_errors) / total_requests if total_requests else 0.0

    def check_alerts(self) -> List[str]:
        """检查告警"""
        alerts = []

        # 错误率告警
        error_rate = self.get_error_rate(window_minutes=5)
        if error_rate > 0.1:  # 10%
            alerts.append(f"⚠️  错误率过高: {error_rate:.1%}")

        # 降级告警
        recent_fallbacks = [
            e for e in self.errors[-100:]
            if e.fallback_level in ["TERTIARY", "CACHE", "STATIC"]
        ]
        if len(recent_fallbacks) > 20:
            alerts.append(f"⚠️  频繁降级: {len(recent_fallbacks)}次")

        return alerts
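错误率统计的核心是按滑动时间窗口过滤时间戳,这一逻辑可以用下面的独立片段单独验证(时间点与分母均为示例数值):

```python
from datetime import datetime, timedelta

now = datetime.now()
# 模拟5条错误时间戳:3条落在5分钟窗口内,2条在窗口外
errors = [now - timedelta(minutes=m) for m in (1, 2, 3, 10, 20)]

cutoff = now - timedelta(minutes=5)
recent = [t for t in errors if t > cutoff]

total_requests = 100  # 示例分母
error_rate = len(recent) / total_requests
print(f"窗口内错误数: {len(recent)}, 错误率: {error_rate:.1%}")
# 窗口内错误数: 3, 错误率: 3.0%
```

窗口外的旧错误被自然淘汰,因此错误率只反映最近的系统状态。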

7.7 错误处理最佳实践#

生产环境检查清单

  • 输入验证

    • 长度限制
    • 格式验证
    • 恶意输入检测
    • 敏感信息过滤
  • 重试机制

    • 指数退避
    • 最大重试次数
    • 可重试异常类型
  • 降级策略

    • 多级模型降级
    • 缓存降级
    • 静态响应兜底
  • 熔断保护

    • 失败阈值配置
    • 熔断超时配置
    • 半开状态尝试
  • 监控告警

    • 错误率监控
    • 降级率监控
    • 告警规则配置
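清单中的指数退避重试,可以用一个独立的小函数示意(retry_with_backoff 为示意命名,并非框架内置 API;延迟按 2 的幂增长并叠加随机抖动,避免重试风暴):

```python
import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=0.1):
    """指数退避重试示意:第n次重试前等待 base_delay * 2**n 秒,外加随机抖动"""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise                      # 重试耗尽,向上抛出
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.05)
            time.sleep(delay)

# 模拟前两次失败、第三次成功的不稳定调用
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

result = retry_with_backoff(flaky)
print(result)  # ok
```

实际生产中通常只对可重试的异常类型(如网络超时、限流)应用退避,对参数错误等确定性失败应直接失败。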

关键指标

| 指标 | 目标值 | 告警阈值 |
|------|--------|----------|
| 错误率 | < 1% | > 5% |
| 降级率 | < 5% | > 20% |
| 重试成功率 | > 80% | < 50% |
| 熔断触发次数 | 0 | > 3次/小时 |
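上表的阈值可以沉淀为代码配置,统一做告警判断(指标名与数据结构为示意,数值直接取自上表):

```python
# 告警阈值配置:每个指标给出目标值与触发告警的判定函数
THRESHOLDS = {
    "error_rate":             {"target": 0.01, "alert": lambda v: v > 0.05},
    "fallback_rate":          {"target": 0.05, "alert": lambda v: v > 0.20},
    "retry_success_rate":     {"target": 0.80, "alert": lambda v: v < 0.50},
    "breaker_trips_per_hour": {"target": 0,    "alert": lambda v: v > 3},
}

def check_metrics(metrics: dict) -> list:
    """返回触发告警的指标名列表"""
    return [
        name for name, rule in THRESHOLDS.items()
        if name in metrics and rule["alert"](metrics[name])
    ]

alerts = check_metrics({
    "error_rate": 0.08,             # 超过 5% 告警阈值
    "fallback_rate": 0.03,
    "retry_success_rate": 0.9,
    "breaker_trips_per_hour": 0,
})
print(alerts)  # ['error_rate']
```

把阈值集中到配置而非散落在各处 if 判断中,便于按环境(灰度/生产)差异化调整。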
