跳到内容

使用 asyncioInstructor 在 Python 中进行 OpenAI 异步处理

今天,我将向您介绍在 Python 中使用 asyncio 的各种方法。我们将把这些方法应用于使用 instructor 批量处理数据,并学习如何使用 asyncio.gatherasyncio.as_completed 进行并发数据处理。此外,我们将探讨如何使用 asyncio.Semaphore 限制对服务器的并发请求数量。

Github 示例

如果您想运行本文中的代码示例,可以在 jxnl/instructor 上找到它们。

我们将首先定义一个调用 openai 提取数据的 async 函数,然后探讨执行它的四种不同方法。我们将讨论每种方法的优缺点,并分析在小批量数据集上运行它们的结果。

理解 asyncio

asyncio 是一个 Python 库,它使用 async/await 语法实现并发代码。它特别适用于 IO 密集型和结构化网络代码。如果您熟悉 OpenAI 的 SDK,您可能遇到过两个类:OpenAI()AsyncOpenAI()。今天,我们将使用 AsyncOpenAI() 类,它异步处理数据。

通过在 Web 应用程序或批量处理中利用这些工具,我们可以通过并发处理多个请求而非顺序处理来显著提高性能。

理解 asyncawait

我们将使用 asyncawait 关键字定义异步函数。async 关键字用于定义一个返回协程对象的函数。await 关键字用于等待协程对象的结果。

如果您想深入了解 asyncio 的细节,我建议阅读 Real Python 的这篇文章

理解 gather vs as_completed

在本文中,我们将展示两种并发运行任务的方法:asyncio.gatherasyncio.as_completedgather 方法用于并发运行多个任务并将结果作为 list 返回。as_completed 返回一个 iterable,用于并发运行多个任务并在任务完成时返回结果。关于两者区别的另一个很好的资源可以在这里找到。

示例:批量处理

在这个示例中,我们将演示如何使用 asyncio 进行异步处理任务,特别是用于并发提取和处理数据。该脚本将从文本列表中提取数据,并使用 asyncio 并发处理它们。

import instructor
from pydantic import BaseModel
from openai import AsyncOpenAI

# Enables `response_model` in `create` method
client = instructor.apatch(AsyncOpenAI())  # (1)!


class Person(BaseModel):
    name: str
    age: int


async def extract_person(text: str) -> Person:
    return await client.chat.completions.create(  # (2)!
        model="gpt-3.5-turbo",
        messages=[
            {"role": "user", "content": text},
        ],
        response_model=Person,
    )
  1. 我们使用 instructor.apatch 来修补 AsyncOpenAIcreate 方法,使其接受 response_model 参数。这是因为如果没有此补丁,AsyncOpenAIcreate 方法不接受 response_model 参数。
  2. 我们在这里使用 await 来等待服务器的响应,然后再返回结果。这是因为 create 返回的是一个协程对象,而不是协程的结果。

注意,现在函数定义中有 asyncawait 关键字。这是因为我们使用 asyncio 库来并发运行函数。现在,让我们定义一批要处理的文本。

dataset = [
    "My name is John and I am 20 years old",
    "My name is Mary and I am 21 years old",
    "My name is Bob and I am 22 years old",
    "My name is Alice and I am 23 years old",
    "My name is Jane and I am 24 years old",
    "My name is Joe and I am 25 years old",
    "My name is Jill and I am 26 years old",
]

for loop:顺序执行任务。

persons = []
for text in dataset:
    person = await extract_person(text)
    persons.append(person)

即使有 await 关键字,我们仍然必须等待每个任务完成后才能开始下一个。这是因为我们使用 for 循环遍历数据集。这种使用 for 循环的方法将是今天讨论的四种方法中最慢的。

asyncio.gather:并发执行任务。

async def gather():
    tasks_get_persons = [extract_person(text) for text in dataset]
    all_persons = await asyncio.gather(*tasks_get_persons)  # (1)!
  1. 我们在这里使用 await 来等待所有任务完成后,再将结果赋给 all_persons。这是因为 asyncio.gather 返回的是一个协程对象,而不是协程的结果。或者,我们可以使用 asyncio.as_completed 来达到同样的效果。

