第 8 章:追踪与调试 -- 出了问题怎么查

系列教程:OpenAI Agents SDK 从入门到实战

本章目标:掌握 Tracing、Hooks、Usage 三板斧,让 Agent 的每一步操作都透明可查。


为什么需要追踪?

Agent 不是普通函数调用。一次 Runner.run() 背后可能发生了这些事:

  1. LLM 被调了 3 次(你以为只调了 1 次)
  2. 工具 A 执行了 2 次(其中一次超时了)
  3. 发生了一次 Handoff(从 Agent A 转给了 Agent B)
  4. 总共烧了 5000 个 token(你的钱包在哭泣)

问题是:如果你不主动追踪,上面这些信息你一个都看不到。出了 bug 就是抓瞎。

Agent SDK 提供了三个层面的观测能力:

层面 工具 用途
宏观追踪 trace() 把一组操作打包成一个 trace,在 OpenAI dashboard 上查看
微观钩子 RunHooks / AgentHooks 在每个关键节点插入自定义逻辑(打日志、记指标)
成本统计 Usage 统计 token 用量,知道花了多少钱

下面逐个击破。


trace():给操作打包追踪

自动追踪

好消息:SDK 默认会自动追踪每次 Runner.run() 的执行。你不需要写任何代码,trace 数据就会被发送到 OpenAI 后端(如果你配了 API key 的话)。

手动打包多个操作

有时候你的工作流里有多次 Runner.run() 调用,你希望它们归到同一个 trace 下面。这时候就用 trace() 上下文管理器:

import asyncio
import os
from openai import AsyncOpenAI
from agents import Agent, Runner, OpenAIChatCompletionsModel, set_tracing_disabled, trace

# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
    base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
    api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
    model=os.getenv("MODEL_NAME", "gpt-5.2"),
    openai_client=client,
)

# 定义两个 Agent
summarizer = Agent(
    name="摘要员",
    instructions="用一句话总结用户的输入。",
    model=model,
)

translator = Agent(
    name="翻译员",
    instructions="把输入翻译成英文。",
    model=model,
)


async def main():
    # 用 trace() 把两次 run 打包到同一个追踪里
    with trace("摘要翻译流水线"):
        # 第一步:摘要
        result1 = await Runner.run(summarizer, "Python是一门非常流行的编程语言,广泛应用于数据科学、Web开发和人工智能领域。")
        print(f"摘要: {result1.final_output}")

        # 第二步:翻译(把摘要结果传给翻译员)
        result2 = await Runner.run(translator, result1.final_output)
        print(f"翻译: {result2.final_output}")


if __name__ == "__main__":
    asyncio.run(main())

在 OpenAI dashboard 上,这两次调用会出现在同一个名为"摘要翻译流水线"的 trace 下面,一目了然。

trace() 的参数

with trace(
    workflow_name="客服系统",      # trace 名称,必填
    trace_id=None,                 # 自定义 trace ID,不填会自动生成
    group_id="chat_thread_123",    # 分组 ID,比如同一个会话的多次交互
    metadata={"user_id": "u456"}, # 自定义元数据,方便过滤和分析
    disabled=False,                # 设为 True 可以关掉这个 trace
):
    ...

group_id 特别有用 -- 同一个用户的多轮对话,用同一个 group_id 串起来,回头查问题时直接按 group 过滤。


custom_span():追踪自定义操作

SDK 自动追踪 LLM 调用和工具执行,但你可能还有一些自己的业务逻辑想追踪(比如数据库查询、外部 API 调用)。这时候用 custom_span()

import asyncio
import os
import time
from openai import AsyncOpenAI
from agents import Agent, Runner, OpenAIChatCompletionsModel, set_tracing_disabled, trace
from agents.tracing import custom_span

# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
    base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
    api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
    model=os.getenv("MODEL_NAME", "gpt-5.2"),
    openai_client=client,
)

agent = Agent(
    name="分析师",
    instructions="分析用户提供的数据,给出简要结论。",
    model=model,
)


async def fetch_data_from_db() -> str:
    """模拟数据库查询"""
    # 用 custom_span 追踪这个自定义操作
    with custom_span(
        name="数据库查询",
        data={"table": "sales", "query": "SELECT * FROM sales WHERE year=2024"},
    ):
        # 模拟耗时操作
        time.sleep(0.1)
        return "2024年销售额: 1000万, 同比增长15%"


async def main():
    with trace("数据分析流程"):
        # 自定义操作会出现在 trace 的 span 列表中
        data = await fetch_data_from_db()
        result = await Runner.run(agent, f"请分析以下数据: {data}")
        print(result.final_output)


