Skip to content

串流 (Streaming)

串流 (Streaming) 讓你能夠在代理 (agent) 執行過程中訂閱其即時更新。這對於向終端使用者顯示進度更新和部分回應非常有用。

要進行串流,你可以呼叫 [Runner.run_streamed()][agents.run.Runner.run_streamed],這會回傳一個 [RunResultStreaming][agents.result.RunResultStreaming]。呼叫 result.stream_events() 會給你一個非同步串流 (async stream) 的 [StreamEvent][agents.stream_events.StreamEvent] 物件,這些物件會在下方進行說明。

原始回應事件 (Raw response events)

[RawResponsesStreamEvent][agents.stream_events.RawResponsesStreamEvent] 是直接從大型語言模型 (LLM) 傳遞過來的原始事件。它們採用 OpenAI Responses API 格式,這代表每個事件都有一個型別(例如 response.createdresponse.output_text.delta 等)以及資料。這些事件很適合在回應訊息一產生時就立刻串流給使用者。

例如,下列方式會逐字元(token-by-token)輸出大型語言模型 (LLM) 所產生的文字。

import asyncio
from openai.types.responses import ResponseTextDeltaEvent
from agents import Agent, Runner

async def main():
    agent = Agent(
        name="Joker",
        instructions="You are a helpful assistant.",
    )

    result = Runner.run_streamed(agent, input="Please tell me 5 jokes.")
    async for event in result.stream_events():
        if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent):
            print(event.data.delta, end="", flush=True)


if __name__ == "__main__":
    asyncio.run(main())

Run item 事件與 agent 事件

[RunItemStreamEvent][agents.stream_events.RunItemStreamEvent] 屬於較高層級的事件。它會在一個 item 完整產生時通知你。這讓你可以在「訊息已產生」、「工具已執行」等層級推送進度更新,而不是每個 token 都推送。類似地,[AgentUpdatedStreamEvent][agents.stream_events.AgentUpdatedStreamEvent] 會在目前的 agent 發生變更時(例如因交接而變更)提供更新。

舉例來說,以下做法會忽略原始事件,只將更新訊息串流給使用者。

import asyncio
import random
from agents import Agent, ItemHelpers, Runner, function_tool

@function_tool
def how_many_jokes() -> int:
    return random.randint(1, 10)


async def main():
    agent = Agent(
        name="Joker",
        instructions="First call the `how_many_jokes` tool, then tell that many jokes.",
        tools=[how_many_jokes],
    )

    result = Runner.run_streamed(
        agent,
        input="Hello",
    )
    print("=== Run starting ===")

    async for event in result.stream_events():
        # We'll ignore the raw responses event deltas
        if event.type == "raw_response_event":
            continue
        # When the agent updates, print that
        elif event.type == "agent_updated_stream_event":
            print(f"Agent updated: {event.new_agent.name}")
            continue
        # When items are generated, print them
        elif event.type == "run_item_stream_event":
            if event.item.type == "tool_call_item":
                print("-- Tool was called")
            elif event.item.type == "tool_call_output_item":
                print(f"-- Tool output: {event.item.output}")
            elif event.item.type == "message_output_item":
                print(f"-- Message output:\n {ItemHelpers.text_message_output(event.item)}")
            else:
                pass  # Ignore other event types

    print("=== Run complete ===")


if __name__ == "__main__":
    asyncio.run(main())