
Why Logfire is a perfect fit for FastAPI + Instructor

Logfire is a new tool that provides key insight into your applications with OpenTelemetry. Instead of relying on ad-hoc print statements, Logfire helps you profile every part of your application and integrates directly with Pydantic and FastAPI, two popular libraries amongst Instructor users.

In short, it's the secret ingredient that helps your application meet, and exceed, its goals. We'll show you how easy it is to integrate Logfire into FastAPI, one of the most popular choices amongst Instructor users, with the following examples:

  1. Extracting data from a single user query
  2. Processing multiple users in parallel with asyncio
  3. Streaming multiple objects with an Iterable so that they're available on demand

As usual, all of the code that we refer to here is available at examples/logfire-fastapi for you to use in your own projects.

Configuring Logfire

Before starting this tutorial, make sure that you've signed up for a Logfire account. You'll also need to create a project to track these logs. Lastly, in order to see the request bodies, you'll need to change the default log level on the dashboard console from the default of info to debug.

Make sure to create a virtual environment and install all of the packages listed in the requirements.txt file inside examples/logfire-fastapi.
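
A minimal setup sketch, assuming a Unix-like shell and that you're running from the repository root (the Logfire CLI's auth command authenticates your local environment against your account):

python -m venv venv
source venv/bin/activate
pip install -r examples/logfire-fastapi/requirements.txt
logfire auth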

Data Extraction

Let's start by trying to extract some user information from a query. We can do so with a simple Pydantic model, as seen below.

from pydantic import BaseModel
from fastapi import FastAPI
from openai import AsyncOpenAI
import instructor


class UserData(BaseModel):
    query: str


class UserDetail(BaseModel):
    name: str
    age: int


app = FastAPI()
client = instructor.from_openai(AsyncOpenAI())


@app.post("/user", response_model=UserDetail)
async def endpoint_function(data: UserData) -> UserDetail:
    user_detail = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        response_model=UserDetail,
        messages=[
            {"role": "user", "content": f"Extract: `{data.query}`"},
        ],
    )

    return user_detail

This simple endpoint takes in a user query and extracts a user from the statement. Let's see how we can add Logfire to it with just a few lines of code.

from pydantic import BaseModel
from fastapi import FastAPI
from openai import AsyncOpenAI
import instructor
import logfire  # (1)!


class UserData(BaseModel):
    query: str


class UserDetail(BaseModel):
    name: str
    age: int


app = FastAPI()
openai_client = AsyncOpenAI()  # (2)!
logfire.configure(pydantic_plugin=logfire.PydanticPlugin(record="all"))
logfire.instrument_openai(openai_client)
logfire.instrument_fastapi(app)
client = instructor.from_openai(openai_client)


@app.post("/user", response_model=UserDetail)
async def endpoint_function(data: UserData) -> UserDetail:
    user_detail = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        response_model=UserDetail,
        messages=[
            {"role": "user", "content": f"Extract: `{data.query}`"},
        ],
    )

    return user_detail
  1. Import the logfire package
  2. Set up logging using Logfire's native integrations with FastAPI and OpenAI

With just these few lines of code, we've integrated Logfire into our application. When we call the /user endpoint with the payload below, everything is immediately logged to the console.
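
To send that payload, first start the server with uvicorn. This is a sketch: we assume the file above is saved as app.py, so adjust the module path to match your own file name.

uvicorn app:app --reload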

curl -X 'POST' \
  'http://localhost:8000/user' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "query": "Daniel is a 24 year man living in New York City"
}'
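
Given the UserDetail model, the endpoint should return a response along these lines (the exact values depend on the model's extraction):

{"name": "Daniel", "age": 24}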

We can see that Pydantic nicely logs the validation result of our OpenAI call for us here. Just above it, we also get the result of the OpenAI call itself.

[Image: Pydantic Validation]

We also get full visibility into the arguments that were passed into the endpoint when it was called. This is incredibly useful, especially when you eventually want to reproduce a production bug locally.

[Image: FastAPI arguments]

Using Asyncio

Sometimes, we might need to run multiple jobs in parallel. Let's see how we can use asyncio to speed up our operations. We can do so by adding the following code to our previous file.

What is Asyncio?

For a deeper dive into how to use asyncio, check out our previous guide here.
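
As a quick refresher, asyncio.gather schedules a group of coroutines concurrently and collects their results in order. A minimal, self-contained sketch:

import asyncio


async def work(i: int) -> int:
    await asyncio.sleep(1)  # stand-in for an I/O-bound call such as an API request
    return i


async def main():
    # All three coroutines run concurrently, so this takes ~1s rather than ~3s
    results = await asyncio.gather(*(work(i) for i in range(3)))
    print(results)  # [0, 1, 2]


asyncio.run(main())

Back to our server, here is the new endpoint: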

import asyncio


class MultipleUserData(BaseModel):
    queries: list[str]


@app.post("/many-users", response_model=list[UserDetail])
async def extract_many_users(data: MultipleUserData):
    async def extract_user(query: str):
        user_detail = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            response_model=UserDetail,
            messages=[
                {"role": "user", "content": f"Extract: `{query}`"},
            ],
        )
        logfire.info("/User returning", value=user_detail)
        return user_detail

    coros = [extract_user(query) for query in data.queries]
    return await asyncio.gather(*coros)
For reference, here is the full file with the new endpoint included:

from pydantic import BaseModel
from fastapi import FastAPI
from openai import AsyncOpenAI
import instructor
import logfire
import asyncio


class UserData(BaseModel):
    query: str


class MultipleUserData(BaseModel):
    queries: list[str]


class UserDetail(BaseModel):
    name: str
    age: int


app = FastAPI()
openai_client = AsyncOpenAI()
logfire.configure(pydantic_plugin=logfire.PydanticPlugin(record="all"))
logfire.instrument_openai(openai_client)
logfire.instrument_fastapi(app)
client = instructor.from_openai(openai_client)


@app.post("/user", response_model=UserDetail)
async def endpoint_function(data: UserData) -> UserDetail:
    user_detail = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        response_model=UserDetail,
        messages=[
            {"role": "user", "content": f"Extract: `{data.query}`"},
        ],
    )
    logfire.info("/User returning", value=user_detail)
    return user_detail


@app.post("/many-users", response_model=list[UserDetail])
async def extract_many_users(data: MultipleUserData):
    async def extract_user(query: str):
        user_detail = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            response_model=UserDetail,
            messages=[
                {"role": "user", "content": f"Extract: `{query}`"},
            ],
        )
        logfire.info("/User returning", value=user_detail)
        return user_detail

    coros = [extract_user(query) for query in data.queries]
    return await asyncio.gather(*coros)

We can call this endpoint with a simple curl request:

curl -X 'POST' \
  'http://localhost:8000/many-users' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "queries": [
    "Daniel is a 34 year man in New York City","Sarah is a 20 year old living in Tokyo", "Jeffrey is 55 and lives down in Leeds"
  ]
}'
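
As before, the response should be a list of extracted users, along these lines (exact values depend on the model):

[{"name":"Daniel","age":34},{"name":"Sarah","age":20},{"name":"Jeffrey","age":55}]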

All of this is logged in Logfire, as seen below. We get full visibility into the performance of our entire application, and it's clear that the bulk of the latency comes from the OpenAI calls.

We could also get more granular logs by creating a new span for each instance of extract_user that we create, as shown in the sketch below.
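
A minimal sketch of what that could look like, wrapping each call in its own logfire.span (the span name template here is our own choice):

async def extract_user(query: str):
    # Each extraction gets its own span, so per-user timing shows up separately
    with logfire.span("extract_user {query}", query=query):
        user_detail = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            response_model=UserDetail,
            messages=[
                {"role": "user", "content": f"Extract: `{query}`"},
            ],
        )
        logfire.info("/User returning", value=user_detail)
        return user_detail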

[Image: Logfire Asyncio]

Streaming

Now let's see how we can stream multiple instances of an extracted object with Instructor's Iterable support. This is useful for applications where speed is crucial and users want the results quickly.

Let's add a new endpoint to our server to see how this might work.

from collections.abc import Iterable
from fastapi.responses import StreamingResponse


@app.post("/extract", response_class=StreamingResponse)
async def extract(data: UserData):
    suppressed_client = AsyncOpenAI()
    logfire.instrument_openai(
        suppressed_client, suppress_other_instrumentation=False
    )  # (1)!
    client = instructor.from_openai(suppressed_client)
    users = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        response_model=Iterable[UserDetail],
        stream=True,
        messages=[
            {"role": "user", "content": data.query},
        ],
    )

    async def generate():
        with logfire.span("Generating User Response Objects"):
            async for user in users:
                resp_json = user.model_dump_json()
                logfire.info("Returning user object", value=resp_json)

                yield resp_json

    return StreamingResponse(generate(), media_type="text/event-stream")
  1. Note that we configure a separate client here and pass suppress_other_instrumentation=False so that the stream objects get printed. This has to do with how Instructor parses partial results.

For reference, here is the complete server file:
from pydantic import BaseModel
from fastapi import FastAPI
from openai import AsyncOpenAI
import instructor
import logfire
import asyncio
from collections.abc import Iterable
from fastapi.responses import StreamingResponse


class UserData(BaseModel):
    query: str


class MultipleUserData(BaseModel):
    queries: list[str]


class UserDetail(BaseModel):
    name: str
    age: int


app = FastAPI()
openai_client = AsyncOpenAI()
logfire.configure(pydantic_plugin=logfire.PydanticPlugin(record="all"))
logfire.instrument_fastapi(app)
logfire.instrument_openai(openai_client)
client = instructor.from_openai(openai_client)


@app.post("/user", response_model=UserDetail)
async def endpoint_function(data: UserData) -> UserDetail:
    user_detail = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        response_model=UserDetail,
        messages=[
            {"role": "user", "content": f"Extract: `{data.query}`"},
        ],
    )
    logfire.info("/User returning", value=user_detail)
    return user_detail


@app.post("/many-users", response_model=list[UserDetail])
async def extract_many_users(data: MultipleUserData):
    async def extract_user(query: str):
        user_detail = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            response_model=UserDetail,
            messages=[
                {"role": "user", "content": f"Extract: `{query}`"},
            ],
        )
        logfire.info("/User returning", value=user_detail)
        return user_detail

    coros = [extract_user(query) for query in data.queries]
    return await asyncio.gather(*coros)


@app.post("/extract", response_class=StreamingResponse)
async def extract(data: UserData):
    suppressed_client = AsyncOpenAI()
    logfire.instrument_openai(suppressed_client, suppress_other_instrumentation=False)
    client = instructor.from_openai(suppressed_client)
    users = await client.chat.completions.create(
        model="gpt-3.5-turbo",
        response_model=Iterable[UserDetail],
        stream=True,
        messages=[
            {"role": "user", "content": data.query},
        ],
    )

    async def generate():
        with logfire.span("Generating User Response Objects"):
            async for user in users:
                resp_json = user.model_dump_json()
                logfire.info("Returning user object", value=resp_json)

                yield resp_json

    return StreamingResponse(generate(), media_type="text/event-stream")

We can call this endpoint and log the stream that gets returned using the requests library and its iter_content method.

import requests

response = requests.post(
    "http://127.0.0.1:3000/extract",
    json={
        "query": "Alice and Bob are best friends. They are currently 32 and 43 respectively. "
    },
    stream=True,
)

for chunk in response.iter_content(chunk_size=1024):
    if chunk:
        print(str(chunk, encoding="utf-8"), end="\n")

This gives us the following output:

{"name":"Alice","age":32}
{"name":"Bob","age":43}

We can also see the individual stream objects inside the Logfire dashboard, as seen below. Note that we grouped the generated logs inside their own span so that they're easier to track.

[Image: Logfire Stream]