FastAPI:(9)安全性


FastAPI:(9)安全性

概述

文章内容地图

graph TD
    A[FastAPI安全] --> B[OAuth2]
    A --> C[JWT]
    A --> D[密码哈希]
    
    B --> E[密码模式]
    B --> F[Bearer验证]
    
    C --> G[标准字段]
    C --> H[签名/过期]
    
    D --> I[Bcrypt加密]
    
    E --> J[获取token]
    F --> K[验证token]
    G --> L[用户标识]
    H --> M[防篡改]
    I --> N[安全存储]

1.基础概念

OAuth2授权协议

OAuth2 是一种授权协议,用于允许用户将其资源(如账户信息)授权给第三方应用访问而无需暴露密码。在 FastAPI 中,OAuth2 被作为一种标准机制来实现安全的用户认证,主要通过令牌(token)来管理访问控制。

FastAPI 提供 OAuth2PasswordBearerOAuth2PasswordRequestForm 等工具来支持 OAuth2 的“密码模式(Password Flow)”,这适用于拥有自己用户数据库的小型应用。

重要特征

  • 特征1:基于令牌(Token)的授权方式,避免密码直接暴露
  • 特征2:身份验证与授权解耦,通过 access token 授权访问
  • 特征3:支持与用户认证系统对接(如数据库、社交登录等)
  • 特征4:需要定义 token endpoint 和依赖项验证用户身份
  • 特征5:支持范围(scopes)、过期时间、刷新机制(高级用法)
  • 特征6:FastAPI 中通常用于构建标准安全 API 认证机制

教育平台中的OAuth2(正例)

OAuth2授权协议

例子描述
一个在线教育平台为不同学生账户提供课程访问权限。平台通过 FastAPI 实现 OAuth2 密码模式认证:学生使用用户名和密码登录系统后,系统颁发 token,后续所有 API 调用均需携带该 token 验证用户身份和权限。

特征对比

  • ✅ 使用 OAuth2PasswordBearer 定义 token 路径
  • ✅ 用户登录返回 access token
  • ✅ 后续路由依赖 token 验证权限
  • ✅ 用户与权限体系解耦,结构清晰
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

fake_users = {"alice": {"username": "alice", "password": "secret", "token": "abc123"}}

def fake_decode_token(token: str):
    for user in fake_users.values():
        if user["token"] == token:
            return user
    return None

@app.post("/token")
def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = fake_users.get(form_data.username)
    if not user or user["password"] != form_data.password:
        raise HTTPException(status_code=400, detail="Invalid credentials")
    return {"access_token": user["token"], "token_type": "bearer"}

@app.get("/courses/")
def read_courses(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    return {"user": user["username"], "courses": ["Math", "Science"]}

密码流(正例)

OAuth2授权协议

from typing import Annotated

from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


@app.get("/items/")
async def read_items(token: Annotated[str, Depends(oauth2_scheme)]):
    return {"token": token}

Password 是 OAuth2 定义的,用于处理安全与身份验证的方式()。OAuth2 的设计目标是为了让后端或 API 独立于服务器验证用户身份。但在本例中,FastAPI 应用会处理 API 与身份验证。
创建 OAuth2PasswordBearer 的类实例时,要传递 tokenUrl 参数。该参数包含客户端(用户浏览器中运行的前端) 的 URL,用于发送 username 与 password,并获取令牌。

代码运行流程:

  • 用户在前端输入 username 与password,并点击回车
  • (用户浏览器中运行的)前端把 username 与password 发送至 API 中指定的 URL(使用 tokenUrl="token" 声明)
  • API 检查 username 与password,并用令牌(Token) 响应(暂未实现此功能):
  • 令牌只是用于验证用户的字符串
  • 一般来说,令牌会在一段时间后过期
    • 过时后,用户要再次登录
    • 这样一来,就算令牌被人窃取,风险也较低。因为它与永久密钥不同,在绝大多数情况下不会长期有效
  • 前端临时将令牌存储在某个位置
  • 用户点击前端,前往前端应用的其它部件
  • 前端需要从 API 中提取更多数据:
    • 为指定的端点(Endpoint)进行身份验证
    • 因此,用 API 验证身份时,要发送值为 Bearer + 令牌的请求头 Authorization
    • 假如令牌为 foobarAuthorization 请求头就是: Bearer foobar

获取当前用户(正例)

OAuth2授权协议

from typing import Union

from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class User(BaseModel): # 创建Pydantic的用户模型
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


def fake_decode_token(token):
    return User(
        username=token + "fakedecoded", email="john@example.com", full_name="John Doe"
    )


async def get_current_user(token: str =  Depends(oauth2_scheme)):  # 创建依赖项,使用oauth2_scheme作为依赖项
    user = fake_decode_token(token) # 使用(伪)工具函数
    return user # 返回Pydantic的User模型


@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_user)): # 注入当前用户
    return current_user

