第 9 章:实战 -- 从零搭建智能客服系统

系列教程:OpenAI Agents SDK 从入门到实战

本章目标:综合运用前 8 章所有知识,从零搭建一个完整的多 Agent 智能客服系统。


项目概述

前 8 章我们一个一个拆解了 Agents SDK 的核心能力。现在把它们全拼起来,做一个真正能跑的东西。

做什么? 一个终端交互式的智能客服系统,用户输入问题,系统自动分诊、调用工具、流式回答。

架构长这样:

用户输入
  |
  v
输入护栏(过滤骚扰/敏感内容)
  |
  v
分诊 Agent(Triage)
  |--- 订单查询 Agent(查订单、改地址)
  |--- 技术支持 Agent(产品使用问题)
  |--- 投诉处理 Agent(处理投诉、返回结构化报告)
  |
  v
流式输出(打字机效果)

每个专业 Agent 处理完还能转回分诊 Agent,形成双向转接。

综合运用的知识点:

知识点 在本项目中的应用 来自章节
Agent 基础创建 4 个 Agent 各司其职 第 1 章
function_tool 查订单、改地址、搜知识库、提交投诉 第 2 章
Handoff 分诊到专业 Agent,双向转接 第 3 章
Guardrail 输入护栏过滤骚扰内容 第 4 章
流式输出 打字机效果的终端交互 第 5 章
Context + Session 用户上下文 + SQLite 会话记忆 第 6 章
结构化输出 投诉处理返回结构化报告 第 7 章
RunHooks + trace 全程追踪日志 第 8 章

项目准备

如果你已经按照第 1 章创建了项目,直接用现有环境即可。否则:

uv init customer-service
cd customer-service
uv add openai-agents

设置环境变量(参考第 1 章的配置说明):

# Linux / macOS
export MODEL_BASE_URL="http://localhost:8317/v1"   # 你的模型服务地址
export MODEL_API_KEY="sk-12345678"                         # API Key
export MODEL_NAME="gpt-5.2"                       # 模型名称

如果你用的是在线 API(如 DeepSeek、硅基流动),把 MODEL_BASE_URL 换成对应的 API 地址,MODEL_API_KEY 填真实 Key 即可。


第一步:定义上下文和工具

用户上下文

每次对话都有一个"用户画像"跟着走。分诊 Agent 可以根据 VIP 等级决定优先级,工具函数可以读取 customer_id 来查询数据。

from pydantic import BaseModel

class CustomerContext(BaseModel):
    """用户上下文,在整个对话过程中传递"""
    customer_id: str = ""
    customer_name: str = ""
    vip_level: int = 0  # 0=普通用户, 1=VIP, 2=SVIP

这个上下文不会发给 LLM,它是给你的代码用的 -- 工具函数、钩子、护栏都能拿到它。

工具函数

四个工具,全用 mock 数据,不依赖外部服务:

from agents import function_tool

# ---------- 查询订单 ----------
@function_tool
def lookup_order(order_id: str) -> str:
    """根据订单号查询订单详情,包括商品、状态、物流信息。"""
    orders = {
        "ORD-1001": "商品: iPhone 15 手机壳 | 金额: 39.9元 | 状态: 已发货 | 快递: 顺丰 SF1234567890",
        "ORD-1002": "商品: USB-C 拓展坞 | 金额: 199元 | 状态: 待发货 | 预计明天发出",
        "ORD-1003": "商品: 机械键盘 | 金额: 599元 | 状态: 已签收 | 签收时间: 2026-02-18",
        "ORD-1004": "商品: 降噪耳机 | 金额: 899元 | 状态: 退款中 | 预计3个工作日到账",
    }
    return orders.get(order_id, f"未找到订单 {order_id},请核实订单号")


# ---------- 修改收货地址 ----------
@function_tool
def update_address(order_id: str, new_address: str) -> str:
    """修改订单的收货地址。只有"待发货"状态的订单才能修改。"""
    # 模拟:只有 ORD-1002 是待发货,可以改地址
    if order_id == "ORD-1002":
        return f"订单 {order_id} 的收货地址已更新为: {new_address}"
    elif order_id in ("ORD-1001", "ORD-1003", "ORD-1004"):
        return f"订单 {order_id} 当前状态不支持修改地址,请联系物流方处理"
    else:
        return f"未找到订单 {order_id}"