if __name__ == "__main__":
    asyncio.run(main())

除了 custom_span,SDK 还提供了 function_spangeneration_span 等更具体的 span 类型。大多数情况下 custom_span 就够用了。


RunHooks:全局生命周期钩子

trace() 适合宏观观测,但如果你想在代码里精确控制"在 LLM 调用前做什么、工具执行后做什么",就需要 Hooks。

RunHooks 挂在整个 Runner.run() 上,不管内部有多少个 Agent、多少次工具调用,它全程跟踪。

可用的钩子方法

方法 触发时机
on_agent_start Agent 开始执行
on_agent_end Agent 产出最终输出
on_llm_start 即将调用 LLM
on_llm_end LLM 返回结果
on_tool_start 即将执行工具
on_tool_end 工具执行完毕
on_handoff 发生 Agent 切换

只覆盖你关心的方法就行,其他的默认什么都不做。

完整示例

import asyncio
import time
from typing import Any, Optional

import os
from openai import AsyncOpenAI
from agents import (
    Agent,
    AgentHookContext,
    OpenAIChatCompletionsModel,
    RunContextWrapper,
    RunHooks,
    Runner,
    Tool,
    Usage,
    function_tool,
    set_tracing_disabled,
)
from agents.items import ModelResponse, TResponseInputItem

# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
    base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
    api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
    model=os.getenv("MODEL_NAME", "gpt-5.2"),
    openai_client=client,
)


class TimingHooks(RunHooks):
    """记录每个环节耗时和 token 用量的钩子"""

    def __init__(self):
        self.event_counter = 0
        self._timers: dict[str, float] = {}

    def _log(self, msg: str, usage: Usage) -> None:
        self.event_counter += 1
        tokens = f"[累计 {usage.requests} 次请求, {usage.total_tokens} tokens]"
        print(f"  #{self.event_counter} {msg} {tokens}")

    async def on_agent_start(
        self, context: AgentHookContext, agent: Agent
    ) -> None:
        self._timers[f"agent_{agent.name}"] = time.time()
        self._log(f"Agent [{agent.name}] 启动", context.usage)

    async def on_agent_end(
        self, context: AgentHookContext, agent: Agent, output: Any
    ) -> None:
        elapsed = time.time() - self._timers.get(f"agent_{agent.name}", time.time())
        self._log(f"Agent [{agent.name}] 完成 (耗时 {elapsed:.2f}s)", context.usage)

    async def on_llm_start(
        self,
        context: RunContextWrapper,
        agent: Agent,
        system_prompt: Optional[str],
        input_items: list[TResponseInputItem],
    ) -> None:
        self._timers["llm"] = time.time()
        self._log(f"LLM 调用开始 (Agent: {agent.name})", context.usage)

    async def on_llm_end(
        self, context: RunContextWrapper, agent: Agent, response: ModelResponse
    ) -> None:
        elapsed = time.time() - self._timers.get("llm", time.time())
        self._log(f"LLM 调用完成 (耗时 {elapsed:.2f}s)", context.usage)

    async def on_tool_start(
        self, context: RunContextWrapper, agent: Agent, tool: Tool
    ) -> None:
        self._timers[f"tool_{tool.name}"] = time.time()
        self._log(f"工具 [{tool.name}] 开始执行", context.usage)

    async def on_tool_end(
        self, context: RunContextWrapper, agent: Agent, tool: Tool, result: str
    ) -> None:
        elapsed = time.time() - self._timers.get(f"tool_{tool.name}", time.time())
        self._log(f"工具 [{tool.name}] 执行完成 (耗时 {elapsed:.2f}s, 结果: {result})", context.usage)

    async def on_handoff(
        self, context: RunContextWrapper, from_agent: Agent, to_agent: Agent
    ) -> None:
        self._log(f"Handoff: {from_agent.name} -> {to_agent.name}", context.usage)


# 定义一个带工具的 Agent
@function_tool
def calculate(expression: str) -> str:
    """计算数学表达式"""
    try:
        return str(eval(expression))
    except Exception as e:
        return f"计算错误: {e}"


agent = Agent(
    name="数学助手",
    instructions="你是一个数学助手。用 calculate 工具来计算数学表达式,然后告诉用户结果。",
    tools=[calculate],
    model=model,
)