2.OAuth2 实现简单的 Password 和 Bearer 验证

password与bearer验证

OAuth2 中的 “password 和 bearer 验证” 是 FastAPI 支持的一种简化认证机制,用于让客户端应用通过用户名和密码请求 token,并使用该 token(作为 Bearer Token)访问受保护的资源。该机制不涉及授权服务器或复杂的 OAuth2 流程,适用于小型应用或内建用户系统的服务。

重要特征

  • 特征1:使用 OAuth2PasswordRequestForm 获取用户名和密码
  • 特征2:使用 OAuth2PasswordBearer 定义 token 接收机制
  • 特征3:客户端通过 /token 获取 token,后续请求使用 Authorization: Bearer <token>
  • 特征4:依赖函数中对 token 进行验证、解码和用户提取
  • 特征5:身份认证与受保护资源访问严格分离

智能家居用户控制中心(正例)

password与bearer验证

现象:
智能家居 App 中用户通过 FastAPI 接口登录,成功登录后返回 bearer token。该 token 用于控制灯光、温度等设备。所有控制接口验证 token 合法性,确保只有认证用户可操作设备。

特征对比

  • ✅ token 通过标准登录接口获取
  • ✅ 控制接口使用 token 依赖项校验身份
  • ✅ 权限与认证流程清晰分离
  • ✅ 遵循 password + bearer 的最简 OAuth2 流程
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

users = {"homeuser": {"password": "smarthome", "token": "hometoken"}}

def get_user_from_token(token: str = Depends(oauth2_scheme)):
    for user, data in users.items():
        if data["token"] == token:
            return user
    raise HTTPException(status_code=403, detail="Not authenticated")

@app.post("/token")
def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = users.get(form_data.username)
    if user and user["password"] == form_data.password:
        return {"access_token": user["token"], "token_type": "bearer"}
    raise HTTPException(status_code=400, detail="Invalid credentials")

@app.post("/control/light")
def control_light(user=Depends(get_user_from_token)):
    return {"message": f"Light toggled by {user}"}

直接携带用户名密码访问所有接口(反例)

password与bearer验证

例子描述
公司一内部运维脚本直接在每次请求中以 URL 参数或请求体明文发送用户名和密码,例如 /deploy?user=admin&pwd=123456。接口内手动解析并比对用户名密码来决定是否允许操作部署任务。

特征对比

  • ❌ 未使用标准 OAuth2 依赖项如 OAuth2PasswordBearerOAuth2PasswordRequestForm
  • ❌ 认证与访问逻辑耦合,未分离 token 流程
  • ❌ 明文传输用户名密码,安全性差
  • ❌ 不支持 token reuse 或权限管理
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()

@app.get("/deploy")
def deploy(request: Request):
    user = request.query_params.get("user")
    pwd = request.query_params.get("pwd")
    if user == "admin" and pwd == "pass123":
        return {"status": "deployment started"}
    raise HTTPException(status_code=403, detail="Unauthorized")

官网例子(正例)

password与bearer验证

实际上,OAuth2 规范_要求_ grant_type 字段使用固定值 password,但 OAuth2PasswordRequestForm 没有作强制约束。
如需强制使用固定值 password,则不要用 OAuth2PasswordRequestForm,而是用 OAuth2PasswordRequestFormStrict