# ---------- 搜索知识库 ----------
@function_tool
def search_knowledge_base(question: str) -> str:
    """搜索产品知识库,回答产品使用和售后相关问题。"""
    kb = {
        "退货": "支持7天无理由退货,请在【我的订单】中点击【申请退货】。",
        "保修": "电子产品保修1年,需提供购买凭证。保修期内非人为损坏免费维修。",
        "发票": "在订单详情页点击【申请发票】,支持电子发票和纸质发票。",
        "运费": "满99元包邮,不满99元收8元运费,偏远地区可能加收。",
        "充电": "请使用原装充电器,避免使用非认证配件,以免影响电池寿命。",
        "蓝牙": "进入设置 -> 蓝牙 -> 搜索设备 -> 长按耳机开关3秒进入配对模式。",
        "重置": "长按电源键10秒可强制重启设备。如需恢复出厂设置,请进入设置 -> 系统 -> 重置。",
    }
    for keyword, answer in kb.items():
        if keyword in question:
            return answer
    return "未找到相关答案,建议您提供更多细节或尝试换个关键词。"


# ---------- 提交投诉工单 ----------
@function_tool
def submit_complaint(complaint_text: str, severity: str) -> str:
    """提交投诉工单。severity 可选值: low(一般), medium(中等), high(严重)。"""
    import random
    ticket_id = f"TK-{random.randint(10000, 99999)}"
    return (
        f"投诉工单已创建 | 工单号: {ticket_id} | "
        f"严重程度: {severity} | 内容: {complaint_text} | "
        f"预计24小时内会有专人联系您"
    )

第二步:创建专业 Agent

三个专业 Agent,各管一摊事。注意投诉处理 Agent 用了 output_type 返回结构化报告。

投诉报告的结构化定义

from pydantic import BaseModel, Field

class ComplaintReport(BaseModel):
    """投诉处理的结构化报告"""
    ticket_id: str = Field(description="工单号")
    customer_summary: str = Field(description="客户诉求摘要,一句话概括")
    severity: str = Field(description="严重程度: low / medium / high")
    resolution: str = Field(description="处理方案")
    follow_up: str = Field(description="后续跟进建议")

订单查询 Agent

from agents import Agent

order_agent = Agent[CustomerContext](
    name="订单查询专员",
    instructions="""你是订单查询专员,专门处理订单相关问题。

你的职责:
- 帮客户查询订单状态、物流信息
- 帮客户修改收货地址(仅限待发货订单)
- 如果客户没给订单号,主动问

注意:
- 你只管订单相关的事,技术问题或投诉请转回给分诊台
- 回答简洁,把关键信息(商品、状态、快递单号)说清楚就行""",
    tools=[lookup_order, update_address],
)

技术支持 Agent

tech_agent = Agent[CustomerContext](
    name="技术支持专员",
    instructions="""你是技术支持专员,负责解答产品使用和售后问题。

你的职责:
- 回答产品使用方法、操作指南
- 解答售后政策(退货、保修、发票、运费等)
- 先搜知识库,搜到就用知识库的答案

注意:
- 你只管技术和产品问题
- 订单查询或投诉请转回给分诊台""",
    tools=[search_knowledge_base],
)

投诉处理 Agent

complaint_agent = Agent[CustomerContext](
    name="投诉处理专员",
    instructions="""你是投诉处理专员,负责接待客户投诉并生成处理报告。

你的职责:
- 认真倾听客户的投诉内容
- 表达歉意和理解
- 根据投诉内容判断严重程度并提交工单
- 生成一份结构化的投诉处理报告

严重程度判断标准:
- low:一般性不满(如发货慢、包装不好)
- medium:影响使用的问题(如商品有瑕疵、功能异常)
- high:严重问题(如收到错误商品、商品损坏、安全隐患)

注意:先调用 submit_complaint 工具提交工单,然后在报告中填入工单号。""",
    tools=[submit_complaint],
    output_type=ComplaintReport,
)

