如何在FastAPI中实现用户身份验证和授权

1. 概述

在Web应用程序中,身份验证和授权是十分重要的安全措施。通过对用户进行身份验证,可以保证只有合法用户才能访问应用程序。授权可以根据用户角色控制用户访问的资源和操作。

FastAPI是一个快速、现代、基于Python的Web框架,提供了方便的库和工具来实现身份验证和授权功能。本文将介绍在FastAPI中实现用户身份验证和授权的方法。

2. 身份验证

2.1. JWT

JSON Web Token(JWT)是一个开放标准(RFC 7519),用于在网络环境中传递声明。使用JWT进行身份验证的方式是在用户登录时,将用户的认证信息生成一个JWT令牌(token),并在以后的每个请求中附加这个令牌,并在服务器中验证这个令牌是否是合法的。

要在FastAPI中使用JWT进行身份验证,需要借助第三方包PyJWT。以下是一个使用JWT进行身份验证的示例:

import jwt

from fastapi import Depends, FastAPI, HTTPException

from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

app = FastAPI()

# JWT配置

JWT_SECRET_KEY = 'myjwtsecretkey'

ALGORITHM = 'HS256'

ACCESS_TOKEN_EXPIRE_MINUTES = 30

oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/token')

def verify_token(token: str, credentials_exception):

try:

payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[ALGORITHM])

username: str = payload.get('sub')

if username is None:

raise credentials_exception

except jwt.JWTError:

raise credentials_exception

# 登录验证函数

async def login(username: str, password: str):

# 验证用户名和密码

user = get_user(username)

if not user:

raise HTTPException(status_code=400, detail='Invalid username or password')

if not verify_password(password, user['password']):

raise HTTPException(status_code=400, detail='Invalid username or password')

# 生成JWT令牌

access_token = create_access_token(data={'sub': user['username']})

return {'access_token': access_token, 'token_type': 'bearer'}

# 令牌创建函数

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):

to_encode = data.copy()

if expires_delta:

expire = datetime.utcnow() + expires_delta

else:

expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

to_encode.update({'exp': expire})

encoded_jwt = jwt.encode(to_encode, JWT_SECRET_KEY, algorithm=ALGORITHM)

return encoded_jwt

# 处理登录请求

@app.post('/token')

async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):

token_data = await login(form_data.username, form_data.password)

return token_data

# 需要验证身份的API

@app.get('/users/me')

async def read_users_me(token: str = Depends(oauth2_scheme)):

credentials_exception = HTTPException(

status_code=401,

detail='Could not validate credentials',

headers={'WWW-Authenticate': 'Bearer'},

)

verify_token(token, credentials_exception)

return {'username': username}

2.2. BasicAuth

Basic Authentication是一种基于HTTP协议的简单身份验证方式,其原理是在请求头部添加Authorization字段,该字段的值为"Basic base64(username:password)"。在FastAPI中可以使用FastAPI自带的HTTPBasic认证,该认证将返回验证用户的用户名和密码。

以下是一个使用BasicAuth进行身份验证的示例:

from fastapi import Depends, FastAPI, HTTPException

from fastapi.security import HTTPBasic, HTTPBasicCredentials

app = FastAPI()

security = HTTPBasic()

# 登录验证函数

async def login(credentials: HTTPBasicCredentials = Depends(security)):

# 验证用户名和密码

user = get_user(credentials.username)

if not user:

raise HTTPException(status_code=400, detail='Invalid username or password')

if not verify_password(credentials.password, user['password']):

raise HTTPException(status_code=400, detail='Invalid username or password')

return {'username': credentials.username, 'password': credentials.password}

# 需要验证身份的API

@app.get('/users/me')

async def read_users_me(credentials: HTTPBasicCredentials = Depends(security)):

user = await login(credentials)

return user

3. 授权

3.1. Role-based Access Control (RBAC)

基于角色的访问控制(RBAC)是一种流行的授权技术,它根据用户的角色(role)授权用户访问资源。在FastAPI中,可以使用Depends依赖注入和FastAPI的内置权限系统来实现RBAC。

以下是一个使用RBAC进行授权的示例:

from fastapi import Depends, FastAPI, HTTPException

from fastapi.security import OAuth2PasswordBearer

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/token')

# 用户角色枚举

class UserRole(str, Enum):

admin = 'admin'

normal_user = 'normal_user'

# 模拟用户和角色关系

users_roles = {

'admin': UserRole.admin,

'user1': UserRole.normal_user,

'user2': UserRole.normal_user,

}

# 验证角色函数

def verify_user_role(token: str = Depends(oauth2_scheme), roles: List[UserRole] = []):

credentials_exception = HTTPException(status_code=401, detail='Could not validate credentials')

verify_token(token, credentials_exception)

username = get_username_from_token(token)

if username not in users_roles:

raise HTTPException(status_code=401, detail='Could not validate credentials')

user_role = users_roles[username]

if roles and user_role not in roles:

raise HTTPException(status_code=403, detail='Forbidden')

return user_role

# 需要admin角色的API

@app.get('/admin_only')

async def admin_only(user_role: UserRole = Depends(verify_user_role([UserRole.admin]))):

return {'message': 'Hello admin'}

# 需要normal_user角色的API

@app.get('/normal_user')

async def normal_user(user_role: UserRole = Depends(verify_user_role([UserRole.normal_user]))):

return {'message': 'Hello normal user'}

3.2. Permission-based Acess Control (PBAC)

基于权限的访问控制(PBAC)是另一种流行的授权技术,它将授权的重点放在了权限上。在FastAPI中,可以使用Depends依赖注入和FastAPI的内置权限系统来实现PBAC。

以下是一个使用RBAC进行授权的示例:

from fastapi import Depends, FastAPI, HTTPException, Path

from fastapi.security import OAuth2PasswordBearer

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/token')

# 权限枚举

class Permission(str, Enum):

read = 'read'

write = 'write'

# 模拟权限配置

permissions = {

'admin': [Permission.read, Permission.write],

'user1': [Permission.read],

'user2': [Permission.write],

}

# 验证权限函数

def verify_permission(token: str = Depends(oauth2_scheme),

permission: Permission = None,

username: str = Path(..., description='User name')):

credentials_exception = HTTPException(status_code=401, detail='Could not validate credentials')

verify_token(token, credentials_exception)

if username not in permissions:

raise HTTPException(status_code=401, detail='Could not validate credentials')

user_permissions = permissions[username]

if permission and permission not in user_permissions:

raise HTTPException(status_code=403, detail='Forbidden')

return True

# 需要read权限的API

@app.get('/users/{username}', dependencies=[Depends(verify_permission(permission=Permission.read))])

async def read_user(username: str = Path(..., description='User name')):

return {'username': username}

# 需要write权限的API

@app.put('/users/{username}', dependencies=[Depends(verify_permission(permission=Permission.write))])

async def update_user(username: str = Path(..., description='User name')):

return {'username': username}

4. 总结

本文介绍了在FastAPI中实现用户身份验证和授权的方法,包括使用JWT和Basic Authentication进行身份验证,以及基于角色的访问控制(RBAC)和基于权限的访问控制(PBAC)进行授权。

身份验证和授权是Web应用程序中非常重要的安全措施,使用FastAPI提供的库和工具可以轻松地实现这些功能,使应用程序更加安全可靠。建议开发人员在开发过程中始终牢记安全性,并遵循最佳实践来确保数据和用户的安全。

后端开发标签