OAuth2PasswordRequestForm 与 OAuth2PasswordBearer 一样,都不是 FastAPI 的特殊类。
FastAPI 把 OAuth2PasswordBearer 识别为安全方案。因此,可以通过这种方式把它添加至 OpenAPI。
但 OAuth2PasswordRequestForm 只是可以自行编写的类依赖项,也可以直接声明 Form 参数。
但由于这种用例很常见,FastAPI 为了简便,就直接提供了对它的支持。

缺点:返回的Token是username,这种方式极度不安全。

from typing import Union

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm # 导入OAuth2PasswordRequestForm
from pydantic import BaseModel

# 建立伪数据库
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

app = FastAPI()


def fake_hash_password(password: str):
    return "fakehashed" + password


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)
    return user


async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)): # 创建「激活依赖项」,使之仅在当前用户为激活状态时,才能获取`current_user`,如果用户不存在,或状态为未激活,这两个依赖项都会返回HTTP错误
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username) # 从伪数据库中获取用户信息
    if not user_dict: # 不存在用户信息发出错误
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict) # 把用户信息放入用户模型当中
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password: # 使用hash值进行对比
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"} # 响应类容应该包含token_type,本例使用的是BearerToken,Token类型为bearer。而access_token,是包含权限token的字符串


@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

实际效果:

  1. 身份验证
    image.png

  1. 获取当前用户数据
    使用 /users/me 路径的 GET 操作。可以提取如下当前用户数据:
{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "disabled": false,
  "hashed_password": "fakehashedsecret"
}

  1. 点击小锁图标,注销后,再执行同样的操作,则会得到 HTTP 401 错误:
{
  "detail": "Not authenticated"
}
  1. 未激活用户
    输入用户名:alice 密码:secret2
    将会收到:
{
  "detail": "Inactive user"
}

3.OAuth2 实现密码哈希与 Bearer JWT 令牌验证

JWT

JWT(JSON Web Token)是一种开放标准(RFC 7519),用于在客户端与服务器之间以 安全、可验证的方式传递声明(claims)信息。在 FastAPI 中,JWT 常用于身份认证机制:用户登录后,服务器生成签名的 token 并返回给客户端,客户端随后在每次请求中携带该 token,用于访问受保护资源。

重要特征

  • 特征1:JWT 是结构化的字符串,包含 Header、Payload、Signature 三部分
  • 特征2:Payload 携带用户标识、权限等声明信息
  • 特征3:Signature 用于防篡改,验证 JWT 的真实性
  • 特征4:支持设置过期时间(exp claim)来自动失效
  • 特征5:可在 FastAPI 中结合 OAuth2PasswordBearer 使用
  • 特征6:在依赖函数中使用 jwt.decode() 校验合法性和提取信息

JWT 字符串没有加密,任何人都能用它恢复原始信息。但 JWT 使用了签名机制。接受令牌时,可以用签名校验令牌。使用 JWT 创建有效期为一周的令牌。第二天,用户持令牌再次访问时,仍为登录状态。
令牌于一周后过期,届时,用户身份验证就会失败。只有再次登录,才能获得新的令牌。如果用户(或第三方)篡改令牌的过期时间,因为签名不匹配会导致身份验证失败。

电商用户会话管理(正例)

JWT

例子描述
用户在电商网站登录后,FastAPI 服务返回一个包含 user_idexp 字段的 JWT。客户端每次请求都将该 token 附加在 header 中,服务器依赖项通过解码验证签名与过期时间,并提取用户身份以决定权限。

特征对比

  • ✅ JWT 含有结构化 payload 与签名
  • ✅ 签发后由客户端存储并携带
  • ✅ 在依赖中安全验证 token,有效防篡改
  • ✅ 使用 exp 设置生命周期
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from datetime import datetime, timedelta

SECRET_KEY = "mysecret"
ALGORITHM = "HS256"

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

fake_users = {"alice": {"username": "alice", "password": "1234", "id": 1}}