async def main():
    hooks = TimingHooks()
    print("=== 开始执行 ===")
    result = await Runner.run(
        agent,
        input="请计算 (17 + 23) * 3 等于多少?",
        hooks=hooks,
    )
    print(f"\n最终结果: {result.final_output}")
    print(f"总事件数: {hooks.event_counter}")


if __name__ == "__main__":
    asyncio.run(main())

运行后你会看到类似这样的输出:

=== 开始执行 ===
  #1 Agent [数学助手] 启动 [累计 0 次请求, 0 tokens]
  #2 LLM 调用开始 (Agent: 数学助手) [累计 0 次请求, 0 tokens]
  #3 LLM 调用完成 (耗时 1.23s) [累计 1 次请求, 158 tokens]
  #4 工具 [calculate] 开始执行 [累计 1 次请求, 158 tokens]
  #5 工具 [calculate] 执行完成 (耗时 0.00s, 结果: 120) [累计 1 次请求, 158 tokens]
  #6 LLM 调用开始 (Agent: 数学助手) [累计 1 次请求, 158 tokens]
  #7 LLM 调用完成 (耗时 0.89s) [累计 2 次请求, 339 tokens]
  #8 Agent [数学助手] 完成 (耗时 2.15s) [累计 2 次请求, 339 tokens]

最终结果: (17 + 23) * 3 = 120
总事件数: 8

注意看:一次用户请求,LLM 其实被调了 2 次(第一次决定调工具,第二次根据工具结果生成回答)。没有 hooks 你根本不会知道这个。


AgentHooks:单 Agent 级别钩子

RunHooks 是全局的,而 AgentHooks 是挂在某个特定 Agent 上的。只有这个 Agent 被执行时,它的钩子才会触发。

典型场景:你有 5 个 Agent,但只想监控其中那个最关键(或最可疑)的。

import asyncio
import os
from typing import Any

from openai import AsyncOpenAI

from agents import Agent, AgentHooks, AgentHookContext, RunContextWrapper, Runner, OpenAIChatCompletionsModel, set_tracing_disabled

# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
    base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
    api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
    model=os.getenv("MODEL_NAME", "gpt-5.2"),
    openai_client=client,
)


class DebugHooks(AgentHooks):
    """只给特定 Agent 加的调试钩子"""

    async def on_start(
        self, context: AgentHookContext, agent: Agent
    ) -> None:
        print(f"  [DEBUG] {agent.name} 开始工作")

    async def on_end(
        self, context: AgentHookContext, agent: Agent, output: Any
    ) -> None:
        print(f"  [DEBUG] {agent.name} 输出: {output}")

    async def on_tool_start(
        self, context: RunContextWrapper, agent: Agent, tool: Any
    ) -> None:
        print(f"  [DEBUG] {agent.name} 正在调用工具: {tool.name}")

    async def on_tool_end(
        self, context: RunContextWrapper, agent: Agent, tool: Any, result: str
    ) -> None:
        print(f"  [DEBUG] {agent.name} 工具返回: {result}")


# 只给 translator 加 hooks,writer 不加
writer = Agent(
    name="作家",
    instructions="用一句话描述Python语言的特点。",
    model=model,
    # 没有 hooks,静默执行
)

translator = Agent(
    name="翻译官",
    instructions="把输入的中文翻译成英文。",
    hooks=DebugHooks(),  # 只监控这个 Agent
    model=model,
)


async def main():
    print("--- 第一步:写作(没有 hooks,无输出)---")
    result1 = await Runner.run(writer, "请描述Python")
    print(f"作家输出: {result1.final_output}\n")

    print("--- 第二步:翻译(有 hooks,能看到细节)---")
    result2 = await Runner.run(translator, result1.final_output)
    print(f"翻译输出: {result2.final_output}")


if __name__ == "__main__":
    asyncio.run(main())

RunHooksAgentHooks 可以同时使用,互不冲突。全局装一个 RunHooks 记录整体流程,关键 Agent 上再装一个 AgentHooks 看细节,这是推荐的做法。


Usage 统计:花了多少 token

每次 Runner.run() 返回的结果里都带着 token 用量统计。

import asyncio
import os
from openai import AsyncOpenAI
from agents import Agent, Runner, OpenAIChatCompletionsModel, set_tracing_disabled

# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
    base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
    api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
    model=os.getenv("MODEL_NAME", "gpt-5.2"),
    openai_client=client,
)

agent = Agent(
    name="话痨",
    instructions="详细回答用户的问题,至少写3段。",
    model=model,
)