投诉处理 Agent 和前两个不一样 -- 它设了 output_type=ComplaintReport。LLM 最终会输出一个结构化的 JSON,SDK 自动解析成 ComplaintReport 对象。这就是第 7 章学的结构化输出。


第三步:搭建分诊系统

分诊 Agent 是整个系统的入口。它不干具体活,只负责判断问题类型然后转接。

关键点:双向转接。每个专业 Agent 也能把问题转回分诊台,如果用户在和订单专员聊天时突然问了个技术问题,系统能把它重新路由。

from agents import handoff

# 先创建分诊 Agent(暂时不设 handoffs,后面补上)
triage_agent = Agent[CustomerContext](
    name="智能分诊台",
    instructions="""你是智能客服系统的分诊台,负责接待客户并转接到对应的专业人员。

转接规则:
- 查订单、物流、改地址 -> 转给「订单查询专员」
- 产品使用、操作方法、售后政策(退货/保修/发票/运费)-> 转给「技术支持专员」
- 投诉、不满、要求赔偿 -> 转给「投诉处理专员」

工作要求:
- 先简短问候,然后快速判断转接
- 如果判断不了,问一句就够了,别啰嗦
- 你不要自己回答专业问题,转给对应的人""",
    handoffs=[order_agent, tech_agent, complaint_agent],
)

# 给专业 Agent 加上"转回分诊台"的能力
back_to_triage = handoff(
    agent=triage_agent,
    tool_name_override="transfer_to_triage",
    tool_description_override="当客户的问题不属于你的职责范围时,转回给分诊台重新分配",
)

order_agent.handoffs = [back_to_triage]
tech_agent.handoffs = [back_to_triage]
complaint_agent.handoffs = [back_to_triage]

这段代码有个技巧:因为 triage_agentorder_agent 互相引用,不能在创建时同时设置。所以先创建好所有 Agent,再用 handoff() 函数创建反向转接,最后挂到各专业 Agent 上。


第四步:添加护栏

输入护栏,用简单的关键词检测,不额外调用 LLM。检测到辱骂或敏感信息直接拦截。

from agents import GuardrailFunctionOutput, InputGuardrail

async def abuse_filter(ctx, agent, user_input) -> GuardrailFunctionOutput:
    """检查用户输入是否包含辱骂或敏感信息"""
    text = str(user_input).lower()

    # 辱骂/骚扰关键词
    abuse_words = ["去死", "废物", "垃圾公司", "骗子"]
    for word in abuse_words:
        if word in text:
            return GuardrailFunctionOutput(
                output_info={"blocked": True, "reason": f"检测到不当言论: {word}"},
                tripwire_triggered=True,
            )

    # 敏感信息(防止用户泄露隐私)
    sensitive_words = ["身份证号", "银行卡号", "密码是"]
    for word in sensitive_words:
        if word in text:
            return GuardrailFunctionOutput(
                output_info={"blocked": True, "reason": "请勿在对话中发送敏感个人信息"},
                tripwire_triggered=True,
            )

    return GuardrailFunctionOutput(
        output_info={"blocked": False},
        tripwire_triggered=False,
    )

# 把护栏挂到分诊 Agent 上
triage_agent.input_guardrails = [
    InputGuardrail(guardrail_function=abuse_filter)
]

护栏触发后 SDK 会抛出 InputGuardrailTripwireTriggered 异常,我们在主循环里捕获它,给用户一个友好提示。


第五步:接入会话记忆

一行代码搞定。SQLiteSession 会把对话历史存到本地 SQLite 文件里,程序重启后还能接着聊。

from agents import SQLiteSession

session = SQLiteSession(
    session_id="customer-001",
    db_path="customer_service.db",
)

传给 Runner.run_streamed() 的时候带上 session=session,SDK 自动帮你存取对话历史。


第六步:添加追踪日志

两层追踪:RunHooks 记录运行过程中每个关键节点,trace() 上下文管理器把每轮对话包成一个 trace。

from datetime import datetime
from agents import RunHooks