def create_access_token(data: dict, expires_delta: timedelta):
    to_encode = data.copy()
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def verify_token(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

@app.post("/token")
def login(form: OAuth2PasswordRequestForm = Depends()):
    user = fake_users.get(form.username)
    if not user or user["password"] != form.password:
        raise HTTPException(status_code=400, detail="Incorrect credentials")
    token = create_access_token(data={"sub": user["username"]}, expires_delta=timedelta(minutes=30))
    return {"access_token": token, "token_type": "bearer"}

@app.get("/orders")
def get_orders(user=Depends(verify_token)):
    return {"user": user["sub"], "orders": ["order1", "order2"]}

手动构造伪造token(反例)

JWT

例子描述
为了模拟登录,一些客户端手动构造 JSON 并 base64 编码得到伪造 token,然后在请求中携带该伪造 token。后端仅解析 payload,不验证签名,导致攻击者能伪造任意身份。

特征对比

  • ❌ 没有验证 Signature,容易被伪造
  • ❌ Token 可被篡改而不被检测
  • ❌ 不使用 secret 或 key 解密
  • ❌ 不符合 JWT 的安全设计原则
from fastapi import FastAPI, Request, HTTPException
import base64
import json

app = FastAPI()

@app.get("/admin")
def access_admin(request: Request):
    token = request.headers.get("Authorization")
    if not token:
        raise HTTPException(status_code=403, detail="No token provided")
    try:
        # 模拟 base64 解码 payload,不验证签名
        payload_b64 = token.split(".")[1]
        payload = json.loads(base64.b64decode(payload_b64 + '=='))
        if payload.get("role") == "admin":
            return {"msg": "Welcome admin!"}
    except Exception:
        pass
    raise HTTPException(status_code=403, detail="Access denied")

官网例子(正例)

JWT

  • 安装 PyJWT,在 Python 中生成和校验 JWT 令牌:pip install pyjwt
  • 安装 passlibpip install passlib[bcrypt]Passlib 是处理密码哈希的 Python 包。它支持很多安全哈希算法及配套工具。教程推荐的算法是 Bcrypt

passlib 甚至可以读取 Django、Flask 的安全插件等工具创建的密码。
例如,把 Django 应用的数据共享给 FastAPI 应用的数据库。或利用同一个数据库,可以逐步把应用从 Django 迁移到 FastAPI。并且,用户可以同时从 Django 应用或 FastAPI 应用登录。
PassLib 上下文还支持使用不同哈希算法的功能,包括只能校验的已弃用旧算法等。例如,用它读取和校验其它系统(如 Django)生成的密码,但要使用其它算法,如 Bcrypt,生成新的哈希密码。同时,这些功能都是兼容的。

from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext # 导入加密
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256" # 使用HS256算法
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel): # 设置令牌响应模型
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password): # 工具函数一:校验接收的密码是否匹配存储的哈希值
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password): # 工具函数二:哈希用户的密码
    return pwd_context.hash(password)


def get_user(db, username: str): 
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str): # 工具函数三:验证用户身份并返回用户
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None): # 工具函数四:生成新的访问令牌
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): # 解码并校验接收到的令牌,然后返回当前用户
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return [{"item_id": "Foo", "owner": current_user.username}]

JWT规范sub键

在 JWT(JSON Web Token)规范中,sub(Subject)键是一个 预定义的标准保留声明字段(Registered Claim Name),用于表示该 token 所面向的主体,即 token 所代表的用户或实体的唯一标识。在 FastAPI 的身份认证流程中,sub 常被用于保存用户的 user_idusername,以供后续在依赖项中提取和鉴权。

重要特征:

  • 特征1sub 是 JWT 规范的标准声明字段之一,用于标识 token 所代表的「唯一主体」
  • 特征2sub 字段值应为唯一标识符(如用户名、用户ID、邮箱等)
  • 特征3sub 的用途是便于服务端在解析 token 后快速识别用户身份
  • 特征4:在 FastAPI 中常结合 jwt.decode() 解码并提取 sub,用于后续鉴权
  • 特征5:错误使用 sub(如缺失、字段错误或使用非唯一值)将破坏认证逻辑或安全性

用户名作为用户标识(正例)

JWT规范sub键

