第 5 章:流式输出——打字机效果

本章目标

掌握 Runner.run_streamed() 的用法,理解三种流式事件类型,实现逐字打印和结构化事件监听。


为什么要流式输出?

你用 ChatGPT 的时候,回答是一个字一个字冒出来的,而不是等 10 秒然后"啪"一下全出来。这就是流式输出。

为什么它重要?就两个字:体验

非流式就像发短信,流式就像打电话。打电话的时候对方一直在说,你心里踏实。


基础用法:逐字打印

先来最简单的——让 Agent 讲几个笑话,文字一个字一个字蹦出来。

import asyncio
import os

from openai import AsyncOpenAI
from openai.types.responses import ResponseTextDeltaEvent

from agents import Agent, Runner, OpenAIChatCompletionsModel, set_tracing_disabled

# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
    base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
    api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
    model=os.getenv("MODEL_NAME", "gpt-5.2"),
    openai_client=client,
)

# 创建一个简单的 Agent
agent = Agent(
    name="Joker",
    instructions="You are a helpful assistant.",
    model=model,
)


async def main():
    # run_streamed() 不需要 await,直接返回一个流式结果对象
    result = Runner.run_streamed(agent, input="Please tell me 5 jokes.")

    # 通过 async for 逐个消费流式事件
    async for event in result.stream_events():
        # 只关心原始文本块事件 —— 这就是"每个字"
        if event.type == "raw_response_event" and isinstance(
            event.data, ResponseTextDeltaEvent
        ):
            # end="" 不换行,flush=True 立刻刷到屏幕
            print(event.data.delta, end="", flush=True)

    print()  # 最后换个行


if __name__ == "__main__":
    asyncio.run(main())

运行效果:

Sure! Here are 5 jokes for you:

1. Why don't skeletons fight each other?
   They don't have the guts!
...(文字一个一个冒出来)

核心就三步: 1. Runner.run_streamed() 启动流式运行 2. async for event in result.stream_events() 逐个拿事件 3. 判断事件类型,ResponseTextDeltaEvent 里的 delta 就是每一小块文本

end=""flush=True

这两个参数是打字机效果的关键,展开说一下:


三种事件类型详解

调用 result.stream_events() 时,你会收到三种事件。每种有不同的用途:

事件类型 类名 干什么用
raw_response_event RawResponsesStreamEvent LLM 吐出的原始数据块——逐字打印用这个
run_item_stream_event RunItemStreamEvent 一个完整的"动作"处理完了,比如工具调用完成、消息输出完成
agent_updated_stream_event AgentUpdatedStreamEvent Agent 发生了切换(多 Agent 场景下会遇到)

1. RawResponsesStreamEvent -- 原始 token 流

这是 LLM 直接吐出来的原始数据。里面有各种子类型,最重要的是 ResponseTextDeltaEvent,每个 delta 就是一小块文本(可能是一个字、一个词、甚至半个标点)。

打字机效果全靠它。

if event.type == "raw_response_event":
    if isinstance(event.data, ResponseTextDeltaEvent):
        # event.data.delta 就是这一小块文本
        print(event.data.delta, end="", flush=True)

2. RunItemStreamEvent -- 结构化事件

当 SDK 处理完一个完整的"动作"时触发。常见的 item 类型:

这些事件比原始事件来得晚,因为它们是"处理完"才发出的。适合用来展示进度信息,比如"正在调用工具..."。

if event.type == "run_item_stream_event":
    if event.item.type == "tool_call_item":
        print(f"调用工具: {getattr(event.item.raw_item, 'name', '未知')}")
    elif event.item.type == "tool_call_output_item":
        print(f"工具返回: {event.item.output}")
    elif event.item.type == "message_output_item":
        # 用 ItemHelpers 提取完整文本
        print(f"完整消息: {ItemHelpers.text_message_output(event.item)}")

3. AgentUpdatedStreamEvent -- Agent 切换通知

在多 Agent 协作(Handoff)场景下,控制权从一个 Agent 转到另一个时触发。单 Agent 场景下也会在流开始时收到一次。

if event.type == "agent_updated_stream_event":
    print(f"当前 Agent: {event.new_agent.name}")

实用模式:同时处理 token 流和结构化事件

实际开发中,你往往既要逐字输出,又要监听工具调用等结构化事件。下面这个例子展示了完整的事件处理模式——一个带工具调用的 Agent 在流式模式下的表现。

import asyncio
import os
import random

from openai import AsyncOpenAI

from agents import Agent, ItemHelpers, Runner, OpenAIChatCompletionsModel, set_tracing_disabled, function_tool

# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
    base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
    api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
    model=os.getenv("MODEL_NAME", "gpt-5.2"),
    openai_client=client,
)


# 定义一个工具:随机决定讲几个笑话
@function_tool
def how_many_jokes() -> int:
    """Return a random integer of jokes to tell between 1 and 10."""
    return random.randint(1, 10)


async def main():
    agent = Agent(
        name="Joker",
        instructions="First call the `how_many_jokes` tool, then tell that many jokes.",
        tools=[how_many_jokes],
        model=model,
    )

    result = Runner.run_streamed(agent, input="Hello")
    print("=== 流式运行开始 ===")

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            # 原始 token 流 —— 这里我们选择跳过,只关注结构化事件
            # 如果你想同时逐字打印,把下面的 continue 换成打印逻辑
            continue

        elif event.type == "agent_updated_stream_event":
            # Agent 切换/初始化
            print(f"Agent updated: {event.new_agent.name}")

        elif event.type == "run_item_stream_event":
            # 结构化事件:工具调用、工具输出、消息
            if event.item.type == "tool_call_item":
                tool_name = getattr(event.item.raw_item, "name", "Unknown")
                print(f"-- Tool was called: {tool_name}")
            elif event.item.type == "tool_call_output_item":
                print(f"-- Tool output: {event.item.output}")
            elif event.item.type == "message_output_item":
                # ItemHelpers.text_message_output() 从消息项中提取纯文本
                print(f"-- Message: {ItemHelpers.text_message_output(event.item)}")

    print("=== 流式运行结束 ===")


