第 8 章:追踪与调试 -- 出了问题怎么查
系列教程:OpenAI Agents SDK 从入门到实战
本章目标:掌握 Tracing、Hooks、Usage 三板斧,让 Agent 的每一步操作都透明可查。
为什么需要追踪?
Agent 不是普通函数调用。一次 Runner.run() 背后可能发生了这些事:
- LLM 被调了 3 次(你以为只调了 1 次)
- 工具 A 执行了 2 次(其中一次超时了)
- 发生了一次 Handoff(从 Agent A 转给了 Agent B)
- 总共烧了 5000 个 token(你的钱包在哭泣)
问题是:如果你不主动追踪,上面这些信息你一个都看不到。出了 bug 就是抓瞎。
Agent SDK 提供了三个层面的观测能力:
| 层面 | 工具 | 用途 |
|---|---|---|
| 宏观追踪 | trace() |
把一组操作打包成一个 trace,在 OpenAI dashboard 上查看 |
| 微观钩子 | RunHooks / AgentHooks |
在每个关键节点插入自定义逻辑(打日志、记指标) |
| 成本统计 | Usage |
统计 token 用量,知道花了多少钱 |
下面逐个击破。
trace():给操作打包追踪
自动追踪
好消息:SDK 默认会自动追踪每次 Runner.run() 的执行。你不需要写任何代码,trace 数据就会被发送到 OpenAI 后端(如果你配了 API key 的话)。
手动打包多个操作
有时候你的工作流里有多次 Runner.run() 调用,你希望它们归到同一个 trace 下面。这时候就用 trace() 上下文管理器:
import asyncio
import os
from openai import AsyncOpenAI
from agents import Agent, Runner, OpenAIChatCompletionsModel, set_tracing_disabled, trace
# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
model=os.getenv("MODEL_NAME", "gpt-5.2"),
openai_client=client,
)
# 定义两个 Agent
summarizer = Agent(
name="摘要员",
instructions="用一句话总结用户的输入。",
model=model,
)
translator = Agent(
name="翻译员",
instructions="把输入翻译成英文。",
model=model,
)
async def main():
# 用 trace() 把两次 run 打包到同一个追踪里
with trace("摘要翻译流水线"):
# 第一步:摘要
result1 = await Runner.run(summarizer, "Python是一门非常流行的编程语言,广泛应用于数据科学、Web开发和人工智能领域。")
print(f"摘要: {result1.final_output}")
# 第二步:翻译(把摘要结果传给翻译员)
result2 = await Runner.run(translator, result1.final_output)
print(f"翻译: {result2.final_output}")
if __name__ == "__main__":
asyncio.run(main())
在 OpenAI dashboard 上,这两次调用会出现在同一个名为"摘要翻译流水线"的 trace 下面,一目了然。
trace() 的参数
with trace(
workflow_name="客服系统", # trace 名称,必填
trace_id=None, # 自定义 trace ID,不填会自动生成
group_id="chat_thread_123", # 分组 ID,比如同一个会话的多次交互
metadata={"user_id": "u456"}, # 自定义元数据,方便过滤和分析
disabled=False, # 设为 True 可以关掉这个 trace
):
...
group_id 特别有用 -- 同一个用户的多轮对话,用同一个 group_id 串起来,回头查问题时直接按 group 过滤。
custom_span():追踪自定义操作
SDK 自动追踪 LLM 调用和工具执行,但你可能还有一些自己的业务逻辑想追踪(比如数据库查询、外部 API 调用)。这时候用 custom_span():
import asyncio
import os
import time
from openai import AsyncOpenAI
from agents import Agent, Runner, OpenAIChatCompletionsModel, set_tracing_disabled, trace
from agents.tracing import custom_span
# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
model=os.getenv("MODEL_NAME", "gpt-5.2"),
openai_client=client,
)
agent = Agent(
name="分析师",
instructions="分析用户提供的数据,给出简要结论。",
model=model,
)
async def fetch_data_from_db() -> str:
"""模拟数据库查询"""
# 用 custom_span 追踪这个自定义操作
with custom_span(
name="数据库查询",
data={"table": "sales", "query": "SELECT * FROM sales WHERE year=2024"},
):
# 模拟耗时操作
time.sleep(0.1)
return "2024年销售额: 1000万, 同比增长15%"
async def main():
with trace("数据分析流程"):
# 自定义操作会出现在 trace 的 span 列表中
data = await fetch_data_from_db()
result = await Runner.run(agent, f"请分析以下数据: {data}")
print(result.final_output)
if __name__ == "__main__":
asyncio.run(main())
除了 custom_span,SDK 还提供了 function_span、generation_span 等更具体的 span 类型。大多数情况下 custom_span 就够用了。
RunHooks:全局生命周期钩子
trace() 适合宏观观测,但如果你想在代码里精确控制"在 LLM 调用前做什么、工具执行后做什么",就需要 Hooks。
RunHooks 挂在整个 Runner.run() 上,不管内部有多少个 Agent、多少次工具调用,它全程跟踪。
可用的钩子方法
| 方法 | 触发时机 |
|---|---|
on_agent_start |
Agent 开始执行 |
on_agent_end |
Agent 产出最终输出 |
on_llm_start |
即将调用 LLM |
on_llm_end |
LLM 返回结果 |
on_tool_start |
即将执行工具 |
on_tool_end |
工具执行完毕 |
on_handoff |
发生 Agent 切换 |
只覆盖你关心的方法就行,其他的默认什么都不做。
完整示例
import asyncio
import time
from typing import Any, Optional
import os
from openai import AsyncOpenAI
from agents import (
Agent,
AgentHookContext,
OpenAIChatCompletionsModel,
RunContextWrapper,
RunHooks,
Runner,
Tool,
Usage,
function_tool,
set_tracing_disabled,
)
from agents.items import ModelResponse, TResponseInputItem
# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
model=os.getenv("MODEL_NAME", "gpt-5.2"),
openai_client=client,
)
class TimingHooks(RunHooks):
"""记录每个环节耗时和 token 用量的钩子"""
def __init__(self):
self.event_counter = 0
self._timers: dict[str, float] = {}
def _log(self, msg: str, usage: Usage) -> None:
self.event_counter += 1
tokens = f"[累计 {usage.requests} 次请求, {usage.total_tokens} tokens]"
print(f" #{self.event_counter} {msg} {tokens}")
async def on_agent_start(
self, context: AgentHookContext, agent: Agent
) -> None:
self._timers[f"agent_{agent.name}"] = time.time()
self._log(f"Agent [{agent.name}] 启动", context.usage)
async def on_agent_end(
self, context: AgentHookContext, agent: Agent, output: Any
) -> None:
elapsed = time.time() - self._timers.get(f"agent_{agent.name}", time.time())
self._log(f"Agent [{agent.name}] 完成 (耗时 {elapsed:.2f}s)", context.usage)
async def on_llm_start(
self,
context: RunContextWrapper,
agent: Agent,
system_prompt: Optional[str],
input_items: list[TResponseInputItem],
) -> None:
self._timers["llm"] = time.time()
self._log(f"LLM 调用开始 (Agent: {agent.name})", context.usage)
async def on_llm_end(
self, context: RunContextWrapper, agent: Agent, response: ModelResponse
) -> None:
elapsed = time.time() - self._timers.get("llm", time.time())
self._log(f"LLM 调用完成 (耗时 {elapsed:.2f}s)", context.usage)
async def on_tool_start(
self, context: RunContextWrapper, agent: Agent, tool: Tool
) -> None:
self._timers[f"tool_{tool.name}"] = time.time()
self._log(f"工具 [{tool.name}] 开始执行", context.usage)
async def on_tool_end(
self, context: RunContextWrapper, agent: Agent, tool: Tool, result: str
) -> None:
elapsed = time.time() - self._timers.get(f"tool_{tool.name}", time.time())
self._log(f"工具 [{tool.name}] 执行完成 (耗时 {elapsed:.2f}s, 结果: {result})", context.usage)
async def on_handoff(
self, context: RunContextWrapper, from_agent: Agent, to_agent: Agent
) -> None:
self._log(f"Handoff: {from_agent.name} -> {to_agent.name}", context.usage)
# 定义一个带工具的 Agent
@function_tool
def calculate(expression: str) -> str:
"""计算数学表达式"""
try:
return str(eval(expression))
except Exception as e:
return f"计算错误: {e}"
agent = Agent(
name="数学助手",
instructions="你是一个数学助手。用 calculate 工具来计算数学表达式,然后告诉用户结果。",
tools=[calculate],
model=model,
)
async def main():
hooks = TimingHooks()
print("=== 开始执行 ===")
result = await Runner.run(
agent,
input="请计算 (17 + 23) * 3 等于多少?",
hooks=hooks,
)
print(f"\n最终结果: {result.final_output}")
print(f"总事件数: {hooks.event_counter}")
if __name__ == "__main__":
asyncio.run(main())
运行后你会看到类似这样的输出:
=== 开始执行 ===
#1 Agent [数学助手] 启动 [累计 0 次请求, 0 tokens]
#2 LLM 调用开始 (Agent: 数学助手) [累计 0 次请求, 0 tokens]
#3 LLM 调用完成 (耗时 1.23s) [累计 1 次请求, 158 tokens]
#4 工具 [calculate] 开始执行 [累计 1 次请求, 158 tokens]
#5 工具 [calculate] 执行完成 (耗时 0.00s, 结果: 120) [累计 1 次请求, 158 tokens]
#6 LLM 调用开始 (Agent: 数学助手) [累计 1 次请求, 158 tokens]
#7 LLM 调用完成 (耗时 0.89s) [累计 2 次请求, 339 tokens]
#8 Agent [数学助手] 完成 (耗时 2.15s) [累计 2 次请求, 339 tokens]
最终结果: (17 + 23) * 3 = 120
总事件数: 8
注意看:一次用户请求,LLM 其实被调了 2 次(第一次决定调工具,第二次根据工具结果生成回答)。没有 hooks 你根本不会知道这个。
AgentHooks:单 Agent 级别钩子
RunHooks 是全局的,而 AgentHooks 是挂在某个特定 Agent 上的。只有这个 Agent 被执行时,它的钩子才会触发。
典型场景:你有 5 个 Agent,但只想监控其中那个最关键(或最可疑)的。
import asyncio
import os
from typing import Any
from openai import AsyncOpenAI
from agents import Agent, AgentHooks, AgentHookContext, RunContextWrapper, Runner, OpenAIChatCompletionsModel, set_tracing_disabled
# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
model=os.getenv("MODEL_NAME", "gpt-5.2"),
openai_client=client,
)
class DebugHooks(AgentHooks):
"""只给特定 Agent 加的调试钩子"""
async def on_start(
self, context: AgentHookContext, agent: Agent
) -> None:
print(f" [DEBUG] {agent.name} 开始工作")
async def on_end(
self, context: AgentHookContext, agent: Agent, output: Any
) -> None:
print(f" [DEBUG] {agent.name} 输出: {output}")
async def on_tool_start(
self, context: RunContextWrapper, agent: Agent, tool: Any
) -> None:
print(f" [DEBUG] {agent.name} 正在调用工具: {tool.name}")
async def on_tool_end(
self, context: RunContextWrapper, agent: Agent, tool: Any, result: str
) -> None:
print(f" [DEBUG] {agent.name} 工具返回: {result}")
# 只给 translator 加 hooks,writer 不加
writer = Agent(
name="作家",
instructions="用一句话描述Python语言的特点。",
model=model,
# 没有 hooks,静默执行
)
translator = Agent(
name="翻译官",
instructions="把输入的中文翻译成英文。",
hooks=DebugHooks(), # 只监控这个 Agent
model=model,
)
async def main():
print("--- 第一步:写作(没有 hooks,无输出)---")
result1 = await Runner.run(writer, "请描述Python")
print(f"作家输出: {result1.final_output}\n")
print("--- 第二步:翻译(有 hooks,能看到细节)---")
result2 = await Runner.run(translator, result1.final_output)
print(f"翻译输出: {result2.final_output}")
if __name__ == "__main__":
asyncio.run(main())
RunHooks 和 AgentHooks 可以同时使用,互不冲突。全局装一个 RunHooks 记录整体流程,关键 Agent 上再装一个 AgentHooks 看细节,这是推荐的做法。
Usage 统计:花了多少 token
每次 Runner.run() 返回的结果里都带着 token 用量统计。
import asyncio
import os
from openai import AsyncOpenAI
from agents import Agent, Runner, OpenAIChatCompletionsModel, set_tracing_disabled
# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
model=os.getenv("MODEL_NAME", "gpt-5.2"),
openai_client=client,
)
agent = Agent(
name="话痨",
instructions="详细回答用户的问题,至少写3段。",
model=model,
)
async def main():
result = await Runner.run(agent, "什么是量子计算?")
print(f"回答: {result.final_output[:100]}...\n")
# 从 context_wrapper 里拿 usage
usage = result.context_wrapper.usage
print("=== Token 用量统计 ===")
print(f" 总请求次数: {usage.requests}")
print(f" 输入 tokens: {usage.input_tokens}")
print(f" 输出 tokens: {usage.output_tokens}")
print(f" 总计 tokens: {usage.total_tokens}")
# 缓存命中的 token 数
if usage.input_tokens_details:
print(f" 缓存命中 tokens: {usage.input_tokens_details.cached_tokens}")
# 推理 token 数(o1/o3 等推理模型会用到)
if usage.output_tokens_details:
print(f" 推理 tokens: {usage.output_tokens_details.reasoning_tokens}")
# 逐请求明细(如果 Agent 内部调了多次 LLM)
if usage.request_usage_entries:
print(f"\n=== 逐请求明细(共 {len(usage.request_usage_entries)} 次)===")
for i, entry in enumerate(usage.request_usage_entries, 1):
print(f" 第 {i} 次: 输入 {entry.input_tokens}, 输出 {entry.output_tokens}, 合计 {entry.total_tokens}")
if __name__ == "__main__":
asyncio.run(main())
输出类似:
=== Token 用量统计 ===
总请求次数: 1
输入 tokens: 28
输出 tokens: 350
总计 tokens: 378
缓存命中 tokens: 0
推理 tokens: 0
在 Hooks 中实时监控 token
前面 TimingHooks 的例子已经演示了 -- context.usage 在每个钩子触发时都是实时更新的,你可以在 on_llm_end 里检查 token 用量,超过预算就报警:
async def on_llm_end(self, context, agent, response):
if context.usage.total_tokens > 10000:
print(f"WARNING: token 用量已超过 10000!当前: {context.usage.total_tokens}")
全局控制:关闭追踪
在开发阶段你可能想关掉追踪(比如跑测试的时候不想往 OpenAI 发数据):
from agents import set_tracing_disabled
# 全局关闭追踪
set_tracing_disabled(True)
# 或者只关闭单个 trace
with trace("测试流程", disabled=True):
...
完整实战:带追踪和钩子的多 Agent 系统
把前面学的都串起来,搞一个完整的、可运行的例子:
"""
完整示例:带追踪和钩子的多 Agent 系统
功能:用户输入一个话题 -> 研究员收集信息 -> 编辑整理成文章
"""
import asyncio
import time
from typing import Any, Optional
import os
from openai import AsyncOpenAI
from agents import (
Agent,
AgentHooks,
AgentHookContext,
OpenAIChatCompletionsModel,
RunContextWrapper,
RunHooks,
Runner,
Tool,
Usage,
function_tool,
set_tracing_disabled,
trace,
)
from agents.items import ModelResponse, TResponseInputItem
# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
model=os.getenv("MODEL_NAME", "gpt-5.2"),
openai_client=client,
)
# ========== 工具定义 ==========
@function_tool
def search_web(query: str) -> str:
"""搜索网络获取信息(模拟)"""
# 模拟搜索结果
fake_results = {
"default": f"关于'{query}'的搜索结果: 这是一个重要的话题,有很多相关研究和讨论。"
}
return fake_results.get(query, fake_results["default"])
@function_tool
def count_words(text: str) -> str:
"""统计文本字数"""
return str(len(text))
# ========== 全局钩子(RunHooks)==========
class DashboardHooks(RunHooks):
"""模拟一个监控面板,记录所有事件"""
def __init__(self):
self.events: list[dict] = []
self.start_time = time.time()
def _elapsed(self) -> str:
return f"{time.time() - self.start_time:.2f}s"
def _record(self, event_type: str, detail: str, usage: Usage) -> None:
event = {
"time": self._elapsed(),
"type": event_type,
"detail": detail,
"total_tokens": usage.total_tokens,
}
self.events.append(event)
print(f" [{event['time']}] {event_type}: {detail} (累计 {usage.total_tokens} tokens)")
async def on_agent_start(self, context: AgentHookContext, agent: Agent) -> None:
self._record("AGENT_START", agent.name, context.usage)
async def on_agent_end(self, context: AgentHookContext, agent: Agent, output: Any) -> None:
self._record("AGENT_END", agent.name, context.usage)
async def on_tool_start(self, context: RunContextWrapper, agent: Agent, tool: Tool) -> None:
self._record("TOOL_START", f"{agent.name} -> {tool.name}", context.usage)
async def on_tool_end(self, context: RunContextWrapper, agent: Agent, tool: Tool, result: str) -> None:
self._record("TOOL_END", f"{agent.name} -> {tool.name}", context.usage)
async def on_handoff(self, context: RunContextWrapper, from_agent: Agent, to_agent: Agent) -> None:
self._record("HANDOFF", f"{from_agent.name} -> {to_agent.name}", context.usage)
async def on_llm_start(
self,
context: RunContextWrapper,
agent: Agent,
system_prompt: Optional[str],
input_items: list[TResponseInputItem],
) -> None:
self._record("LLM_START", agent.name, context.usage)
async def on_llm_end(
self, context: RunContextWrapper, agent: Agent, response: ModelResponse
) -> None:
self._record("LLM_END", agent.name, context.usage)
def print_summary(self) -> None:
print(f"\n{'='*50}")
print(f"执行摘要: 共 {len(self.events)} 个事件, 总耗时 {self._elapsed()}")
if self.events:
print(f"最终 token 用量: {self.events[-1]['total_tokens']}")
print(f"{'='*50}")
# ========== Agent 级别钩子 ==========
class EditorDebugHooks(AgentHooks):
"""只给编辑 Agent 加的调试钩子"""
async def on_start(self, context: AgentHookContext, agent: Agent) -> None:
print(f" >> [编辑专属] 编辑开始审稿")
async def on_end(self, context: AgentHookContext, agent: Agent, output: Any) -> None:
print(f" >> [编辑专属] 编辑完成,输出长度: {len(str(output))} 字符")
# ========== Agent 定义 ==========
researcher = Agent(
name="研究员",
instructions="你是一个研究员。根据用户给的话题,用 search_web 工具搜索相关信息,然后整理成要点。",
tools=[search_web],
handoffs=[], # 后面会设置
model=model,
)
editor = Agent(
name="编辑",
instructions="你是一个编辑。把研究员提供的要点整理成一篇简短的文章(100字以内)。用 count_words 工具统计字数。",
tools=[count_words],
hooks=EditorDebugHooks(), # 只给编辑加 hooks
model=model,
)
# 研究员完成后交给编辑
researcher.handoffs = [editor]
# ========== 主流程 ==========
async def main():
dashboard = DashboardHooks()
print("=== 多 Agent 追踪演示 ===\n")
# 用 trace() 把整个工作流打包
with trace(
"内容创作流水线",
group_id="demo_session_001",
metadata={"topic": "AI", "version": "1.0"},
):
result = await Runner.run(
researcher,
input="帮我研究一下大语言模型的最新进展",
hooks=dashboard,
)
print(f"\n最终文章:\n{result.final_output}")
# 打印 usage 统计
usage = result.context_wrapper.usage
print(f"\n=== Token 用量 ===")
print(f" 总请求次数: {usage.requests}")
print(f" 输入 tokens: {usage.input_tokens}")
print(f" 输出 tokens: {usage.output_tokens}")
print(f" 总计 tokens: {usage.total_tokens}")
# 打印逐请求明细
if usage.request_usage_entries:
print(f"\n=== 逐请求明细 ===")
for i, entry in enumerate(usage.request_usage_entries, 1):
print(f" 第 {i} 次请求: 输入 {entry.input_tokens}, 输出 {entry.output_tokens}")
# 打印事件摘要
dashboard.print_summary()
if __name__ == "__main__":
asyncio.run(main())
小结
本章学了三个观测手段:
| 手段 | 适合场景 | 一句话总结 |
|---|---|---|
trace() |
宏观追踪 | 把多个操作归到一个 trace,在 dashboard 上看全貌 |
RunHooks |
全局监控 | 在 Agent 运行的每个关键节点插入自定义逻辑 |
AgentHooks |
精准调试 | 只监控某个特定 Agent |
custom_span() |
自定义追踪 | 追踪你自己的业务逻辑(数据库、API 等) |
Usage |
成本统计 | 知道花了多少 token、调了几次 LLM |
记住一个原则:上线前一定要加追踪。Agent 系统不像传统代码,执行路径是不确定的(LLM 可能调 1 次也可能调 5 次),没有追踪你就是在盲飞。
下一章是实战篇——我们把前 8 章学到的所有知识综合起来,从零搭建一个完整的智能客服系统。