async def main():
    result = await Runner.run(agent, "什么是量子计算?")
    print(f"回答: {result.final_output[:100]}...\n")

    # 从 context_wrapper 里拿 usage
    usage = result.context_wrapper.usage

    print("=== Token 用量统计 ===")
    print(f"  总请求次数: {usage.requests}")
    print(f"  输入 tokens: {usage.input_tokens}")
    print(f"  输出 tokens: {usage.output_tokens}")
    print(f"  总计 tokens: {usage.total_tokens}")

    # 缓存命中的 token 数
    if usage.input_tokens_details:
        print(f"  缓存命中 tokens: {usage.input_tokens_details.cached_tokens}")
    # 推理 token 数(o1/o3 等推理模型会用到)
    if usage.output_tokens_details:
        print(f"  推理 tokens: {usage.output_tokens_details.reasoning_tokens}")

    # 逐请求明细(如果 Agent 内部调了多次 LLM)
    if usage.request_usage_entries:
        print(f"\n=== 逐请求明细(共 {len(usage.request_usage_entries)} 次)===")
        for i, entry in enumerate(usage.request_usage_entries, 1):
            print(f"  第 {i} 次: 输入 {entry.input_tokens}, 输出 {entry.output_tokens}, 合计 {entry.total_tokens}")


if __name__ == "__main__":
    asyncio.run(main())

输出类似:

=== Token 用量统计 ===
  总请求次数: 1
  输入 tokens: 28
  输出 tokens: 350
  总计 tokens: 378
  缓存命中 tokens: 0
  推理 tokens: 0

在 Hooks 中实时监控 token

前面 TimingHooks 的例子已经演示了 -- context.usage 在每个钩子触发时都是实时更新的,你可以在 on_llm_end 里检查 token 用量,超过预算就报警:

async def on_llm_end(self, context, agent, response):
    if context.usage.total_tokens > 10000:
        print(f"WARNING: token 用量已超过 10000!当前: {context.usage.total_tokens}")

全局控制:关闭追踪

在开发阶段你可能想关掉追踪(比如跑测试的时候不想往 OpenAI 发数据):

from agents import set_tracing_disabled

# 全局关闭追踪
set_tracing_disabled(True)

# 或者只关闭单个 trace
with trace("测试流程", disabled=True):
    ...

完整实战:带追踪和钩子的多 Agent 系统

把前面学的都串起来,搞一个完整的、可运行的例子:

"""
完整示例:带追踪和钩子的多 Agent 系统
功能:用户输入一个话题 -> 研究员收集信息 -> 编辑整理成文章
"""
import asyncio
import time
from typing import Any, Optional

import os
from openai import AsyncOpenAI
from agents import (
    Agent,
    AgentHooks,
    AgentHookContext,
    OpenAIChatCompletionsModel,
    RunContextWrapper,
    RunHooks,
    Runner,
    Tool,
    Usage,
    function_tool,
    set_tracing_disabled,
    trace,
)
from agents.items import ModelResponse, TResponseInputItem

# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
    base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
    api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
    model=os.getenv("MODEL_NAME", "gpt-5.2"),
    openai_client=client,
)


# ========== 工具定义 ==========

@function_tool
def search_web(query: str) -> str:
    """搜索网络获取信息(模拟)"""
    # 模拟搜索结果
    fake_results = {
        "default": f"关于'{query}'的搜索结果: 这是一个重要的话题,有很多相关研究和讨论。"
    }
    return fake_results.get(query, fake_results["default"])


@function_tool
def count_words(text: str) -> str:
    """统计文本字数"""
    return str(len(text))


# ========== 全局钩子(RunHooks)==========

