跳到内容

生成器和 LLM 流式传输

延迟至关重要,尤其是在电子商务和像 ChatGPT 这样的新型聊天应用程序中。流式传输是无需更快响应时间即可增强用户体验的解决方案。

那是什么让流式传输成为可能?生成器!

在这篇文章中,我们将深入探讨 Python 生成器的精彩世界——这些工具不仅仅是编码语法技巧。我们将从头开始探索 Python 生成器,然后使用 Instructor 库深入研究 LLM 流式传输。

Python 生成器:迭代器的有效方法

Python 中的生成器是处理大型数据集和流处理的游戏规则改变者。它们允许函数一次生成一个值,暂停和恢复它们的状态,与将所有元素存储在内存中的传统集合相比,这是一种更快、更节省内存的方法。

基础知识:生成值

Python 中的生成器函数使用 yield 关键字。它一次生成一个值,允许函数暂停和恢复其状态。

def count_to_3():
    yield 1
    yield 2
    yield 3


for num in count_to_3():
    print(num)
    #> 1
    #> 2
    #> 3
1
2
3

与传统集合相比的优势

  • 惰性评估 & 降低延迟:从生成器获取第一个元素(或 LLM 中的首个 token 时间)所需的时间显著降低。生成器一次只产生一个值,而访问集合的第一个元素则需要先创建整个集合。
  • 内存效率:一次只有一个项目在内存中。
  • 保持状态:在执行之间自动保持状态。

让我们看看生成器快多少,以及它们真正的亮点在哪里

import time


def expensive_func(x):
    """Simulate an expensive operation."""
    time.sleep(1)
    return x**2


def calculate_time_for_first_result_with_list(func_input, func):
    """Calculate using a list comprehension and return the first result with its computation time."""
    start_perf = time.perf_counter()
    result = [func(x) for x in func_input][0]
    end_perf = time.perf_counter()
    print(f"Time for first result (list): {end_perf - start_perf:.2f} seconds")
    #> Time for first result (list): 5.02 seconds
    return result


def calculate_time_for_first_result_with_generator(func_input, func):
    """Calculate using a generator and return the first result with its computation time."""
    start_perf = time.perf_counter()
    result = next(func(x) for x in func_input)
    end_perf = time.perf_counter()
    print(f"Time for first result (generator): {end_perf - start_perf:.2f} seconds")
    #> Time for first result (generator): 1.00 seconds
    return result


# Prepare inputs for the function
numbers = [1, 2, 3, 4, 5]

# Benchmarking
first_result_list = calculate_time_for_first_result_with_list(numbers, expensive_func)
first_result_gen = calculate_time_for_first_result_with_generator(
    numbers, expensive_func
)
Time for first result (list): 5.02 seconds
Time for first result (generator): 1.01 seconds

生成器计算一次耗时操作并立即返回第一个结果,而列表推导式则在返回第一个结果之前计算列表中所有元素的耗时操作。

生成器表达式:一种快捷方式

Python 还允许在一行代码中创建生成器,称为生成器表达式。它们在语法上类似于列表推导式,但使用括号。

squares = (x * x for x in range(10))

实际应用中的用例

生成器在读取大文件、数据流式传输(例如 LLM token 流式传输)以及为数据处理创建管道等场景中表现出色。

LLM 流式传输

如果你使用过 ChatGPT,你会发现 token 是一个接一个地流式输出的,而不是在最后显示完整的回复(你能想象等待完整回复吗??)。这正是生成器所实现的。

这是一个原生的 OpenAI 生成器示例

from openai import OpenAI

# Set your OpenAI API key
client = OpenAI(
    api_key="My API Key",
)

response_generator = client.chat.completions.create(
    model='gpt-3.5-turbo',
    messages=[{'role': 'user', 'content': "What are some good reasons to smile?"}],
    temperature=0,
    stream=True,
)

for chunk in response_generator:
    print(chunk.choices[0].delta.content, end="")

这很棒,但是如果我们想对这个流进行结构化提取怎么办?例如,我们可能想根据 LLM 流式输出的产品排名来渲染前端组件。

我们应该等到整个流结束再提取和验证组件列表,还是可以在组件流式传输时实时提取和验证?

在电子商务中,每一毫秒都很重要,因此首次渲染时间可以区分一个成功和不太成功的电子商务商店(而且我知道失败的电子商务商店是什么感觉 :/)。

让我们看看如何使用 Instructor 来处理这个实时流中的提取!

电子商务产品排名

场景

想象一个电子商务平台,我们拥有

客户档案:这包括详细的购买历史、浏览行为、产品评分、各种类别的偏好、搜索历史,甚至对之前推荐的反馈。这些丰富的数据对于生成高度个性化和相关的产品建议至关重要。

候选产品列表:这可能是一些我们认为客户会喜欢的入围产品。

我们的目标是重新排序这些候选产品以获得最佳转化率,我们将使用 LLM!

流处理

用户数据:

假设我们有以下用户档案