class ServiceHooks(RunHooks):
    """客服系统运行钩子,在关键节点打印日志"""

    async def on_agent_start(self, context, agent):
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"\n  [{ts}] >> {agent.name} 开始处理")

    async def on_agent_end(self, context, agent, output):
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"  [{ts}] << {agent.name} 处理完毕")

    async def on_handoff(self, context, from_agent, to_agent):
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"  [{ts}] -- 转接: {from_agent.name} -> {to_agent.name}")

    async def on_tool_start(self, context, agent, tool):
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"  [{ts}] .. 调用工具: {tool.name}")

    async def on_tool_end(self, context, agent, tool, result):
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"  [{ts}] .. 工具返回结果")

在主循环里,每轮对话用 trace() 包裹,配合 gen_trace_id() 给每轮生成唯一 ID:

from agents import trace, gen_trace_id

# 整个会话共用一个 group_id,方便在 trace 后台按会话检索
conversation_id = gen_trace_id()

with trace("客服对话", group_id=conversation_id):
    result = Runner.run_streamed(...)

第七步:流式输出的主循环

把所有东西拼到一起,写一个终端交互循环:

import asyncio
from openai.types.responses import ResponseTextDeltaEvent
from agents import (
    Runner, InputGuardrailTripwireTriggered,
    trace, gen_trace_id,
)

async def main():
    # 创建上下文(模拟一个已登录用户)
    ctx = CustomerContext(
        customer_id="C10086",
        customer_name="张三",
        vip_level=1,
    )

    # 会话记忆
    session = SQLiteSession(session_id="customer-001", db_path="customer_service.db")

    # 全局钩子
    hooks = ServiceHooks()

    # 会话级 trace ID
    conversation_id = gen_trace_id()

    print("=" * 50)
    print("    智能客服系统")
    print("=" * 50)
    print(f"\n欢迎回来,{ctx.customer_name}!请问有什么可以帮您?")
    print("(输入 quit 退出,输入 clear 清空对话历史)\n")

    while True:
        try:
            user_input = input("你: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\n再见!")
            break

        if not user_input:
            continue
        if user_input.lower() in ("quit", "exit", "q"):
            print("感谢使用,再见!")
            break
        if user_input.lower() == "clear":
            await session.clear_session()
            print("[系统] 对话历史已清空\n")
            continue

        try:
            # 每轮对话用 trace 包裹
            with trace("客服对话轮次", group_id=conversation_id):
                result = Runner.run_streamed(
                    triage_agent,
                    input=user_input,
                    context=ctx,
                    session=session,
                    hooks=hooks,
                )

                # 流式输出(打字机效果)
                print("\n客服: ", end="", flush=True)
                async for event in result.stream_events():
                    if event.type == "raw_response_event" and isinstance(
                        event.data, ResponseTextDeltaEvent
                    ):
                        print(event.data.delta, end="", flush=True)
                print("\n")

                # 如果投诉 Agent 返回了结构化报告,额外展示
                final = result.final_output
                if isinstance(final, ComplaintReport):
                    print("  --- 投诉处理报告 ---")
                    print(f"  工单号: {final.ticket_id}")
                    print(f"  客户诉求: {final.customer_summary}")
                    print(f"  严重程度: {final.severity}")
                    print(f"  处理方案: {final.resolution}")
                    print(f"  后续跟进: {final.follow_up}")
                    print("  --------------------\n")

        except InputGuardrailTripwireTriggered as e:
            reason = e.guardrail_result.output.output_info.get("reason", "输入不合规")
            print(f"\n[系统提示] 您的消息被拦截: {reason}")
            print("请文明用语,我们会尽力帮您解决问题。\n")

        except Exception as e:
            print(f"\n[系统异常] {type(e).__name__}: {e}\n")

if __name__ == "__main__":
    asyncio.run(main())

完整代码

把上面所有步骤合到一个文件里,保存为 customer_service.py 直接运行。

"""
智能客服系统 -- OpenAI Agents SDK 实战
综合运用: Agent、工具、Handoff、护栏、流式输出、Context、Session、结构化输出、Hooks、Trace
"""

import asyncio
import os
import random
from datetime import datetime

from openai import AsyncOpenAI
from openai.types.responses import ResponseTextDeltaEvent
from pydantic import BaseModel, Field

