跳到内容

流式基础

流式允许你在结构化响应生成时接收其部分内容,而无需等待完整的响应。

为何使用流式?

流式提供以下几项优势

  1. 感知速度更快:用户立即看到结果
  2. 渐进式 UI 更新:数据到达时更新界面
  3. 生成时处理:在完整响应就绪前开始使用数据
Without Streaming:
┌─────────┐             ┌─────────────────────┐
│ Request │─── Wait ───>│ Complete Response   │
└─────────┘             └─────────────────────┘

With Streaming:
┌─────────┐    ┌───────┐    ┌───────┐    ┌───────┐
│ Request │───>│Part 1 │───>│Part 2 │───>│Part 3 │─── ...
└─────────┘    └───────┘    └───────┘    └───────┘

简单示例

以下是如何流式传输结构化响应

import instructor
from openai import OpenAI
from pydantic import BaseModel

# Define your data structure
class UserProfile(BaseModel):
    name: str
    bio: str
    interests: list[str]

# Set up client
client = instructor.from_openai(OpenAI())

# Enable streaming
for partial in client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "user", "content": "Generate a profile for Alex Chen"}
    ],
    response_model=UserProfile,
    stream=True  # This enables streaming
):
    # Print each update as it arrives
    print("\nUpdate received:")

    # Access available fields
    if hasattr(partial, "name") and partial.name:
        print(f"Name: {partial.name}")
    if hasattr(partial, "bio") and partial.bio:
        print(f"Bio: {partial.bio[:30]}...")
    if hasattr(partial, "interests") and partial.interests:
        print(f"Interests: {', '.join(partial.interests)}")

流式如何工作

使用 Instructor 进行流式传输时

  1. 通过 stream=True 启用流式传输
  2. 该方法返回部分响应的迭代器
  3. 每个部分包含到目前为止已完成的字段
  4. 你使用 hasattr() 检查字段,因为它们是增量出现的
  5. 最终迭代包含完整的响应

进度追踪示例

以下是追踪进度的简单方法

import instructor
from openai import OpenAI
from pydantic import BaseModel

client = instructor.from_openai(OpenAI())

class Report(BaseModel):
    title: str
    summary: str
    conclusion: str

# Track completed fields
completed = set()
total_fields = 3  # Number of fields in our model

for partial in client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "user", "content": "Generate a report on climate change"}
    ],
    response_model=Report,
    stream=True
):
    # Check which fields are complete
    for field in ["title", "summary", "conclusion"]:
        if hasattr(partial, field) and getattr(partial, field) and field not in completed:
            completed.add(field)
            percent = (len(completed) / total_fields) * 100
            print(f"Received: {field} - {percent:.0f}% complete")

下一步