FastAPI 备忘清单

一个用于构建 API 的现代、快速(高性能)的 web 框架,使用 Python 3.6+ 并基于标准的 Python 类型提示

入门

安装 FastAPI

$ pip install "fastapi[all]"

可以分开来安装

假如你想将应用程序部署到生产环境,你可能要执行以下操作:

$ pip install fastapi

并且安装 uvicorn 来作为服务器:

$ pip install "uvicorn[standard]"

运行代码

$ uvicorn main:app --reload

Python: 3.9.5 FastAPI: 0.103.1

最小程序

下面代码会直接启动http服务,也可以使用 uvicorn main:app --reload

from fastapi import FastAPI
import uvicorn

app = FastAPI()

添加一个 API 的示例

# http://127.0.0.1:8000/
@app.get("/")
async def root():
    return {"message": "Hello World"}

if __name__ == '__main__':
    uvicorn.run(app='main:app', reload=True)

路径参数

最基本的路径参数

# http://127.0.0.1:8000/items/1
@app.get("/items/{item_id}")
async def read_item(item_id):
    return {"item_id": item_id} # item_id自定义

多个路径参数

# http://127.0.0.1:8000/items/1/2
@app.get("/items/{item_id}/{user_id}")
async def read_item(item_id, user_id):
    return {"item_id": item_id, "user_id": user_id}

有类型的路径参数

# http://127.0.0.1:8000/items/1
@app.get("/items/{item_id}")
async def read_item(item_id: int):
    return {"item_id": item_id}

文件路径参数

# http://127.0.0.1:8000/file//home/my/my.txt
@app.get("/file/{file_path:path}")
async def read_item(file_path):
    return {"file_path": file_path}

查询参数

带默认值的查询参数

# http://127.0.0.1:8000/items/?skip=0&limit=2
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}]

@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
    return fake_items_db[skip: skip + limit]

可选查询参数

# http://127.0.0.1:8000/items/1?q=admin
from typing import Union

@app.get("/items/{item_id}")
async def read_item(item_id: str, q: Union[str, None] = None):
    if q:
        return {"item_id": item_id, "q": q}
    return {"item_id": item_id}

多路径多查询参数

# http://127.0.0.1:8000/users/1/items/2
# or 
# http://127.0.0.1:8000/users/1/items/2?q=query&short=true
@app.get("/users/{user_id}/items/{item_id}")
async def read_user_item(
    user_id: int, 
    item_id: str, 
    q: Union[str, None] = None, 
    short: bool = False
):
    item = {"item_id": item_id, "owner_id": user_id}
    if q:
        item.update({"q": q})
    if not short:
        item.update(
            {"description": "这是一个令人惊叹的项目,有很长的描述"}
        )
    return item

必需查询参数

# http://127.0.0.1:8000/items/123?needy=yes
@app.get("/items/{item_id}")
async def read_user_item(item_id: str, needy: str):
    item = {"item_id": item_id, "needy": needy}
    return item

请求体

from pydantic import BaseModel
from typing import Union

class Item(BaseModel):
    name: str = '小明'
    description: Union[str, None] = None
    price: float
    tax: Union[float, None] = None

@app.post("/items/")
async def create_item(item: Item):
    print(item.name)
    return item

调用

curl -X 'POST' \
  'http://127.0.0.1:8000/items/' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "小明",
  "description": "string",
  "price": 0,
  "tax": 0
}'

查询参数和字符串校验

from fastapi import Query

@app.get("/items/")
async def read_items(
    q: Union[str, None] = Query(default=None, max_length=50)
):
    results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
    if q:
        results.update({"q": q})
    return results

参数列表

参数含义类型
default默认值任意类型或...
max_length最大长度int
min_length最小长度int
pattern正则匹配string
alias别名参数string
deprecated准备弃用参数bool

多个相同的查询参数

# http://127.0.0.1:8000/items/?q=foo&q=bar
@app.get("/items/")
async def read_items(
    q: Union[List[str], None] = Query(default=None)
):
    query_items = {"q": q}
    return query_items

路径参数和数值校验

Path 用法基本和 Query 相同,参考:FastAPI官方文档