from agents import (
    Agent,
    GuardrailFunctionOutput,
    InputGuardrail,
    InputGuardrailTripwireTriggered,
    OpenAIChatCompletionsModel,
    RunHooks,
    Runner,
    SQLiteSession,
    function_tool,
    gen_trace_id,
    handoff,
    set_tracing_disabled,
    trace,
)

# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
    base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
    api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
    model=os.getenv("MODEL_NAME", "gpt-5.2"),
    openai_client=client,
)

# ============================================================
# 1. 用户上下文
# ============================================================

class CustomerContext(BaseModel):
    """用户上下文,贯穿整个对话"""
    customer_id: str = ""
    customer_name: str = ""
    vip_level: int = 0  # 0=普通, 1=VIP, 2=SVIP


# ============================================================
# 2. 结构化输出模型(投诉报告)
# ============================================================

class ComplaintReport(BaseModel):
    """投诉处理的结构化报告"""
    ticket_id: str = Field(description="工单号")
    customer_summary: str = Field(description="客户诉求摘要")
    severity: str = Field(description="严重程度: low / medium / high")
    resolution: str = Field(description="处理方案")
    follow_up: str = Field(description="后续跟进建议")


# ============================================================
# 3. 工具定义
# ============================================================

@function_tool
def lookup_order(order_id: str) -> str:
    """根据订单号查询订单详情,包括商品、状态、物流信息。"""
    orders = {
        "ORD-1001": "商品: iPhone 15 手机壳 | 金额: 39.9元 | 状态: 已发货 | 快递: 顺丰 SF1234567890",
        "ORD-1002": "商品: USB-C 拓展坞 | 金额: 199元 | 状态: 待发货 | 预计明天发出",
        "ORD-1003": "商品: 机械键盘 | 金额: 599元 | 状态: 已签收 | 签收时间: 2026-02-18",
        "ORD-1004": "商品: 降噪耳机 | 金额: 899元 | 状态: 退款中 | 预计3个工作日到账",
    }
    return orders.get(order_id, f"未找到订单 {order_id},请核实订单号")


@function_tool
def update_address(order_id: str, new_address: str) -> str:
    """修改订单的收货地址。只有待发货状态的订单才能修改。"""
    if order_id == "ORD-1002":
        return f"订单 {order_id} 的收货地址已更新为: {new_address}"
    elif order_id in ("ORD-1001", "ORD-1003", "ORD-1004"):
        return f"订单 {order_id} 当前状态不支持修改地址,请联系物流方处理"
    else:
        return f"未找到订单 {order_id}"


@function_tool
def search_knowledge_base(question: str) -> str:
    """搜索产品知识库,回答产品使用和售后相关问题。"""
    kb = {
        "退货": "支持7天无理由退货,请在【我的订单】中点击【申请退货】。",
        "保修": "电子产品保修1年,需提供购买凭证。保修期内非人为损坏免费维修。",
        "发票": "在订单详情页点击【申请发票】,支持电子发票和纸质发票。",
        "运费": "满99元包邮,不满99元收8元运费,偏远地区可能加收。",
        "充电": "请使用原装充电器,避免使用非认证配件,以免影响电池寿命。",
        "蓝牙": "进入设置->蓝牙->搜索设备->长按耳机开关3秒进入配对模式。",
        "重置": "长按电源键10秒可强制重启。如需恢复出厂设置,进入设置->系统->重置。",
    }
    for keyword, answer in kb.items():
        if keyword in question:
            return answer
    return "未找到相关答案,建议您提供更多细节或换个关键词。"


@function_tool
def submit_complaint(complaint_text: str, severity: str) -> str:
    """提交投诉工单。severity 可选: low(一般)、medium(中等)、high(严重)。"""
    ticket_id = f"TK-{random.randint(10000, 99999)}"
    return (
        f"投诉工单已创建 | 工单号: {ticket_id} | "
        f"严重程度: {severity} | 内容: {complaint_text} | "
        f"预计24小时内会有专人联系您"
    )


# ============================================================
# 4. Agent 定义
# ============================================================

