Logfire 为何能完美契合 FastAPI + Instructor¶
Logfire 是一款新工具,通过 Open Telemetry 为您的应用程序提供关键洞察。Logfire 可以帮助分析应用程序的各个部分,并且直接集成到 Pydantic 和 FastAPI 中,这两者都是 Instructor 用户中流行的库,而不是使用临时的 print 语句。
简而言之,这是帮助您的应用程序达到并超越目标的秘密武器。我们将通过两个示例向您展示如何轻松地将 Logfire 集成到 FastAPI 中,FastAPI 是 Instructor 用户中最受欢迎的选择之一。
- 从单个用户查询中提取数据
- 使用
asyncio
并行处理多个用户 - 使用
Iterable
流式传输多个对象,使其按需可用
照例,我们在此提及的所有代码都可以在 examples/logfire-fastapi 中找到,供您在项目中使用。
配置 Logfire
在开始本教程之前,请确保您已经注册了 Logfire 账户。您还需要创建一个项目来跟踪这些日志。最后,为了查看请求正文,您还需要将仪表板控制台上的默认日志级别配置为 debug
,而不是默认的 info
。
确保创建一个虚拟环境并安装 examples/logfire-fastapi 中 requirements.txt
文件内的所有软件包。
数据提取¶
让我们首先尝试从用户查询中提取一些用户信息。我们可以使用如下所示的简单 Pydantic 模型来实现。
from pydantic import BaseModel
from fastapi import FastAPI
from openai import AsyncOpenAI
import instructor
class UserData(BaseModel):
query: str
class UserDetail(BaseModel):
name: str
age: int
app = FastAPI()
client = instructor.from_openai(AsyncOpenAI())
@app.post("/user", response_model=UserDetail)
async def endpoint_function(data: UserData) -> UserDetail:
user_detail = await client.chat.completions.create(
model="gpt-3.5-turbo",
response_model=UserDetail,
messages=[
{"role": "user", "content": f"Extract: `{data.query}`"},
],
)
return user_detail
这个简单的端点接收一个用户查询,并从语句中提取一个用户。让我们看看如何只需几行代码就能将 Logfire 添加到这个端点中。
from pydantic import BaseModel
from fastapi import FastAPI
from openai import AsyncOpenAI
import instructor
import logfire # (1)!
class UserData(BaseModel):
query: str
class UserDetail(BaseModel):
name: str
age: int
app = FastAPI()
openai_client = AsyncOpenAI() # (2)!
logfire.configure(pydantic_plugin=logfire.PydanticPlugin(record="all"))
logfire.instrument_openai(openai_client)
logfire.instrument_fastapi(app)
client = instructor.from_openai(openai_client)
@app.post("/user", response_model=UserDetail)
async def endpoint_function(data: UserData) -> UserDetail:
user_detail = await client.chat.completions.create(
model="gpt-3.5-turbo",
response_model=UserDetail,
messages=[
{"role": "user", "content": f"Extract: `{data.query}`"},
],
)
return user_detail
- 导入 logfire 软件包
- 使用 Logfire 与 FastAPI 和 OpenAI 的原生集成来设置日志记录
只需这几行代码,我们就完成了与 Logfire 的集成。当我们使用以下载荷调用 /user
端点时,所有内容都会立即记录在控制台中。
curl -X 'POST' \
'http://localhost:8000/user' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"query": "Daniel is a 24 year man living in New York City"
}'
我们可以看到 Pydantic 在此处为我们很好地记录了我们的 OpenAI 调用的验证结果。就在上方,我们还有 OpenAI 调用的结果。
我们还可以完全查看调用端点时传递给它的参数。这对于用户来说非常有用,特别是当他们最终想在本地重现生产环境中的错误时。
使用 Asyncio¶
有时,我们可能需要并行运行多个任务。让我们看看如何利用 asyncio
来加速我们的操作。我们可以通过将以下代码添加到我们之前的文件中来实现。
Asyncio 是什么?
要深入了解如何使用 Asycnio,请参阅我们之前的指南此处。
import asyncio
class MultipleUserData(BaseModel):
queries: list[str]
@app.post("/many-users", response_model=list[UserDetail])
async def extract_many_users(data: MultipleUserData):
async def extract_user(query: str):
user_detail = await client.chat.completions.create(
model="gpt-3.5-turbo",
response_model=UserDetail,
messages=[
{"role": "user", "content": f"Extract: `{query}`"},
],
)
logfire.info("/User returning", value=user_detail)
return user_detail
coros = [extract_user(query) for query in data.queries]
return await asyncio.gather(*coros)
from pydantic import BaseModel
from fastapi import FastAPI
from openai import AsyncOpenAI
import instructor
import logfire
import asyncio
class UserData(BaseModel):
query: str
class MultipleUserData(BaseModel):
queries: list[str]
class UserDetail(BaseModel):
name: str
age: int
app = FastAPI()
openai_client = AsyncOpenAI()
logfire.configure(pydantic_plugin=logfire.PydanticPlugin(record="all"))
logfire.instrument_openai(openai_client)
logfire.instrument_fastapi(app)
client = instructor.from_openai(openai_client)
@app.post("/user", response_model=UserDetail)
async def endpoint_function(data: UserData) -> UserDetail:
user_detail = await client.chat.completions.create(
model="gpt-3.5-turbo",
response_model=UserDetail,
messages=[
{"role": "user", "content": f"Extract: `{data.query}`"},
],
)
logfire.info("/User returning", value=user_detail)
return user_detail
@app.post("/many-users", response_model=list[UserDetail])
async def extract_many_users(data: MultipleUserData):
async def extract_user(query: str):
user_detail = await client.chat.completions.create(
model="gpt-3.5-turbo",
response_model=UserDetail,
messages=[
{"role": "user", "content": f"Extract: `{query}`"},
],
)
logfire.info("/User returning", value=user_detail)
return user_detail
coros = [extract_user(query) for query in data.queries]
return await asyncio.gather(*coros)
我们可以使用简单的 curl
调用此端点
curl -X 'POST' \
'http://localhost:8000/many-users' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"queries": [
"Daniel is a 34 year man in New York City","Sarah is a 20 year old living in Tokyo", "Jeffrey is 55 and lives down in Leeds"
]
}'
如下所示,所有这些都记录在 Logfire 中。我们可以完全查看整个应用程序的性能,很明显,大部分延迟是由 OpenAI 调用造成的。
我们还可以通过为创建的每个 extract_user
实例创建一个新的 span,将日志分离到更细粒度的级别。
流式传输¶
现在让我们看看如何利用 Instructor 的 Iterable
支持来流式传输提取对象的多个实例。这对于速度至关重要且用户希望快速获得结果的应用程序非常有用。
让我们向服务器添加一个新端点,看看这如何实现。
from collections.abc import Iterable
from fastapi.responses import StreamingResponse
class MultipleUserData(BaseModel):
queries: list[str]
@app.post("/extract", response_class=StreamingResponse)
async def extract(data: UserData):
suppressed_client = AsyncOpenAI()
logfire.instrument_openai(
suppressed_client, suppress_other_instrumentation=False
) # (1)!
client = instructor.from_openai(suppressed_client)
users = await client.chat.completions.create(
model="gpt-3.5-turbo",
response_model=Iterable[UserDetail],
stream=True,
messages=[
{"role": "user", "content": data.query},
],
)
async def generate():
with logfire.span("Generating User Response Objects"):
async for user in users:
resp_json = user.model_dump_json()
logfire.info("Returning user object", value=resp_json)
yield resp_json
return StreamingResponse(generate(), media_type="text/event-stream")
- 请注意,我们抑制了 instrumentation 以打印流对象。这与 Instructor 中部分内容的解析有关。
from pydantic import BaseModel
from fastapi import FastAPI
from openai import AsyncOpenAI
import instructor
import logfire
import asyncio
from collections.abc import Iterable
from fastapi.responses import StreamingResponse
class UserData(BaseModel):
query: str
class MultipleUserData(BaseModel):
queries: list[str]
class UserDetail(BaseModel):
name: str
age: int
app = FastAPI()
openai_client = AsyncOpenAI()
logfire.configure(pydantic_plugin=logfire.PydanticPlugin(record="all"))
logfire.instrument_fastapi(app)
logfire.instrument_openai(openai_client)
client = instructor.from_openai(openai_client)
@app.post("/user", response_model=UserDetail)
async def endpoint_function(data: UserData) -> UserDetail:
user_detail = await client.chat.completions.create(
model="gpt-3.5-turbo",
response_model=UserDetail,
messages=[
{"role": "user", "content": f"Extract: `{data.query}`"},
],
)
logfire.info("/User returning", value=user_detail)
return user_detail
@app.post("/many-users", response_model=list[UserDetail])
async def extract_many_users(data: MultipleUserData):
async def extract_user(query: str):
user_detail = await client.chat.completions.create(
model="gpt-3.5-turbo",
response_model=UserDetail,
messages=[
{"role": "user", "content": f"Extract: `{query}`"},
],
)
logfire.info("/User returning", value=user_detail)
return user_detail
coros = [extract_user(query) for query in data.queries]
return await asyncio.gather(*coros)
@app.post("/extract", response_class=StreamingResponse)
async def extract(data: UserData):
suppressed_client = AsyncOpenAI()
logfire.instrument_openai(suppressed_client, suppress_other_instrumentation=False)
client = instructor.from_openai(suppressed_client)
users = await client.chat.completions.create(
model="gpt-3.5-turbo",
response_model=Iterable[UserDetail],
stream=True,
messages=[
{"role": "user", "content": data.query},
],
)
async def generate():
with logfire.span("Generating User Response Objects"):
async for user in users:
resp_json = user.model_dump_json()
logfire.info("Returning user object", value=resp_json)
yield resp_json
return StreamingResponse(generate(), media_type="text/event-stream")
我们可以使用 requests
库及其 iter_content
方法来调用并记录返回的流。
import requests
response = requests.post(
"http://127.0.0.1:3000/extract",
json={
"query": "Alice and Bob are best friends. They are currently 32 and 43 respectively. "
},
stream=True,
)
for chunk in response.iter_content(chunk_size=1024):
if chunk:
print(str(chunk, encoding="utf-8"), end="\n")
这将给出以下输出
我们还可以在 Logfire 仪表板中看到单个流对象,如下所示。请注意,为了便于记录,我们将生成的日志分组到其自身的 span 内。