class DashboardHooks(RunHooks):
    """模拟一个监控面板,记录所有事件"""

    def __init__(self):
        self.events: list[dict] = []
        self.start_time = time.time()

    def _elapsed(self) -> str:
        return f"{time.time() - self.start_time:.2f}s"

    def _record(self, event_type: str, detail: str, usage: Usage) -> None:
        event = {
            "time": self._elapsed(),
            "type": event_type,
            "detail": detail,
            "total_tokens": usage.total_tokens,
        }
        self.events.append(event)
        print(f"  [{event['time']}] {event_type}: {detail} (累计 {usage.total_tokens} tokens)")

    async def on_agent_start(self, context: AgentHookContext, agent: Agent) -> None:
        self._record("AGENT_START", agent.name, context.usage)

    async def on_agent_end(self, context: AgentHookContext, agent: Agent, output: Any) -> None:
        self._record("AGENT_END", agent.name, context.usage)

    async def on_tool_start(self, context: RunContextWrapper, agent: Agent, tool: Tool) -> None:
        self._record("TOOL_START", f"{agent.name} -> {tool.name}", context.usage)

    async def on_tool_end(self, context: RunContextWrapper, agent: Agent, tool: Tool, result: str) -> None:
        self._record("TOOL_END", f"{agent.name} -> {tool.name}", context.usage)

    async def on_handoff(self, context: RunContextWrapper, from_agent: Agent, to_agent: Agent) -> None:
        self._record("HANDOFF", f"{from_agent.name} -> {to_agent.name}", context.usage)

    async def on_llm_start(
        self,
        context: RunContextWrapper,
        agent: Agent,
        system_prompt: Optional[str],
        input_items: list[TResponseInputItem],
    ) -> None:
        self._record("LLM_START", agent.name, context.usage)

    async def on_llm_end(
        self, context: RunContextWrapper, agent: Agent, response: ModelResponse
    ) -> None:
        self._record("LLM_END", agent.name, context.usage)

    def print_summary(self) -> None:
        print(f"\n{'='*50}")
        print(f"执行摘要: 共 {len(self.events)} 个事件, 总耗时 {self._elapsed()}")
        if self.events:
            print(f"最终 token 用量: {self.events[-1]['total_tokens']}")
        print(f"{'='*50}")


# ========== Agent 级别钩子 ==========

class EditorDebugHooks(AgentHooks):
    """只给编辑 Agent 加的调试钩子"""

    async def on_start(self, context: AgentHookContext, agent: Agent) -> None:
        print(f"    >> [编辑专属] 编辑开始审稿")

    async def on_end(self, context: AgentHookContext, agent: Agent, output: Any) -> None:
        print(f"    >> [编辑专属] 编辑完成,输出长度: {len(str(output))} 字符")


# ========== Agent 定义 ==========

researcher = Agent(
    name="研究员",
    instructions="你是一个研究员。根据用户给的话题,用 search_web 工具搜索相关信息,然后整理成要点。",
    tools=[search_web],
    handoffs=[],  # 后面会设置
    model=model,
)

editor = Agent(
    name="编辑",
    instructions="你是一个编辑。把研究员提供的要点整理成一篇简短的文章(100字以内)。用 count_words 工具统计字数。",
    tools=[count_words],
    hooks=EditorDebugHooks(),  # 只给编辑加 hooks
    model=model,
)

# 研究员完成后交给编辑
researcher.handoffs = [editor]


# ========== 主流程 ==========

async def main():
    dashboard = DashboardHooks()

    print("=== 多 Agent 追踪演示 ===\n")

    # 用 trace() 把整个工作流打包
    with trace(
        "内容创作流水线",
        group_id="demo_session_001",
        metadata={"topic": "AI", "version": "1.0"},
    ):
        result = await Runner.run(
            researcher,
            input="帮我研究一下大语言模型的最新进展",
            hooks=dashboard,
        )

    print(f"\n最终文章:\n{result.final_output}")

    # 打印 usage 统计
    usage = result.context_wrapper.usage
    print(f"\n=== Token 用量 ===")
    print(f"  总请求次数: {usage.requests}")
    print(f"  输入 tokens: {usage.input_tokens}")
    print(f"  输出 tokens: {usage.output_tokens}")
    print(f"  总计 tokens: {usage.total_tokens}")

    # 打印逐请求明细
    if usage.request_usage_entries:
        print(f"\n=== 逐请求明细 ===")
        for i, entry in enumerate(usage.request_usage_entries, 1):
            print(f"  第 {i} 次请求: 输入 {entry.input_tokens}, 输出 {entry.output_tokens}")

    # 打印事件摘要
    dashboard.print_summary()


if __name__ == "__main__":
    asyncio.run(main())

小结

本章学了三个观测手段:

手段 适合场景 一句话总结
trace() 宏观追踪 把多个操作归到一个 trace,在 dashboard 上看全貌
RunHooks 全局监控 在 Agent 运行的每个关键节点插入自定义逻辑
AgentHooks 精准调试 只监控某个特定 Agent
custom_span() 自定义追踪 追踪你自己的业务逻辑(数据库、API 等)
Usage 成本统计 知道花了多少 token、调了几次 LLM

记住一个原则:上线前一定要加追踪。Agent 系统不像传统代码,执行路径是不确定的(LLM 可能调 1 次也可能调 5 次),没有追踪你就是在盲飞。

下一章是实战篇——我们把前 8 章学到的所有知识综合起来,从零搭建一个完整的智能客服系统。

← 第7章 结构化输出 第9章 实战项目 →