例子描述
在一个社交平台中,用户登录后系统签发 JWT,其中 sub 被设置为用户的用户名(如 "sub": "alice_2025")。服务器在后续请求中通过 sub 提取该用户名,获取其资料或权限,确保所有操作正确追踪到该用户。

特征对比

  • ✅ 使用标准字段 sub
  • sub 值为唯一用户标识符
  • ✅ 后续认证流程中以 sub 提取并鉴权
  • ✅ 与 FastAPI 解码逻辑高度兼容
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt, JWTError
from datetime import datetime, timedelta

SECRET_KEY = "social_secret"
ALGORITHM = "HS256"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()

def create_access_token(username: str):
    expire = datetime.utcnow() + timedelta(minutes=30)
    to_encode = {"sub": username, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def get_current_user(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload["sub"]
    except JWTError:
        raise HTTPException(status_code=403, detail="Invalid token")

@app.post("/token")
def login(form: OAuth2PasswordRequestForm = Depends()):
    return {"access_token": create_access_token(form.username), "token_type": "bearer"}

@app.get("/me")
def read_profile(current_user: str = Depends(get_current_user)):
    return {"username": current_user}

用户ID作为唯一标识的(正例)

JWT规范sub键

例子描述
银行系统为每个登录用户分配带 sub 字段的 JWT,值为后端数据库中的用户主键 ID(如 "sub": "user_1843")。所有后续请求都依赖 sub 来查询账户、交易记录等信息,确保严格对应身份。

特征对比

  • ✅ 使用 sub 存储主键 ID,确保唯一
  • sub 用于权限判断与记录查询
  • ✅ 满足 JWT 标准规范
  • ✅ 兼容安全审计机制
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt, JWTError
from datetime import datetime, timedelta

SECRET_KEY = "bank_secret"
ALGORITHM = "HS256"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()
fake_users = {"user123": {"id": "user_1843", "password": "pass123"}}

def create_access_token(user_id: str):
    expire = datetime.utcnow() + timedelta(minutes=15)
    return jwt.encode({"sub": user_id, "exp": expire}, SECRET_KEY, algorithm=ALGORITHM)

def get_user_id(token: str = Depends(oauth2_scheme)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload["sub"]
    except JWTError:
        raise HTTPException(status_code=403, detail="Invalid token")

@app.post("/token")
def login(form: OAuth2PasswordRequestForm = Depends()):
    user = fake_users.get(form.username)
    if not user or user["password"] != form.password:
        raise HTTPException(status_code=400, detail="Bad credentials")
    return {"access_token": create_access_token(user["id"]), "token_type": "bearer"}

@app.get("/account")
def view_account(user_id: str = Depends(get_user_id)):
    return {"user_id": user_id, "account": "Checking-123"}

#e 非标准字段替代 sub(反例) JWT规范sub

例子描述
投票网站签发的 JWT 中使用了自定义字段 "user_name" 来标识用户,而完全省略了标准的 sub 字段。后端系统使用 user_name 判断身份,但通用库与中间件无法识别,导致部分服务(如授权网关)无法读取有效用户信息。

特征对比

  • ❌ 忽略 JWT 标准字段 sub
  • ❌ 使用非标准字段造成兼容性问题
  • ❌ 通用组件/中间件无法解析正确用户标识
  • ❌ 增加维护与审计成本
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.security import OAuth2PasswordBearer
from jose import jwt

SECRET_KEY = "invalid_sub_key"
ALGORITHM = "HS256"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()

# token payload中未使用 "sub",而是自定义字段 "user_name"
def create_token(username: str):
    return jwt.encode({"user_name": username}, SECRET_KEY, algorithm=ALGORITHM)

def get_user(request: Request, token: str = Depends(oauth2_scheme)):
    payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    # ❌ 错误:这里依赖 user_name 而非标准 sub 字段
    return payload.get("user_name")

@app.post("/token")
def login():
    return {"access_token": create_token("bob"), "token_type": "bearer"}

@app.get("/vote")
def vote(user=Depends(get_user)):
    return {"voter": user, "status": "voted"}

文章作者: Hkini
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Hkini !
评论
  目录