# -- 订单查询专员 --
order_agent = Agent[CustomerContext](
    name="订单查询专员",
    instructions="""你是订单查询专员,处理订单相关问题。
职责:查订单状态/物流、改收货地址(仅待发货订单可改)。
客户没给订单号就主动问。回答简洁,说清关键信息。
非订单问题请转回分诊台。""",
    tools=[lookup_order, update_address],
    model=model,
)

# -- 技术支持专员 --
tech_agent = Agent[CustomerContext](
    name="技术支持专员",
    instructions="""你是技术支持专员,解答产品使用和售后问题。
先用知识库搜索工具查标准答案,搜到了就用标准答案回复。
搜不到就给通用建议。
非技术问题请转回分诊台。""",
    tools=[search_knowledge_base],
    model=model,
)

# -- 投诉处理专员 --
complaint_agent = Agent[CustomerContext](
    name="投诉处理专员",
    instructions="""你是投诉处理专员,负责处理客户投诉。
工作流程:
1. 认真倾听,表达歉意
2. 判断严重程度(low/medium/high)
3. 调用 submit_complaint 工具提交工单
4. 输出结构化的投诉报告,把工单号填进去

严重程度标准:
- low: 一般不满(发货慢、包装差)
- medium: 影响使用(商品瑕疵、功能异常)
- high: 严重问题(错发、损坏、安全隐患)

非投诉问题请转回分诊台。""",
    tools=[submit_complaint],
    output_type=ComplaintReport,
    model=model,
)

# -- 分诊台 --
triage_agent = Agent[CustomerContext](
    name="智能分诊台",
    instructions="""你是智能客服的分诊台,负责接待客户并转接给对应专员。

转接规则:
- 查订单、物流、改地址 -> 订单查询专员
- 产品使用、操作指南、售后政策(退货/保修/发票/运费)-> 技术支持专员
- 投诉、不满、要求赔偿 -> 投诉处理专员

简短问候后快速判断转接,判断不了就问一句。不要自己回答专业问题。""",
    handoffs=[order_agent, tech_agent, complaint_agent],
    model=model,
)

# -- 双向转接:专业 Agent 可以转回分诊台 --
back_to_triage = handoff(
    agent=triage_agent,
    tool_name_override="transfer_to_triage",
    tool_description_override="当客户的问题不属于你的职责范围时,转回分诊台重新分配",
)
order_agent.handoffs = [back_to_triage]
tech_agent.handoffs = [back_to_triage]
complaint_agent.handoffs = [back_to_triage]


# ============================================================
# 5. 输入护栏
# ============================================================

async def abuse_filter(ctx, agent, user_input) -> GuardrailFunctionOutput:
    """过滤辱骂和敏感信息,不调用 LLM"""
    text = str(user_input).lower()

    for word in ["去死", "废物", "垃圾公司", "骗子"]:
        if word in text:
            return GuardrailFunctionOutput(
                output_info={"blocked": True, "reason": f"检测到不当言论: {word}"},
                tripwire_triggered=True,
            )

    for word in ["身份证号", "银行卡号", "密码是"]:
        if word in text:
            return GuardrailFunctionOutput(
                output_info={"blocked": True, "reason": "请勿在对话中发送敏感个人信息"},
                tripwire_triggered=True,
            )

    return GuardrailFunctionOutput(
        output_info={"blocked": False},
        tripwire_triggered=False,
    )

triage_agent.input_guardrails = [InputGuardrail(guardrail_function=abuse_filter)]


# ============================================================
# 6. 运行钩子(追踪日志)
# ============================================================

class ServiceHooks(RunHooks):
    """在关键节点打印带时间戳的日志"""

    async def on_agent_start(self, context, agent):
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"\n  [{ts}] >> {agent.name} 开始处理")

    async def on_agent_end(self, context, agent, output):
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"  [{ts}] << {agent.name} 处理完毕")

    async def on_handoff(self, context, from_agent, to_agent):
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"  [{ts}] -- 转接: {from_agent.name} -> {to_agent.name}")

    async def on_tool_start(self, context, agent, tool):
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"  [{ts}] .. 调用工具: {tool.name}")

    async def on_tool_end(self, context, agent, tool, result):
        ts = datetime.now().strftime("%H:%M:%S")
        print(f"  [{ts}] .. 工具返回结果")


