第 9 章:实战 -- 从零搭建智能客服系统
系列教程:OpenAI Agents SDK 从入门到实战
本章目标:综合运用前 8 章所有知识,从零搭建一个完整的多 Agent 智能客服系统。
项目概述
前 8 章我们一个一个拆解了 Agents SDK 的核心能力。现在把它们全拼起来,做一个真正能跑的东西。
做什么? 一个终端交互式的智能客服系统,用户输入问题,系统自动分诊、调用工具、流式回答。
架构长这样:
用户输入
|
v
输入护栏(过滤骚扰/敏感内容)
|
v
分诊 Agent(Triage)
|--- 订单查询 Agent(查订单、改地址)
|--- 技术支持 Agent(产品使用问题)
|--- 投诉处理 Agent(处理投诉、返回结构化报告)
|
v
流式输出(打字机效果)
每个专业 Agent 处理完还能转回分诊 Agent,形成双向转接。
综合运用的知识点:
| 知识点 | 在本项目中的应用 | 来自章节 |
|---|---|---|
| Agent 基础创建 | 4 个 Agent 各司其职 | 第 1 章 |
| function_tool | 查订单、改地址、搜知识库、提交投诉 | 第 2 章 |
| Handoff | 分诊到专业 Agent,双向转接 | 第 3 章 |
| Guardrail | 输入护栏过滤骚扰内容 | 第 4 章 |
| 流式输出 | 打字机效果的终端交互 | 第 5 章 |
| Context + Session | 用户上下文 + SQLite 会话记忆 | 第 6 章 |
| 结构化输出 | 投诉处理返回结构化报告 | 第 7 章 |
| RunHooks + trace | 全程追踪日志 | 第 8 章 |
项目准备
如果你已经按照第 1 章创建了项目,直接用现有环境即可。否则:
uv init customer-service
cd customer-service
uv add openai-agents
设置环境变量(参考第 1 章的配置说明):
# Linux / macOS
export MODEL_BASE_URL="http://localhost:8317/v1" # 你的模型服务地址
export MODEL_API_KEY="sk-12345678" # API Key
export MODEL_NAME="gpt-5.2" # 模型名称
如果你用的是在线 API(如 DeepSeek、硅基流动),把
MODEL_BASE_URL换成对应的 API 地址,MODEL_API_KEY填真实 Key 即可。
第一步:定义上下文和工具
用户上下文
每次对话都有一个"用户画像"跟着走。分诊 Agent 可以根据 VIP 等级决定优先级,工具函数可以读取 customer_id 来查询数据。
from pydantic import BaseModel
class CustomerContext(BaseModel):
"""用户上下文,在整个对话过程中传递"""
customer_id: str = ""
customer_name: str = ""
vip_level: int = 0 # 0=普通用户, 1=VIP, 2=SVIP
这个上下文不会发给 LLM,它是给你的代码用的 -- 工具函数、钩子、护栏都能拿到它。
工具函数
四个工具,全用 mock 数据,不依赖外部服务:
from agents import function_tool
# ---------- 查询订单 ----------
@function_tool
def lookup_order(order_id: str) -> str:
"""根据订单号查询订单详情,包括商品、状态、物流信息。"""
orders = {
"ORD-1001": "商品: iPhone 15 手机壳 | 金额: 39.9元 | 状态: 已发货 | 快递: 顺丰 SF1234567890",
"ORD-1002": "商品: USB-C 拓展坞 | 金额: 199元 | 状态: 待发货 | 预计明天发出",
"ORD-1003": "商品: 机械键盘 | 金额: 599元 | 状态: 已签收 | 签收时间: 2026-02-18",
"ORD-1004": "商品: 降噪耳机 | 金额: 899元 | 状态: 退款中 | 预计3个工作日到账",
}
return orders.get(order_id, f"未找到订单 {order_id},请核实订单号")
# ---------- 修改收货地址 ----------
@function_tool
def update_address(order_id: str, new_address: str) -> str:
"""修改订单的收货地址。只有"待发货"状态的订单才能修改。"""
# 模拟:只有 ORD-1002 是待发货,可以改地址
if order_id == "ORD-1002":
return f"订单 {order_id} 的收货地址已更新为: {new_address}"
elif order_id in ("ORD-1001", "ORD-1003", "ORD-1004"):
return f"订单 {order_id} 当前状态不支持修改地址,请联系物流方处理"
else:
return f"未找到订单 {order_id}"
# ---------- 搜索知识库 ----------
@function_tool
def search_knowledge_base(question: str) -> str:
"""搜索产品知识库,回答产品使用和售后相关问题。"""
kb = {
"退货": "支持7天无理由退货,请在【我的订单】中点击【申请退货】。",
"保修": "电子产品保修1年,需提供购买凭证。保修期内非人为损坏免费维修。",
"发票": "在订单详情页点击【申请发票】,支持电子发票和纸质发票。",
"运费": "满99元包邮,不满99元收8元运费,偏远地区可能加收。",
"充电": "请使用原装充电器,避免使用非认证配件,以免影响电池寿命。",
"蓝牙": "进入设置 -> 蓝牙 -> 搜索设备 -> 长按耳机开关3秒进入配对模式。",
"重置": "长按电源键10秒可强制重启设备。如需恢复出厂设置,请进入设置 -> 系统 -> 重置。",
}
for keyword, answer in kb.items():
if keyword in question:
return answer
return "未找到相关答案,建议您提供更多细节或尝试换个关键词。"
# ---------- 提交投诉工单 ----------
@function_tool
def submit_complaint(complaint_text: str, severity: str) -> str:
"""提交投诉工单。severity 可选值: low(一般), medium(中等), high(严重)。"""
import random
ticket_id = f"TK-{random.randint(10000, 99999)}"
return (
f"投诉工单已创建 | 工单号: {ticket_id} | "
f"严重程度: {severity} | 内容: {complaint_text} | "
f"预计24小时内会有专人联系您"
)
第二步:创建专业 Agent
三个专业 Agent,各管一摊事。注意投诉处理 Agent 用了 output_type 返回结构化报告。
投诉报告的结构化定义
from pydantic import BaseModel, Field
class ComplaintReport(BaseModel):
"""投诉处理的结构化报告"""
ticket_id: str = Field(description="工单号")
customer_summary: str = Field(description="客户诉求摘要,一句话概括")
severity: str = Field(description="严重程度: low / medium / high")
resolution: str = Field(description="处理方案")
follow_up: str = Field(description="后续跟进建议")
订单查询 Agent
from agents import Agent
order_agent = Agent[CustomerContext](
name="订单查询专员",
instructions="""你是订单查询专员,专门处理订单相关问题。
你的职责:
- 帮客户查询订单状态、物流信息
- 帮客户修改收货地址(仅限待发货订单)
- 如果客户没给订单号,主动问
注意:
- 你只管订单相关的事,技术问题或投诉请转回给分诊台
- 回答简洁,把关键信息(商品、状态、快递单号)说清楚就行""",
tools=[lookup_order, update_address],
)
技术支持 Agent
tech_agent = Agent[CustomerContext](
name="技术支持专员",
instructions="""你是技术支持专员,负责解答产品使用和售后问题。
你的职责:
- 回答产品使用方法、操作指南
- 解答售后政策(退货、保修、发票、运费等)
- 先搜知识库,搜到就用知识库的答案
注意:
- 你只管技术和产品问题
- 订单查询或投诉请转回给分诊台""",
tools=[search_knowledge_base],
)
投诉处理 Agent
complaint_agent = Agent[CustomerContext](
name="投诉处理专员",
instructions="""你是投诉处理专员,负责接待客户投诉并生成处理报告。
你的职责:
- 认真倾听客户的投诉内容
- 表达歉意和理解
- 根据投诉内容判断严重程度并提交工单
- 生成一份结构化的投诉处理报告
严重程度判断标准:
- low:一般性不满(如发货慢、包装不好)
- medium:影响使用的问题(如商品有瑕疵、功能异常)
- high:严重问题(如收到错误商品、商品损坏、安全隐患)
注意:先调用 submit_complaint 工具提交工单,然后在报告中填入工单号。""",
tools=[submit_complaint],
output_type=ComplaintReport,
)
投诉处理 Agent 和前两个不一样 -- 它设了 output_type=ComplaintReport。LLM 最终会输出一个结构化的 JSON,SDK 自动解析成 ComplaintReport 对象。这就是第 7 章学的结构化输出。
第三步:搭建分诊系统
分诊 Agent 是整个系统的入口。它不干具体活,只负责判断问题类型然后转接。
关键点:双向转接。每个专业 Agent 也能把问题转回分诊台,如果用户在和订单专员聊天时突然问了个技术问题,系统能把它重新路由。
from agents import handoff
# 先创建分诊 Agent(暂时不设 handoffs,后面补上)
triage_agent = Agent[CustomerContext](
name="智能分诊台",
instructions="""你是智能客服系统的分诊台,负责接待客户并转接到对应的专业人员。
转接规则:
- 查订单、物流、改地址 -> 转给「订单查询专员」
- 产品使用、操作方法、售后政策(退货/保修/发票/运费)-> 转给「技术支持专员」
- 投诉、不满、要求赔偿 -> 转给「投诉处理专员」
工作要求:
- 先简短问候,然后快速判断转接
- 如果判断不了,问一句就够了,别啰嗦
- 你不要自己回答专业问题,转给对应的人""",
handoffs=[order_agent, tech_agent, complaint_agent],
)
# 给专业 Agent 加上"转回分诊台"的能力
back_to_triage = handoff(
agent=triage_agent,
tool_name_override="transfer_to_triage",
tool_description_override="当客户的问题不属于你的职责范围时,转回给分诊台重新分配",
)
order_agent.handoffs = [back_to_triage]
tech_agent.handoffs = [back_to_triage]
complaint_agent.handoffs = [back_to_triage]
这段代码有个技巧:因为 triage_agent 和 order_agent 互相引用,不能在创建时同时设置。所以先创建好所有 Agent,再用 handoff() 函数创建反向转接,最后挂到各专业 Agent 上。
第四步:添加护栏
输入护栏,用简单的关键词检测,不额外调用 LLM。检测到辱骂或敏感信息直接拦截。
from agents import GuardrailFunctionOutput, InputGuardrail
async def abuse_filter(ctx, agent, user_input) -> GuardrailFunctionOutput:
"""检查用户输入是否包含辱骂或敏感信息"""
text = str(user_input).lower()
# 辱骂/骚扰关键词
abuse_words = ["去死", "废物", "垃圾公司", "骗子"]
for word in abuse_words:
if word in text:
return GuardrailFunctionOutput(
output_info={"blocked": True, "reason": f"检测到不当言论: {word}"},
tripwire_triggered=True,
)
# 敏感信息(防止用户泄露隐私)
sensitive_words = ["身份证号", "银行卡号", "密码是"]
for word in sensitive_words:
if word in text:
return GuardrailFunctionOutput(
output_info={"blocked": True, "reason": "请勿在对话中发送敏感个人信息"},
tripwire_triggered=True,
)
return GuardrailFunctionOutput(
output_info={"blocked": False},
tripwire_triggered=False,
)
# 把护栏挂到分诊 Agent 上
triage_agent.input_guardrails = [
InputGuardrail(guardrail_function=abuse_filter)
]
护栏触发后 SDK 会抛出 InputGuardrailTripwireTriggered 异常,我们在主循环里捕获它,给用户一个友好提示。
第五步:接入会话记忆
一行代码搞定。SQLiteSession 会把对话历史存到本地 SQLite 文件里,程序重启后还能接着聊。
from agents import SQLiteSession
session = SQLiteSession(
session_id="customer-001",
db_path="customer_service.db",
)
传给 Runner.run_streamed() 的时候带上 session=session,SDK 自动帮你存取对话历史。
第六步:添加追踪日志
两层追踪:RunHooks 记录运行过程中每个关键节点,trace() 上下文管理器把每轮对话包成一个 trace。
from datetime import datetime
from agents import RunHooks
class ServiceHooks(RunHooks):
"""客服系统运行钩子,在关键节点打印日志"""
async def on_agent_start(self, context, agent):
ts = datetime.now().strftime("%H:%M:%S")
print(f"\n [{ts}] >> {agent.name} 开始处理")
async def on_agent_end(self, context, agent, output):
ts = datetime.now().strftime("%H:%M:%S")
print(f" [{ts}] << {agent.name} 处理完毕")
async def on_handoff(self, context, from_agent, to_agent):
ts = datetime.now().strftime("%H:%M:%S")
print(f" [{ts}] -- 转接: {from_agent.name} -> {to_agent.name}")
async def on_tool_start(self, context, agent, tool):
ts = datetime.now().strftime("%H:%M:%S")
print(f" [{ts}] .. 调用工具: {tool.name}")
async def on_tool_end(self, context, agent, tool, result):
ts = datetime.now().strftime("%H:%M:%S")
print(f" [{ts}] .. 工具返回结果")
在主循环里,每轮对话用 trace() 包裹,配合 gen_trace_id() 给每轮生成唯一 ID:
from agents import trace, gen_trace_id
# 整个会话共用一个 group_id,方便在 trace 后台按会话检索
conversation_id = gen_trace_id()
with trace("客服对话", group_id=conversation_id):
result = Runner.run_streamed(...)
第七步:流式输出的主循环
把所有东西拼到一起,写一个终端交互循环:
import asyncio
from openai.types.responses import ResponseTextDeltaEvent
from agents import (
Runner, InputGuardrailTripwireTriggered,
trace, gen_trace_id,
)
async def main():
# 创建上下文(模拟一个已登录用户)
ctx = CustomerContext(
customer_id="C10086",
customer_name="张三",
vip_level=1,
)
# 会话记忆
session = SQLiteSession(session_id="customer-001", db_path="customer_service.db")
# 全局钩子
hooks = ServiceHooks()
# 会话级 trace ID
conversation_id = gen_trace_id()
print("=" * 50)
print(" 智能客服系统")
print("=" * 50)
print(f"\n欢迎回来,{ctx.customer_name}!请问有什么可以帮您?")
print("(输入 quit 退出,输入 clear 清空对话历史)\n")
while True:
try:
user_input = input("你: ").strip()
except (EOFError, KeyboardInterrupt):
print("\n再见!")
break
if not user_input:
continue
if user_input.lower() in ("quit", "exit", "q"):
print("感谢使用,再见!")
break
if user_input.lower() == "clear":
await session.clear_session()
print("[系统] 对话历史已清空\n")
continue
try:
# 每轮对话用 trace 包裹
with trace("客服对话轮次", group_id=conversation_id):
result = Runner.run_streamed(
triage_agent,
input=user_input,
context=ctx,
session=session,
hooks=hooks,
)
# 流式输出(打字机效果)
print("\n客服: ", end="", flush=True)
async for event in result.stream_events():
if event.type == "raw_response_event" and isinstance(
event.data, ResponseTextDeltaEvent
):
print(event.data.delta, end="", flush=True)
print("\n")
# 如果投诉 Agent 返回了结构化报告,额外展示
final = result.final_output
if isinstance(final, ComplaintReport):
print(" --- 投诉处理报告 ---")
print(f" 工单号: {final.ticket_id}")
print(f" 客户诉求: {final.customer_summary}")
print(f" 严重程度: {final.severity}")
print(f" 处理方案: {final.resolution}")
print(f" 后续跟进: {final.follow_up}")
print(" --------------------\n")
except InputGuardrailTripwireTriggered as e:
reason = e.guardrail_result.output.output_info.get("reason", "输入不合规")
print(f"\n[系统提示] 您的消息被拦截: {reason}")
print("请文明用语,我们会尽力帮您解决问题。\n")
except Exception as e:
print(f"\n[系统异常] {type(e).__name__}: {e}\n")
if __name__ == "__main__":
asyncio.run(main())
完整代码
把上面所有步骤合到一个文件里,保存为 customer_service.py 直接运行。
"""
智能客服系统 -- OpenAI Agents SDK 实战
综合运用: Agent、工具、Handoff、护栏、流式输出、Context、Session、结构化输出、Hooks、Trace
"""
import asyncio
import os
import random
from datetime import datetime
from openai import AsyncOpenAI
from openai.types.responses import ResponseTextDeltaEvent
from pydantic import BaseModel, Field
from agents import (
Agent,
GuardrailFunctionOutput,
InputGuardrail,
InputGuardrailTripwireTriggered,
OpenAIChatCompletionsModel,
RunHooks,
Runner,
SQLiteSession,
function_tool,
gen_trace_id,
handoff,
set_tracing_disabled,
trace,
)
# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
model=os.getenv("MODEL_NAME", "gpt-5.2"),
openai_client=client,
)
# ============================================================
# 1. 用户上下文
# ============================================================
class CustomerContext(BaseModel):
"""用户上下文,贯穿整个对话"""
customer_id: str = ""
customer_name: str = ""
vip_level: int = 0 # 0=普通, 1=VIP, 2=SVIP
# ============================================================
# 2. 结构化输出模型(投诉报告)
# ============================================================
class ComplaintReport(BaseModel):
"""投诉处理的结构化报告"""
ticket_id: str = Field(description="工单号")
customer_summary: str = Field(description="客户诉求摘要")
severity: str = Field(description="严重程度: low / medium / high")
resolution: str = Field(description="处理方案")
follow_up: str = Field(description="后续跟进建议")
# ============================================================
# 3. 工具定义
# ============================================================
@function_tool
def lookup_order(order_id: str) -> str:
"""根据订单号查询订单详情,包括商品、状态、物流信息。"""
orders = {
"ORD-1001": "商品: iPhone 15 手机壳 | 金额: 39.9元 | 状态: 已发货 | 快递: 顺丰 SF1234567890",
"ORD-1002": "商品: USB-C 拓展坞 | 金额: 199元 | 状态: 待发货 | 预计明天发出",
"ORD-1003": "商品: 机械键盘 | 金额: 599元 | 状态: 已签收 | 签收时间: 2026-02-18",
"ORD-1004": "商品: 降噪耳机 | 金额: 899元 | 状态: 退款中 | 预计3个工作日到账",
}
return orders.get(order_id, f"未找到订单 {order_id},请核实订单号")
@function_tool
def update_address(order_id: str, new_address: str) -> str:
"""修改订单的收货地址。只有待发货状态的订单才能修改。"""
if order_id == "ORD-1002":
return f"订单 {order_id} 的收货地址已更新为: {new_address}"
elif order_id in ("ORD-1001", "ORD-1003", "ORD-1004"):
return f"订单 {order_id} 当前状态不支持修改地址,请联系物流方处理"
else:
return f"未找到订单 {order_id}"
@function_tool
def search_knowledge_base(question: str) -> str:
"""搜索产品知识库,回答产品使用和售后相关问题。"""
kb = {
"退货": "支持7天无理由退货,请在【我的订单】中点击【申请退货】。",
"保修": "电子产品保修1年,需提供购买凭证。保修期内非人为损坏免费维修。",
"发票": "在订单详情页点击【申请发票】,支持电子发票和纸质发票。",
"运费": "满99元包邮,不满99元收8元运费,偏远地区可能加收。",
"充电": "请使用原装充电器,避免使用非认证配件,以免影响电池寿命。",
"蓝牙": "进入设置->蓝牙->搜索设备->长按耳机开关3秒进入配对模式。",
"重置": "长按电源键10秒可强制重启。如需恢复出厂设置,进入设置->系统->重置。",
}
for keyword, answer in kb.items():
if keyword in question:
return answer
return "未找到相关答案,建议您提供更多细节或换个关键词。"
@function_tool
def submit_complaint(complaint_text: str, severity: str) -> str:
"""提交投诉工单。severity 可选: low(一般)、medium(中等)、high(严重)。"""
ticket_id = f"TK-{random.randint(10000, 99999)}"
return (
f"投诉工单已创建 | 工单号: {ticket_id} | "
f"严重程度: {severity} | 内容: {complaint_text} | "
f"预计24小时内会有专人联系您"
)
# ============================================================
# 4. Agent 定义
# ============================================================
# -- 订单查询专员 --
order_agent = Agent[CustomerContext](
name="订单查询专员",
instructions="""你是订单查询专员,处理订单相关问题。
职责:查订单状态/物流、改收货地址(仅待发货订单可改)。
客户没给订单号就主动问。回答简洁,说清关键信息。
非订单问题请转回分诊台。""",
tools=[lookup_order, update_address],
model=model,
)
# -- 技术支持专员 --
tech_agent = Agent[CustomerContext](
name="技术支持专员",
instructions="""你是技术支持专员,解答产品使用和售后问题。
先用知识库搜索工具查标准答案,搜到了就用标准答案回复。
搜不到就给通用建议。
非技术问题请转回分诊台。""",
tools=[search_knowledge_base],
model=model,
)
# -- 投诉处理专员 --
complaint_agent = Agent[CustomerContext](
name="投诉处理专员",
instructions="""你是投诉处理专员,负责处理客户投诉。
工作流程:
1. 认真倾听,表达歉意
2. 判断严重程度(low/medium/high)
3. 调用 submit_complaint 工具提交工单
4. 输出结构化的投诉报告,把工单号填进去
严重程度标准:
- low: 一般不满(发货慢、包装差)
- medium: 影响使用(商品瑕疵、功能异常)
- high: 严重问题(错发、损坏、安全隐患)
非投诉问题请转回分诊台。""",
tools=[submit_complaint],
output_type=ComplaintReport,
model=model,
)
# -- 分诊台 --
triage_agent = Agent[CustomerContext](
name="智能分诊台",
instructions="""你是智能客服的分诊台,负责接待客户并转接给对应专员。
转接规则:
- 查订单、物流、改地址 -> 订单查询专员
- 产品使用、操作指南、售后政策(退货/保修/发票/运费)-> 技术支持专员
- 投诉、不满、要求赔偿 -> 投诉处理专员
简短问候后快速判断转接,判断不了就问一句。不要自己回答专业问题。""",
handoffs=[order_agent, tech_agent, complaint_agent],
model=model,
)
# -- 双向转接:专业 Agent 可以转回分诊台 --
back_to_triage = handoff(
agent=triage_agent,
tool_name_override="transfer_to_triage",
tool_description_override="当客户的问题不属于你的职责范围时,转回分诊台重新分配",
)
order_agent.handoffs = [back_to_triage]
tech_agent.handoffs = [back_to_triage]
complaint_agent.handoffs = [back_to_triage]
# ============================================================
# 5. 输入护栏
# ============================================================
async def abuse_filter(ctx, agent, user_input) -> GuardrailFunctionOutput:
"""过滤辱骂和敏感信息,不调用 LLM"""
text = str(user_input).lower()
for word in ["去死", "废物", "垃圾公司", "骗子"]:
if word in text:
return GuardrailFunctionOutput(
output_info={"blocked": True, "reason": f"检测到不当言论: {word}"},
tripwire_triggered=True,
)
for word in ["身份证号", "银行卡号", "密码是"]:
if word in text:
return GuardrailFunctionOutput(
output_info={"blocked": True, "reason": "请勿在对话中发送敏感个人信息"},
tripwire_triggered=True,
)
return GuardrailFunctionOutput(
output_info={"blocked": False},
tripwire_triggered=False,
)
triage_agent.input_guardrails = [InputGuardrail(guardrail_function=abuse_filter)]
# ============================================================
# 6. 运行钩子(追踪日志)
# ============================================================
class ServiceHooks(RunHooks):
"""在关键节点打印带时间戳的日志"""
async def on_agent_start(self, context, agent):
ts = datetime.now().strftime("%H:%M:%S")
print(f"\n [{ts}] >> {agent.name} 开始处理")
async def on_agent_end(self, context, agent, output):
ts = datetime.now().strftime("%H:%M:%S")
print(f" [{ts}] << {agent.name} 处理完毕")
async def on_handoff(self, context, from_agent, to_agent):
ts = datetime.now().strftime("%H:%M:%S")
print(f" [{ts}] -- 转接: {from_agent.name} -> {to_agent.name}")
async def on_tool_start(self, context, agent, tool):
ts = datetime.now().strftime("%H:%M:%S")
print(f" [{ts}] .. 调用工具: {tool.name}")
async def on_tool_end(self, context, agent, tool, result):
ts = datetime.now().strftime("%H:%M:%S")
print(f" [{ts}] .. 工具返回结果")
# ============================================================
# 7. 主循环
# ============================================================
async def main():
# 模拟已登录用户
ctx = CustomerContext(customer_id="C10086", customer_name="张三", vip_level=1)
# 会话记忆
session = SQLiteSession(session_id="customer-001", db_path="customer_service.db")
# 全局钩子
hooks = ServiceHooks()
# 会话级 trace ID,同一个会话的所有轮次归到一组
conversation_id = gen_trace_id()
print("=" * 50)
print(" 智能客服系统")
print("=" * 50)
print(f"\n欢迎回来,{ctx.customer_name}!请问有什么可以帮您?")
print("(输入 quit 退出,输入 clear 清空对话历史)\n")
while True:
try:
user_input = input("你: ").strip()
except (EOFError, KeyboardInterrupt):
print("\n再见!")
break
if not user_input:
continue
if user_input.lower() in ("quit", "exit", "q"):
print("感谢使用,再见!")
break
if user_input.lower() == "clear":
await session.clear_session()
print("[系统] 对话历史已清空\n")
continue
try:
# 每轮对话包在 trace 里,方便追踪
with trace("客服对话轮次", group_id=conversation_id):
result = Runner.run_streamed(
triage_agent,
input=user_input,
context=ctx,
session=session,
hooks=hooks,
)
# 流式输出
print("\n客服: ", end="", flush=True)
async for event in result.stream_events():
if event.type == "raw_response_event" and isinstance(
event.data, ResponseTextDeltaEvent
):
print(event.data.delta, end="", flush=True)
print("\n")
# 如果是投诉,展示结构化报告
final = result.final_output
if isinstance(final, ComplaintReport):
print(" --- 投诉处理报告 ---")
print(f" 工单号: {final.ticket_id}")
print(f" 客户诉求: {final.customer_summary}")
print(f" 严重程度: {final.severity}")
print(f" 处理方案: {final.resolution}")
print(f" 后续跟进: {final.follow_up}")
print(" --------------------\n")
except InputGuardrailTripwireTriggered as e:
reason = e.guardrail_result.output.output_info.get("reason", "输入不合规")
print(f"\n[系统提示] 您的消息被拦截: {reason}")
print("请文明用语,我们会尽力帮您解决问题。\n")
except Exception as e:
print(f"\n[系统异常] {type(e).__name__}: {e}\n")
if __name__ == "__main__":
asyncio.run(main())
运行效果展示
uv run python customer_service.py
场景一:订单查询
你: 帮我查一下 ORD-1001
[14:30:01] >> 智能分诊台 开始处理
[14:30:02] -- 转接: 智能分诊台 -> 订单查询专员
[14:30:02] >> 订单查询专员 开始处理
[14:30:03] .. 调用工具: lookup_order
[14:30:03] .. 工具返回结果
客服: 您的订单 ORD-1001 信息如下:
商品:iPhone 15 手机壳
金额:39.9元
状态:已发货
快递:顺丰 SF1234567890
您可以在顺丰官网查询物流详情。
[14:30:04] << 订单查询专员 处理完毕
场景二:修改地址
你: 我想改一下 ORD-1002 的收货地址,改成北京市海淀区中关村大街1号
[14:31:10] >> 智能分诊台 开始处理
[14:31:11] -- 转接: 智能分诊台 -> 订单查询专员
[14:31:11] >> 订单查询专员 开始处理
[14:31:12] .. 调用工具: update_address
[14:31:12] .. 工具返回结果
客服: 已帮您修改成功!订单 ORD-1002 的收货地址已更新为:
北京市海淀区中关村大街1号
[14:31:13] << 订单查询专员 处理完毕
场景三:技术问题
你: 蓝牙耳机怎么配对?
[14:32:05] >> 智能分诊台 开始处理
[14:32:06] -- 转接: 智能分诊台 -> 技术支持专员
[14:32:06] >> 技术支持专员 开始处理
[14:32:07] .. 调用工具: search_knowledge_base
[14:32:07] .. 工具返回结果
客服: 蓝牙耳机配对步骤:
进入手机设置 -> 蓝牙 -> 搜索设备 -> 长按耳机开关3秒进入配对模式
搜索到设备后点击连接即可。
[14:32:08] << 技术支持专员 处理完毕
场景四:投诉处理(结构化输出)
你: 我买的键盘收到就是坏的,一个按键按不动,太差了
[14:33:01] >> 智能分诊台 开始处理
[14:33:02] -- 转接: 智能分诊台 -> 投诉处理专员
[14:33:02] >> 投诉处理专员 开始处理
[14:33:03] .. 调用工具: submit_complaint
[14:33:03] .. 工具返回结果
客服: (结构化输出,非文本形式)
[14:33:04] << 投诉处理专员 处理完毕
--- 投诉处理报告 ---
工单号: TK-38291
客户诉求: 客户收到的机械键盘存在按键故障,无法正常使用
严重程度: medium
处理方案: 安排换货,同时赠送延保服务作为补偿
后续跟进: 48小时内联系客户确认换货地址,换货后回访使用情况
--------------------
场景五:护栏拦截
你: 你们这些骗子,退钱!
[系统提示] 您的消息被拦截: 检测到不当言论: 骗子
请文明用语,我们会尽力帮您解决问题。
场景六:多轮对话记忆
你: 我叫李四,帮我查 ORD-1003
...
客服: 李四您好!您的订单 ORD-1003 是一个机械键盘,已签收...
你: 这个订单能退货吗?
...
客服: 您之前查询的 ORD-1003(机械键盘)已签收。
我们支持7天无理由退货,请在【我的订单】中点击【申请退货】...
因为有 Session,AI 记住了用户名字和之前查过的订单,不需要重复提供信息。
扩展方向
这个项目是一个完整的起点,往生产环境走还可以做这些:
1. 接入真实数据库
把 mock 数据换成真实的数据库查询。工具函数里可以通过 RunContextWrapper 拿到用户上下文:
from agents import RunContextWrapper
@function_tool
async def lookup_order(ctx: RunContextWrapper[CustomerContext], order_id: str) -> str:
"""查询订单"""
customer_id = ctx.context.customer_id
# 用 customer_id + order_id 去数据库查询
...
2. 添加更多 Agent
创建新 Agent 加到 triage_agent.handoffs 里就行:
# 比如加一个推荐 Agent
recommend_agent = Agent(
name="商品推荐专员",
instructions="根据用户历史购买记录推荐商品...",
tools=[recommend_products],
)
triage_agent.handoffs.append(recommend_agent)
3. 部署为 Web API
用 FastAPI + WebSocket 替换终端交互:
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/chat/{session_id}")
async def chat(websocket: WebSocket, session_id: str):
await websocket.accept()
session = SQLiteSession(session_id=session_id, db_path="chat.db")
while True:
user_input = await websocket.receive_text()
result = Runner.run_streamed(triage_agent, input=user_input, session=session)
async for event in result.stream_events():
if event.type == "raw_response_event" and isinstance(
event.data, ResponseTextDeltaEvent
):
await websocket.send_text(event.data.delta)
4. 生产级改进
- 用 Redis 或 PostgreSQL 替代 SQLite 存储会话
- 把 RunHooks 的日志接入 ELK 或 Prometheus
- trace 数据发到 OpenAI 后台或自建的 trace 服务
- 加上输出护栏,防止 AI 泄露内部信息
- 给 VIP 用户配置不同的模型或更长的上下文
章节回顾
这一章我们把前 8 章的所有知识点拼成了一个完整的项目:
| 组件 | 对应知识 | 作用 |
|---|---|---|
CustomerContext |
第 6 章 Context | 传递用户信息 |
function_tool |
第 2 章 工具 | 查订单、改地址、搜知识库、提交投诉 |
handoff + 双向转接 |
第 3 章 Handoff | 分诊路由 + 转回机制 |
InputGuardrail |
第 4 章 护栏 | 拦截不当输入 |
run_streamed |
第 5 章 流式输出 | 打字机效果 |
SQLiteSession |
第 6 章 Session | 多轮对话记忆 |
ComplaintReport + output_type |
第 7 章 结构化输出 | 投诉报告 |
RunHooks + trace() |
第 8 章 追踪日志 | 全程监控 |
核心思路就一句话:每个组件单独看都很简单,组合起来就是一个能用的系统。