导入 Path

from fastapi import FastAPI, Path, Query
from typing_extensions import Annotated

声明元数据

@app.get("/items/{item_id}")
async def read_items(
    item_id: Annotated[int, Path(title="要获取的项目的 ID")],
    q: Annotated[str | None, Query(alias="item-query")] = None,
):
    results = {"item_id": item_id}
    if q:
        results.update({"q": q})
    return results

参数列表

参数含义类型
...和 Query 具有相同参数...
ge大于等于int float
gt大于int float
le小于等于int float
le小于等于int float
titleapi文档的标题string

其他参数

都具有 Query 的参数,max_lengthmin_length

Cookie参数

from fastapi import Cookie

@app.get("/items/")
async def read_items(
    ads_id: Annotated[Union[str, None], Cookie()] = None
):
    return {"ads_id": ads_id}

Header 参数

from fastapi import Header

@app.get("/items/")
async def read_items(
    user_agent: Annotated[Union[str, None], Header()] = None,
    items_id: Annotated[Union[int, None], Header(ge=1)] = None
):
    return {"User-Agent": user_agent, "items_id": items_id}

表单数据

接收的不是 JSON,而是表单字段时,要使用 Form。

安装

$ pip install python-multipart

HTML

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
</head>
<body>
<form method="post" action="http://127.0.0.1:8000/login">
    <span>账号:</span><input type="text" name="username">
    <br>
    <span>密码:</span><input type="password" name="password">
    <br>
    <input type="submit" value="登录">
</form>
</body>
</html>

FastAPI

from fastapi import FastAPI, Form
import uvicorn
app = FastAPI()
@app.post("/login/")
async def login(username: str = Form(), password: str = Form()):
    return {"username": username}
if __name__ == '__main__':
    uvicorn.run(app='main:app', reload=True)

文件上传

from fastapi import FastAPI, UploadFile
from fastapi.responses import HTMLResponse

@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile):
    print(file.file.read().decode())
    return {"filenames": file.filename, "type": str(type(file.file))}

@app.get("/")
async def main():
    content = """<body>
<form action="/uploadfile/" enctype="multipart/form-data" method="post">
<input name="file" type="file" multiple>
<input type="submit">
</form>
</body>"""
    return HTMLResponse(content=content)

UploadFile 属性

属性名含义返回
filename文件名上传的文件名
content_type内容类型MIME 类型
file文件SpooledTemporaryFile 具有 readwrite 方法

UploadFile async 方法

方法名含义
write(data)data 写入文件
read(size)按指定数量的字节读取文件内容
seek(offset)移动至文件 offsetint)字节处的位置
close()关闭文件

依赖项

依赖项使用场景

  • 共享业务逻辑(复用相同的代码逻辑)
  • 共享数据库连接
  • 实现安全、验证、角色权限
  • 等……

创建依赖项

from typing import Union

from fastapi import Depends, FastAPI

app = FastAPI()

read_itemsread_users 方法依赖 common_parameters 白话就是 read_itemsread_users 都需要 qskiplimit 查询参数

async def common_parameters(
    q: Union[str, None] = None,
    skip: int = 0,
    limit: int = 100
):
    return {"q": q, "skip": skip, "limit": limit}


@app.get("/items/")
async def read_items(
    commons: dict = Depends(common_parameters)
):
    return commons


@app.get("/users/")
async def read_users(
    commons: dict = Depends(common_parameters)
):
    return commons

类作为依赖项

from typing import Union
from fastapi import Depends, FastAPI

app = FastAPI()
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}]

class CommonQueryParams:
    def __init__(
        self, 
        q: Union[str, None] = None, 
        skip: int = 0, 
        limit: int = 100
    ):
        self.q = q
        self.skip = skip
        self.limit = limit

read_itemsx 接收一个 commons 参数,类型是 CommonQueryParams CommonQueryParams 接收三个参数,这三个参数是调用 api 的时候传

@app.get("/items/")
async def read_items(
  commons: CommonQueryParams = Depends(CommonQueryParams)
):
  response = {}
  if commons.q:
      response.update({"q": commons.q})
  items = fake_items_db[commons.skip : commons.skip + commons.limit]
  response.update({"items": items})
  return response