# ============================================================
# 7. 主循环
# ============================================================

async def main():
    # 模拟已登录用户
    ctx = CustomerContext(customer_id="C10086", customer_name="张三", vip_level=1)

    # 会话记忆
    session = SQLiteSession(session_id="customer-001", db_path="customer_service.db")

    # 全局钩子
    hooks = ServiceHooks()

    # 会话级 trace ID,同一个会话的所有轮次归到一组
    conversation_id = gen_trace_id()

    print("=" * 50)
    print("    智能客服系统")
    print("=" * 50)
    print(f"\n欢迎回来,{ctx.customer_name}!请问有什么可以帮您?")
    print("(输入 quit 退出,输入 clear 清空对话历史)\n")

    while True:
        try:
            user_input = input("你: ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\n再见!")
            break

        if not user_input:
            continue
        if user_input.lower() in ("quit", "exit", "q"):
            print("感谢使用,再见!")
            break
        if user_input.lower() == "clear":
            await session.clear_session()
            print("[系统] 对话历史已清空\n")
            continue

        try:
            # 每轮对话包在 trace 里,方便追踪
            with trace("客服对话轮次", group_id=conversation_id):
                result = Runner.run_streamed(
                    triage_agent,
                    input=user_input,
                    context=ctx,
                    session=session,
                    hooks=hooks,
                )

                # 流式输出
                print("\n客服: ", end="", flush=True)
                async for event in result.stream_events():
                    if event.type == "raw_response_event" and isinstance(
                        event.data, ResponseTextDeltaEvent
                    ):
                        print(event.data.delta, end="", flush=True)
                print("\n")

                # 如果是投诉,展示结构化报告
                final = result.final_output
                if isinstance(final, ComplaintReport):
                    print("  --- 投诉处理报告 ---")
                    print(f"  工单号:   {final.ticket_id}")
                    print(f"  客户诉求: {final.customer_summary}")
                    print(f"  严重程度: {final.severity}")
                    print(f"  处理方案: {final.resolution}")
                    print(f"  后续跟进: {final.follow_up}")
                    print("  --------------------\n")

        except InputGuardrailTripwireTriggered as e:
            reason = e.guardrail_result.output.output_info.get("reason", "输入不合规")
            print(f"\n[系统提示] 您的消息被拦截: {reason}")
            print("请文明用语,我们会尽力帮您解决问题。\n")

        except Exception as e:
            print(f"\n[系统异常] {type(e).__name__}: {e}\n")


if __name__ == "__main__":
    asyncio.run(main())

运行效果展示

uv run python customer_service.py

场景一:订单查询

: 帮我查一下 ORD-1001

  [14:30:01] >> 智能分诊台 开始处理
  [14:30:02] -- 转接: 智能分诊台 -> 订单查询专员
  [14:30:02] >> 订单查询专员 开始处理
  [14:30:03] .. 调用工具: lookup_order
  [14:30:03] .. 工具返回结果

客服: 您的订单 ORD-1001 信息如下:
      商品:iPhone 15 手机壳
      金额:39.9
      状态:已发货
      快递:顺丰 SF1234567890
      您可以在顺丰官网查询物流详情。

  [14:30:04] << 订单查询专员 处理完毕

场景二:修改地址

: 我想改一下 ORD-1002 的收货地址,改成北京市海淀区中关村大街1

  [14:31:10] >> 智能分诊台 开始处理
  [14:31:11] -- 转接: 智能分诊台 -> 订单查询专员
  [14:31:11] >> 订单查询专员 开始处理
  [14:31:12] .. 调用工具: update_address
  [14:31:12] .. 工具返回结果

客服: 已帮您修改成功!订单 ORD-1002 的收货地址已更新为:
      北京市海淀区中关村大街1

  [14:31:13] << 订单查询专员 处理完毕

场景三:技术问题

: 蓝牙耳机怎么配对?

  [14:32:05] >> 智能分诊台 开始处理
  [14:32:06] -- 转接: 智能分诊台 -> 技术支持专员
  [14:32:06] >> 技术支持专员 开始处理
  [14:32:07] .. 调用工具: search_knowledge_base
  [14:32:07] .. 工具返回结果