使用 asyncio.gather 允许我们一次性返回所有结果。这是加速代码的有效方法,但这并非唯一的方法。特别是,如果数据集很大,我们可能不希望等待所有任务完成后才开始处理结果。这时就轮到 asyncio.as_completed 发挥作用了。

asyncio.as_completed:处理已完成的任务。

async def as_completed():
    all_persons = []
    tasks_get_persons = [extract_person(text) for text in dataset]
    for person in asyncio.as_completed(tasks_get_persons):
        all_persons.append(await person)  # (1)!
  1. 我们在这里使用 await 来等待每个任务完成后,再将其添加到列表中。这是因为 as_completed 返回的是一个协程对象,而不是协程的结果。或者,我们可以使用 asyncio.gather 来达到同样的效果。

这种方法是处理大型数据集的好方法。我们可以随着结果的到来开始处理,特别是在我们将数据流回客户端的情况下。

然而,这些方法旨在尽可能快地完成尽可能多的任务。如果我们要考虑对我们发送请求的服务器保持友好,这可能会有问题。这时就需要限速。虽然有可用的库来协助限速,但作为我们的初步防御,我们将使用信号量来限制我们发起的并发请求数量。

结果的顺序

需要注意的是,结果的顺序将与数据集的原始顺序不同。这是因为任务按照完成的顺序返回,而不是按照开始的顺序。如果您需要保留结果的顺序,可以改用 asyncio.gather

限速 Gather:使用信号量限制并发。

sem = asyncio.Semaphore(2)


async def rate_limited_extract_person(text: str, sem: Semaphore) -> Person:
    async with sem:  # (1)!
        return await extract_person(text)


async def rate_limited_gather(sem: Semaphore):
    tasks_get_persons = [rate_limited_extract_person(text, sem) for text in dataset]
    resp = await asyncio.gather(*tasks_get_persons)
  1. 我们使用信号量将并发请求数量限制为 2。这种方法在速度和对我们发送请求的服务器保持友好之间取得了平衡。

限速 As Completed:使用信号量限制并发。

sem = asyncio.Semaphore(2)


async def rate_limited_extract_person(text: str, sem: Semaphore) -> Person:
    async with sem:  # (1)!
        return await extract_person(text)


async def rate_limited_as_completed(sem: Semaphore):
    all_persons = []
    tasks_get_persons = [rate_limited_extract_person(text, sem) for text in dataset]
    for person in asyncio.as_completed(tasks_get_persons):
        all_persons.append(await person)  # (2)!
  1. 我们使用信号量将并发请求数量限制为 2。这种方法在速度和对我们发送请求的服务器保持友好之间取得了平衡。

  2. 我们在这里使用 await 来等待每个任务完成后,再将其添加到列表中。这是因为 as_completed 返回的是一个协程对象,而不是协程的结果。或者,我们可以使用 asyncio.gather 来达到同样的效果。

现在我们已经看到了代码,让我们来看看处理 7 段文本的结果。随着提示变长或我们使用 GPT-4,这些方法之间的差异将变得更加明显。

其他选项

同样重要的是要注意,在这里我们使用 semaphore 来限制并发请求的数量。然而,还有其他方法可以限制并发,特别是考虑到我们从 openai 请求中获取了速率限制信息。您可以想象使用像 ratelimit 这样的库来限制每秒请求的数量。或者捕获速率限制异常并使用 tenacity 在一段时间后重试请求。

结果

如您所见,for 循环是最慢的,而 asyncio.as_completedasyncio.gather 在没有任何速率限制的情况下是最快的。

方法 执行时间 限速 (信号量)
For 循环 6.17 秒
Asyncio.gather 0.85 秒
Asyncio.as_completed 0.95 秒
Asyncio.gather 3.04 秒 2
Asyncio.as_completed 3.26 秒 2

异步处理的实际意义

方法的选择取决于任务的性质以及速度和资源利用之间的预期平衡。

以下是一些建议考量的指导方针

  • 使用 asyncio.gather 快速处理多个独立任务。
  • 对于大型数据集,应用 asyncio.as_completed 以在任务完成时进行处理。
  • 实现速率限制,以避免服务器或 API 端点过载。

如果您觉得内容有帮助或想尝试 Instructor,请访问我们的 GitHub 页面并给我们点个星!