还可以简写

@app.get("/items/")
async def read_items(
  # 这里的 Depends 没有传参,FastAPI 会自动使用 CommonQueryParams
  commons: CommonQueryParams = Depends()
):
  response = {}
  if commons.q:
      response.update({"q": commons.q})
  items = fake_items_db[commons.skip : commons.skip + commons.limit]
  response.update({"items": items})
  return response

子依赖项

from typing import Union
from fastapi import Cookie, Depends, FastAPI

app = FastAPI()

def query_extractor(q: Union[str, None] = None):
    return q

def query_or_cookie_extractor(
    q: str = Depends(query_extractor),
    last_query: Union[str, None] = Cookie(default=None),
):
    if not q:
        return last_query
    return q

# read_query函数依赖query_or_cookie_extractor函数
# query_or_cookie_extractor函数又依赖query_extractor函数
# 就是说依赖项可以依赖其他依赖项,只要你不晕,可以无数次套娃
@app.get("/items/")
async def read_query(
  query_or_default: str = Depends(query_or_cookie_extractor)
):
    return {"q_or_cookie": query_or_default}

不使用缓存

使用 use_cache = False 参数不使用缓存数据,不使用 use_cache = Falsevaluevalue1 是一样的

def result_value():
    value = randint(1, 99)
    return value

def get_value(
  value: int = Depends(result_value, use_cache=False),
  value1: int = Depends(result_value, use_cache=False)
):
    return value, value1

@app.get('/value/')
async def needy_dependency(value: tuple = Depends(get_value)):
    return {"value": value}

全局依赖项

from fastapi import Depends, FastAPI, Header, HTTPException

async def verify_token(x_token: str = Header()):
  if x_token != "fake-super-secret-token":
    raise HTTPException(status_code=400, detail="X-Token 标头无效")

async def verify_key(x_key: str = Header()):
  if x_key != "fake-super-secret-key":
    raise HTTPException(status_code=400, detail="X-Key 标头无效")
  return x_key

全局依赖项很有用,后面的安全性就可以使用全局依赖项

app = FastAPI(
  dependencies=[Depends(verify_token), Depends(verify_key)]
)

@app.get("/items/")
async def read_items():
    return [{"item": "Portal Gun"}, {"item": "Plumbus"}]

@app.get("/users/")
async def read_users():
    return [{"username": "Rick"}, {"username": "Morty"}]

安全性

基于 Token 的认证

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

app = FastAPI()

使用 OAuth2PasswordBearer 创建一个 token 依赖

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

假设这是你的用户数据库

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    }
}

创建一个用户模型

class User(BaseModel):
    username: str
    email: str
    full_name: str
    disabled: bool

创建一个简单的认证函数

def fake_hash_password(password: str):
    return "fakehashed" + password

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return User(**user_dict)

def fake_decode_token(token: str):
    # 这个函数应该验证 token 并返回用户信息
    # 这里我们只是简单地返回了用户名
    return get_user(fake_users_db, token)

创建一个依赖,用于从请求中获取 token 并验证用户

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=401,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = get_user(fake_users_db, form_data.username)
    if not user or user.hashed_password != fake_hash_password(form_data.password):
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    return {"access_token": user.username, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

使用 OAuth2PasswordBearer 来创建一个简单的 token 认证流程。

HTTPS 和证书

from fastapi import FastAPI

app = FastAPI()

在生产环境中,你应该使用一个真正的证书和私钥,你可以从像 Let's Encrypt 这样的证书颁发机构获得免费的证书,或者使用 OpenSSL 生成自签名证书

@app.get("/https")
async def read_https():
    return {"message": "Hello, HTTPS!"}

启动服务器时,使用以下命令来指定证书和私钥:

uvicorn main:app --host 0.0.0.0 --port 443 --ssl-keyfile /path/to/your/key.pem --ssl-certfile /path/to/your/cert.pem

FastAPI 默认支持 HTTPS,你只需要提供证书和私钥即可。

待更新

参考