客服: 蓝牙耳机配对步骤:
      进入手机设置 -> 蓝牙 -> 搜索设备 -> 长按耳机开关3秒进入配对模式
      搜索到设备后点击连接即可。

  [14:32:08] << 技术支持专员 处理完毕

场景四:投诉处理(结构化输出)

: 我买的键盘收到就是坏的,一个按键按不动,太差了

  [14:33:01] >> 智能分诊台 开始处理
  [14:33:02] -- 转接: 智能分诊台 -> 投诉处理专员
  [14:33:02] >> 投诉处理专员 开始处理
  [14:33:03] .. 调用工具: submit_complaint
  [14:33:03] .. 工具返回结果

客服: (结构化输出,非文本形式)

  [14:33:04] << 投诉处理专员 处理完毕

  --- 投诉处理报告 ---
  工单号:   TK-38291
  客户诉求: 客户收到的机械键盘存在按键故障,无法正常使用
  严重程度: medium
  处理方案: 安排换货,同时赠送延保服务作为补偿
  后续跟进: 48小时内联系客户确认换货地址,换货后回访使用情况
  --------------------

场景五:护栏拦截

: 你们这些骗子,退钱!

[系统提示] 您的消息被拦截: 检测到不当言论: 骗子
请文明用语,我们会尽力帮您解决问题。

场景六:多轮对话记忆

: 我叫李四,帮我查 ORD-1003
  ...
客服: 李四您好!您的订单 ORD-1003 是一个机械键盘,已签收...

: 这个订单能退货吗?
  ...
客服: 您之前查询的 ORD-1003(机械键盘)已签收。
      我们支持7天无理由退货,请在【我的订单】中点击【申请退货】...

因为有 Session,AI 记住了用户名字和之前查过的订单,不需要重复提供信息。


扩展方向

这个项目是一个完整的起点,往生产环境走还可以做这些:

1. 接入真实数据库

把 mock 数据换成真实的数据库查询。工具函数里可以通过 RunContextWrapper 拿到用户上下文:

from agents import RunContextWrapper

@function_tool
async def lookup_order(ctx: RunContextWrapper[CustomerContext], order_id: str) -> str:
    """查询订单"""
    customer_id = ctx.context.customer_id
    # 用 customer_id + order_id 去数据库查询
    ...

2. 添加更多 Agent

创建新 Agent 加到 triage_agent.handoffs 里就行:

# 比如加一个推荐 Agent
recommend_agent = Agent(
    name="商品推荐专员",
    instructions="根据用户历史购买记录推荐商品...",
    tools=[recommend_products],
)
triage_agent.handoffs.append(recommend_agent)

3. 部署为 Web API

用 FastAPI + WebSocket 替换终端交互:

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/chat/{session_id}")
async def chat(websocket: WebSocket, session_id: str):
    await websocket.accept()
    session = SQLiteSession(session_id=session_id, db_path="chat.db")
    while True:
        user_input = await websocket.receive_text()
        result = Runner.run_streamed(triage_agent, input=user_input, session=session)
        async for event in result.stream_events():
            if event.type == "raw_response_event" and isinstance(
                event.data, ResponseTextDeltaEvent
            ):
                await websocket.send_text(event.data.delta)

4. 生产级改进


章节回顾

这一章我们把前 8 章的所有知识点拼成了一个完整的项目:

组件 对应知识 作用
CustomerContext 第 6 章 Context 传递用户信息
function_tool 第 2 章 工具 查订单、改地址、搜知识库、提交投诉
handoff + 双向转接 第 3 章 Handoff 分诊路由 + 转回机制
InputGuardrail 第 4 章 护栏 拦截不当输入
run_streamed 第 5 章 流式输出 打字机效果
SQLiteSession 第 6 章 Session 多轮对话记忆
ComplaintReport + output_type 第 7 章 结构化输出 投诉报告
RunHooks + trace() 第 8 章 追踪日志 全程监控

核心思路就一句话:每个组件单独看都很简单,组合起来就是一个能用的系统。

← 第8章 追踪与调试