if __name__ == "__main__":
    asyncio.run(main())

运行效果大致如下:

=== 流式运行开始 ===
Agent updated: Joker
-- Tool was called: how_many_jokes
-- Tool output: 4
-- Message: Sure, here are four jokes for you:

1. Why don't skeletons fight each other?
   They don't have the guts!

2. What do you call fake spaghetti?
   An impasta!

3. Why did the scarecrow win an award?
   Because he was outstanding in his field!

4. Why did the bicycle fall over?
   Because it was two-tired!
=== 流式运行结束 ===

可以看到整个流程:Agent 先调用 how_many_jokes 工具拿到数字,然后根据数字讲对应数量的笑话。每一步都以事件的形式被我们捕获到了。

ItemHelpers.text_message_output()

这个工具方法专门用来从 message_output_item 里提取纯文本。因为消息的底层结构可能包含多个 content block(文本、图片等),直接取文本比较麻烦,ItemHelpers 帮你封装好了:

from agents import ItemHelpers

# event.item 是 message_output_item 类型
text = ItemHelpers.text_message_output(event.item)

完整可运行代码

下面把逐字打印和结构化事件处理结合在一起,这是实际项目中最常见的写法:

import asyncio
import os
import random

from openai import AsyncOpenAI
from openai.types.responses import ResponseTextDeltaEvent

from agents import Agent, ItemHelpers, Runner, OpenAIChatCompletionsModel, set_tracing_disabled, function_tool

# --- 模型配置 ---
set_tracing_disabled(True)
client = AsyncOpenAI(
    base_url=os.getenv("MODEL_BASE_URL", "http://localhost:8317/v1"),
    api_key=os.getenv("MODEL_API_KEY", "sk-12345678"),
)
model = OpenAIChatCompletionsModel(
    model=os.getenv("MODEL_NAME", "gpt-5.2"),
    openai_client=client,
)


@function_tool
def roll_dice(sides: int) -> str:
    """掷骰子,返回点数。

    Args:
        sides: 骰子的面数。
    """
    result = random.randint(1, sides)
    return f"掷出了 {result} 点({sides}面骰子)"


agent = Agent(
    name="骰子大师",
    instructions="你是骰子大师。用户要掷骰子时调用 roll_dice 工具,然后对结果做有趣的评论。",
    tools=[roll_dice],
    model=model,
)


async def main():
    result = Runner.run_streamed(
        agent, input="帮我掷一个6面骰子和一个20面骰子"
    )

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            # 逐字打印 LLM 输出的文本
            if isinstance(event.data, ResponseTextDeltaEvent):
                print(event.data.delta, end="", flush=True)

        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                tool_name = getattr(event.item.raw_item, "name", "未知工具")
                print(f"\n>>> 正在调用工具: {tool_name}")
            elif event.item.type == "tool_call_output_item":
                print(f">>> 工具结果: {event.item.output}")
            elif event.item.type == "message_output_item":
                # 消息完成,可以拿到完整文本
                full_text = ItemHelpers.text_message_output(event.item)
                print(f"\n--- 回复完成(共 {len(full_text)} 字)---")

        elif event.type == "agent_updated_stream_event":
            print(f"[当前 Agent: {event.new_agent.name}]")

    # 流结束后才能拿到 final_output
    print(f"\n最终输出: {result.final_output}")


if __name__ == "__main__":
    asyncio.run(main())

运行效果:

[当前 Agent: 骰子大师]
>>> 正在调用工具: roll_dice
>>> 工具结果: 掷出了 4 点(6面骰子)
>>> 正在调用工具: roll_dice
>>> 工具结果: 掷出了 17 点(20面骰子)
6面骰子掷出了4点,中规中矩;20面骰子掷出了17点,手气相当不错!
--- 回复完成(共 42 字)---

最终输出: 6面骰子掷出了4点,中规中矩;20面骰子掷出了17点,手气相当不错!

注意事项

run_streamed() 不需要 await

新手最容易踩的坑:

# 错误!会报错
result = await Runner.run_streamed(agent, input="你好")

# 正确!直接调用
result = Runner.run_streamed(agent, input="你好")

run_streamed() 同步返回一个 RunResultStreaming 对象,真正的异步发生在 async for 遍历 stream_events() 的时候。

final_output 要等流结束

result.final_output 在流式传输过程中是 None,只有 async for 循环走完才有值:

result = Runner.run_streamed(agent, input="你好")

# 此时 result.final_output 是 None
async for event in result.stream_events():
    ...

# 流结束后才有值
print(result.final_output)

流式和非流式结果一致

run_streamed() 最终的 final_outputnew_items 等结果,和 run() 完全一样。流式只是改变了"交付方式"——从"打包送到"变成"边做边上",最终结果不变。


小结

这一章我们学到了:

下一步预告

下一章我们来学 Session(会话管理),让 AI 拥有"记忆",能记住之前聊过的内容,实现真正的多轮对话。

← 第4章 护栏 第6章 上下文与记忆 →