profile_data = """
Customer ID: 12345
Recent Purchases: [Laptop, Wireless Headphones, Smart Watch]
Frequently Browsed Categories: [Electronics, Books, Fitness Equipment]
Product Ratings: {Laptop: 5 stars, Wireless Headphones: 4 stars}
Recent Search History: [best budget laptops 2023, latest sci-fi books, yoga mats]
Preferred Brands: [Apple, AllBirds, Bench]
Responses to Previous Recommendations: {Philips: Not Interested, Adidas: Not Interested}
Loyalty Program Status: Gold Member
Average Monthly Spend: $500
Preferred Shopping Times: Weekend Evenings
...
"""

我们想为该用户排名以下产品

products = [
    {
        "product_id": 1,
        "product_name": "Apple MacBook Air (2023) - Latest model, high performance, portable",
    },
    {
        "product_id": 2,
        "product_name": "Sony WH-1000XM4 Wireless Headphones - Noise-canceling, long battery life",
    },
    {
        "product_id": 3,
        "product_name": "Apple Watch Series 7 - Advanced fitness tracking, seamless integration with Apple ecosystem",
    },
    {
        "product_id": 4,
        "product_name": "Kindle Oasis - Premium e-reader with adjustable warm light",
    },
    {
        "product_id": 5,
        "product_name": "AllBirds Wool Runners - Comfortable, eco-friendly sneakers",
    },
    {
        "product_id": 6,
        "product_name": "Manduka PRO Yoga Mat - High-quality, durable, eco-friendly",
    },
    {
        "product_id": 7,
        "product_name": "Bench Hooded Jacket - Stylish, durable, suitable for outdoor activities",
    },
    {
        "product_id": 8,
        "product_name": "GoPro HERO9 Black - 5K video, waterproof, for action photography",
    },
    {
        "product_id": 9,
        "product_name": "Nespresso Vertuo Next Coffee Machine - Quality coffee, easy to use, compact design",
    },
    {
        "product_id": 10,
        "product_name": "Project Hail Mary by Andy Weir - Latest sci-fi book from a renowned author",
    },
]

现在让我们定义用于结构化提取的模型。注意:Instructor 可以方便地让我们使用 Iterable 来建模一个可迭代对象。在这种情况下,一旦我们定义了产品推荐模型,我们就可以加上 Iterable 来定义我们最终想要的东西——一个(排好序的)产品推荐列表。

import instructor
from openai import OpenAI
from typing import Iterable
from pydantic import BaseModel

client = instructor.from_openai(OpenAI(), mode=instructor.function_calls.Mode.JSON)


class ProductRecommendation(BaseModel):
    product_id: str
    product_name: str


Recommendations = Iterable[ProductRecommendation]

现在让我们使用我们的 Instructor 补丁。由于我们不想等待所有 token 完成,我们将 stream 设置为 True,并在产品推荐进来时实时处理它们。

prompt = (
    f"Based on the following user profile:\n{profile_data}\nRank the following products from most relevant to least relevant:\n"
    + '\n'.join(
        f"{product['product_id']} {product['product_name']}" for product in products
    )
)

start_perf = time.perf_counter()
recommendations_stream = client.chat.completions.create(
    model="gpt-3.5-turbo-1106",
    temperature=0.1,
    response_model=Iterable[ProductRecommendation],
    stream=True,
    messages=[
        {
            "role": "system",
            "content": "Generate product recommendations based on the customer profile. Return in order of highest recommended first.",
        },
        {"role": "user", "content": prompt},
    ],
)
for product in recommendations_stream:
    print(product)
    end_perf = time.perf_counter()
    print(f"Time for first result (generator): {end_perf - start_perf:.2f} seconds")
    break
product_id='1' product_name='Apple MacBook Air (2023)'
Time for first result (generator): 4.33 seconds

recommendations_stream 是一个生成器!它在实时处理流时生成提取的产品。现在让我们看看没有流式传输的相同响应,并比较它们。

start_perf = time.perf_counter()
recommendations_list = client.chat.completions.create(
    model="gpt-3.5-turbo-1106",
    temperature=0.1,
    response_model=Iterable[ProductRecommendation],
    stream=False,
    messages=[
        {
            "role": "system",
            "content": "Generate product recommendations based on the customer profile. Return in order of highest recommended first.",
        },
        {"role": "user", "content": prompt},
    ],
)
print(recommendations_list[0])
end_perf = time.perf_counter()
print(f"Time for first result (list): {end_perf - start_perf:.2f} seconds")
product_id='1' product_name='Apple MacBook Air (2023)'
Time for first result (list): 8.63 seconds

我们的 Web 应用程序现在显示结果更快。即使提高 100 毫秒,也能带来 1% 的收入增长。

FastAPI

我们还可以利用这一点,使用 FastAPI 设置一个流式 LLM API 端点。请查看我们关于使用 FastAPI 的文档 此处

要点总结

总而言之,我们探讨了

• Python 中的生成器:一个强大的特性,可以有效处理数据并降低延迟

• LLM 流式传输:LLM 为我们提供了生成器来流式传输 token,而 Instructor 可以让我们从这个流中验证和提取数据。实时数据验证太棒了!

不要忘记查看我们的 GitHub 获取更多资源,如果你觉得这个库有帮助,请给我们一个星!


如果您有任何问题或需要进一步澄清,请随时联系我们或深入阅读 Instructor 库的文档以获取更多详细信息。祝您编码愉快!