前言
本文章介绍如何使用基于 AppClient 模型的 Django-Ninja API 鉴权机制。
这也是上次说的中台项目衍生物
中台项目相关的文章,我大概还会再写一篇
这个系列的文章注定是没什么人看的,毕竟还是小众了一些
不过我还是得写,没有读者也要记录,以后需要的时候就能用上
PS: 本文基于使用 DjangoStarter 框架,默认读者具备 DjangoStarter 3.0 以上版本的项目结构、基础设施
限于篇幅关系,本文无法贴出全部代码,有兴趣的同学可以在公众号后台回复「API鉴权」获取完整代码
概述
这套鉴权机制提供了以下功能:
- 多种认证方式:支持查询参数、请求头和 Bearer Token 三种方式
- IP 白名单:支持单个 IP 和 CIDR 网段限制
- 权限范围控制:基于 scopes 的细粒度权限管理
- 状态检查:自动验证 AppClient 的启用状态
- 安全性:包含完整的认证和授权流程
搭配官方文档食用更佳哦~
https://django-ninja.dev/guides/authentication/
使用方法
为了能有个直观的体验
我先介绍使用方法
后面再来讲实现
1. 创建 AppClient
首先在 Django Admin 或通过代码创建一个 AppClient:
from apps.app.models import AppClient app_client = AppClient.objects.create( app_id="my-app", app_name="我的应用", secret_key="your-secret-key-here", allowed_ips="192.168.1.0/24,10.0.0.1", # 可选:IP 限制 scopes="project:read,project:write,user:read", # 可选:权限范围 status=AppClient.Status.ACTIVE )
2. 在 API 中使用认证
from ninja import Router from core.authentication import ( api_key_auth, api_key_header_auth, api_key_bearer_auth, require_app_client_scopes, AppClientScopes, ) router = Router() # 使用查询参数认证 @router.get("/data", auth=api_key_auth) def get_data(request): app_client = request.auth return {"data": "success", "client": app_client.app_name} # 使用请求头认证 @router.get("/info", auth=api_key_header_auth) def get_info(request): return {"info": "success"} # 使用 Bearer Token 认证 @router.get("/status", auth=api_key_bearer_auth) def get_status(request): return {"status": "success"}
3. 添加权限控制
# 需要特定权限 @router.get("/projects", auth=app_client_api_key) @require_app_client_scopes([AppClientScopes.PROJECT_READ]) def list_projects(request): return {"projects": []} # 需要多个权限 @router.post("/projects", auth=app_client_api_key) @require_app_client_scopes([AppClientScopes.PROJECT_WRITE, AppClientScopes.USER_READ]) def create_project(request): return {"message": "项目创建成功"} # 手动检查权限 @router.get("/custom-check", auth=app_client_api_key) def custom_permission_check(request): from core.authentication import AppClientScopeChecker if AppClientScopeChecker.has_any_scope(request, [AppClientScopes.PROJECT_READ, AppClientScopes.PROJECT_WRITE]): return {"access": "granted"} else: return {"access": "denied"}
认证方式
1. 查询参数认证 (APIKeyQuery)
使用方式:
GET /api/endpoint?api_key=your-secret-key
代码示例:
from core.authentication import api_key_auth @router.get("/data", auth=api_key_auth) def get_data(request): return {"data": "success"}
2. 请求头认证 (APIKeyHeader)
使用方式:
GET /api/endpoint X-API-Key: your-secret-key
代码示例:
from core.authentication import api_key_header_auth @router.get("/data", auth=api_key_header_auth) def get_data(request): return {"data": "success"}
3. Bearer Token 认证 (HttpBearer)
使用方式:
GET /api/endpoint Authorization: Bearer your-secret-key
代码示例:
from core.authentication import api_key_bearer_auth @router.get("/data", auth=api_key_bearer_auth) def get_data(request): return {"data": "success"}
权限范围 (Scopes) 系统
事先说明,scopes 方式只是实现起来最简单,但不代表最合适
毕竟在 Django 框架中,本来就有一套权限体系了,我也有想过接入那套权限体系,可以很方便地在后台编辑每个应用的权限
不过因为实现起来会稍微麻烦一点点,为了快速上线,我就先自己搞了个 scopes 系统了~
后续肯定是要试试 接入 Django 权限体系 的~!
权限格式
权限范围使用 resource:action 格式,例如:
project:read- 项目读取权限project:write- 项目写入权限user:*- 用户相关所有权限*- 超级管理员权限
预定义权限常量
from core.authentication import AppClientScopes # 项目权限 AppClientScopes.PROJECT_READ # "project:read" AppClientScopes.PROJECT_WRITE # "project:write" AppClientScopes.PROJECT_DELETE # "project:delete" AppClientScopes.PROJECT_ALL # "project:*" # 用户权限 AppClientScopes.USER_READ # "user:read" AppClientScopes.USER_WRITE # "user:write" AppClientScopes.USER_DELETE # "user:delete" AppClientScopes.USER_ALL # "user:*" # 超级权限 AppClientScopes.SUPER_ADMIN # "*"
权限检查方法
1. 装饰器方式(推荐)
from core.authentication import require_app_client_scopes, AppClientScopes @router.get("/projects", auth=app_client_api_key) @require_app_client_scopes([AppClientScopes.PROJECT_READ]) def list_projects(request): return {"projects": []}
2. 手动检查方式
from core.authentication import AppClientScopeChecker @router.get("/data", auth=app_client_api_key) def get_data(request): # 检查是否具有所有指定权限 if AppClientScopeChecker.check_scopes(request, [AppClientScopes.PROJECT_READ]): return {"data": "success"} # 检查是否具有任意一个权限 if AppClientScopeChecker.has_any_scope(request, [AppClientScopes.PROJECT_READ, AppClientScopes.PROJECT_WRITE]): return {"data": "partial access"} # 获取客户端所有权限 scopes = AppClientScopeChecker.get_client_scopes(request) return {"error": "权限不足", "your_scopes": scopes}
IP 白名单
配置 IP 限制
在 AppClient 的 allowed_ips 字段中配置允许的 IP 地址:
# 单个 IP app_client.allowed_ips = "192.168.1.100" # 多个 IP app_client.allowed_ips = "192.168.1.100,10.0.0.1,203.0.113.5" # CIDR 网段 app_client.allowed_ips = "192.168.1.0/24,10.0.0.0/8" # 混合配置 app_client.allowed_ips = "192.168.1.100,10.0.0.0/8,203.0.113.0/24" # 不限制 IP(留空或 None) app_client.allowed_ips = None
IP 获取逻辑
系统会按以下优先级获取客户端 IP:
HTTP_X_FORWARDED_FOR头部的第一个 IP(适用于代理/负载均衡环境)REMOTE_ADDR(直接连接的 IP)
错误处理
认证失败
当认证失败时,会返回 HTTP 401 错误:
{ "detail": "Unauthorized" }
权限不足
当权限检查失败时,会返回 HTTP 403 错误:
{ "detail": "缺少权限: project:write" }
自定义错误处理
from core.authentication import AppClientPermissionError @router.get("/custom", auth=app_client_api_key) def custom_endpoint(request): if not some_condition: raise AppClientPermissionError("自定义权限错误信息") return {"data": "success"}
测试 API
项目提供了完整的测试 API,可以用来验证认证和权限功能:
# 测试 API Key 认证 GET /api/app/test/api-key?api_key=your-secret-key # 测试请求头认证 GET /api/app/test/api-key-header X-API-Key: your-secret-key # 测试 Bearer Token 认证 GET /api/app/test/bearer Authorization: Bearer your-secret-key # 测试权限范围 GET /api/app/test/scopes/project-read?api_key=your-secret-key GET /api/app/test/scopes/project-write?api_key=your-secret-key GET /api/app/test/scopes/multiple?api_key=your-secret-key
最佳实践
以下是一些最佳实践的建议
无论是使用什么技术、框架,实际开发中都可以参考一下
1. 密钥管理
- 使用强密钥(建议 32 字符以上)
- 定期轮换密钥
- 不要在代码中硬编码密钥
- 使用环境变量或安全的配置管理系统
2. 权限设计
- 遵循最小权限原则
- 使用细粒度的权限范围
- 定期审查和清理不必要的权限
- 为不同的应用场景创建不同的 AppClient
3. IP 限制
- 在生产环境中启用 IP 白名单
- 使用 CIDR 网段而不是单个 IP(便于扩展)
- 考虑 CDN 和代理的 IP 转发
4. 监控和日志
import logging logger = logging.getLogger(__name__) @router.get("/sensitive-data", auth=app_client_api_key) @require_app_client_scopes([AppClientScopes.PROJECT_READ]) def get_sensitive_data(request): app_client = request.auth logger.info(f"AppClient {app_client.app_id} accessed sensitive data") return {"data": "sensitive information"}
集成到现有 API
添加路由
在 config/apis.py 中添加 app 路由:
from apps.app.apis import router as app_router api.add_router('app', app_router)
混合认证
这是 django-ninja 提供的新功能
可以在同一个 API 中同时支持多种认证方式:
from ninja.security import django_auth from core.authentication import api_key_auth # 同时支持 Django 用户认证和 AppClient 认证 @router.get("/user-data", auth=[django_auth, api_key_auth]) def get_data(request): return {"user": request.user.username}
更新主 API 配置
除此之外,还可以在主 API 配置全局认证和其他功能
from django.contrib.admin.views.decorators import staff_member_required api = NinjaAPI( title=f'{settings.DJANGO_STARTER["project_info"]["name"]} APIs', description=settings.DJANGO_STARTER["project_info"]["description"], renderer=JSONRespRenderer(), csrf=True, auth=[django_auth, api_key_auth, api_key_header_auth, api_key_bearer_auth], urls_namespace='api', docs=Swagger(settings={"persistAuthorization": True}), docs_decorator=staff_member_required, )
这里简单介绍一下
我这段配置:
- 使用
auth=[django_auth, api_key_auth, api_key_header_auth, api_key_bearer_auth]支持多种认证方式(其实还有一个JWT的,不过我这里没考虑) - 使用
docs_decorator=staff_member_required,配置了访问swagger文档必须是已登录的staff_member
实现
OK,最后再介绍一下具体实现 (应该没人会坚持看到这么后面吧…)
限于篇幅关系,本文无法贴出全部代码,有兴趣的同学可以在公众号后台回复「API鉴权」获取完整代码
模型
首先是 AppClient 模型,代码位于 src/apps/app/models.py 内
除了字段定义,还添加了一些方法用于 获取权限范围列表、获取允许的 IP 地址列表 之类的功能
按照DDD思想来讲,这个模型算是一个 充血模型 😃
import uuid from django.db import models from django.core.validators import RegexValidator from django_starter.db.models import ModelExt class AppClient(ModelExt): """应用客户端模型,用于管理第三方应用的访问权限""" class Status(models.TextChoices): ACTIVE = 'active', '启用' INACTIVE = 'inactive', '禁用' id = models.BigAutoField( primary_key=True, verbose_name='主键ID', help_text='系统自动生成的唯一标识符', db_comment='物理主键,自增ID' ) guid = models.UUIDField( default=uuid.uuid4, unique=True, editable=False, verbose_name='全局唯一标识', help_text='系统自动生成的UUID,用作逻辑主键', db_comment='逻辑主键,GUID' ) app_id = models.CharField( max_length=100, unique=True, verbose_name='应用标识', help_text='用于标识调用方系统的唯一ID,建议使用有意义的命名', db_comment='应用标识,用于标识调用方系统', validators=[ RegexValidator( regex=r'^[a-zA-Z0-9_-]+$', message='应用标识只能包含字母、数字、下划线和连字符' ) ] ) app_name = models.CharField( max_length=200, verbose_name='应用名称', help_text='调用方应用的显示名称或备注信息', db_comment='应用名称,调用方的名称或备注信息' ) secret_key = models.CharField( max_length=100, verbose_name='密钥', help_text='用于API签名验证的密钥,请妥善保管', db_comment='密钥,用于签名验证', ) allowed_ips = models.TextField( blank=True, null=True, verbose_name='允许访问的IP', help_text='允许访问的IP地址列表,多个IP用逗号分隔,留空表示不限制', db_comment='允许访问的 IP 列表,逗号分隔' ) scopes = models.TextField( blank=True, null=True, verbose_name='权限范围', help_text='应用的权限范围,格式如:project:read,project:write', db_comment='权限范围,如 project:read,project:write' ) status = models.CharField( max_length=10, choices=Status.choices, default=Status.ACTIVE, verbose_name='状态', help_text='应用的启用状态', db_comment='启用/禁用状态' ) class Meta: db_table = 'app_client' verbose_name = '应用客户端' verbose_name_plural = '应用客户端' ordering = ['-create_time'] indexes = [ models.Index(fields=['app_id']), models.Index(fields=['status']), models.Index(fields=['create_time']), ] def __str__(self): return f'{self.app_name} ({self.app_id})' def is_active(self): """检查应用是否处于活跃状态""" # todo 为什么这个 is_deleted 会变成 str 啊?? # return self.status == self.Status.ACTIVE and not self.is_deleted return self.status == self.Status.ACTIVE def get_scopes_list(self): """获取权限范围列表""" if not self.scopes: return [] return [scope.strip() for scope in self.scopes.split(',') if scope.strip()] def get_allowed_ips_list(self): """获取允许的IP地址列表""" if not self.allowed_ips: return [] return [ip.strip() for ip in self.allowed_ips.split(',') if ip.strip()] def save(self, *args, **kwargs): """重写save方法,添加自定义逻辑""" # 确保app_name不为空 if not self.app_name: self.app_name = self.app_id super().save(*args, **kwargs)
测试接口
添加一些测试接口,方便后续调试
代码位于 src/apps/app/apis/client/apis.py
from typing import List from ninja import Router from django.http import HttpRequest from core.authentication import ( api_key_auth, api_key_header_auth, api_key_bearer_auth, require_app_client_scopes, AppClientScopes, AppClientScopeChecker, ) from apps.app.models import AppClient from .schemas import AppClientSchema, AppClientCreateSchema router = Router(tags=['AppClient API']) @router.get("/test/api-key", auth=api_key_auth) def test_api_key_auth(request: HttpRequest): """测试API Key认证(查询参数方式) 使用方式: GET /api/app/test/api-key?api_key=your_secret_key """ app_client = request.auth return { "message": "API Key认证成功", "app_id": app_client.app_id, "app_name": app_client.app_name, "scopes": app_client.get_scopes_list(), } @router.get("/test/api-key-header", auth=api_key_header_auth) def test_api_key_header_auth(request: HttpRequest): """测试API Key认证(请求头方式) 使用方式: GET /api/app/test/api-key-header Headers: X-API-Key: your_secret_key """ app_client = request.auth return { "message": "API Key Header认证成功", "app_id": app_client.app_id, "app_name": app_client.app_name, "scopes": app_client.get_scopes_list(), } @router.get("/test/bearer", auth=api_key_bearer_auth) def test_bearer_auth(request: HttpRequest): """测试Bearer Token认证 使用方式: GET /api/app/test/bearer Headers: Authorization: Bearer your_secret_key """ app_client = request.auth return { "message": "Bearer Token认证成功", "app_id": app_client.app_id, "app_name": app_client.app_name, "scopes": app_client.get_scopes_list(), } @router.get("/test/scopes/project-read", auth=api_key_auth) @require_app_client_scopes([AppClientScopes.PROJECT_READ]) def test_project_read_scope(request: HttpRequest): """测试项目读取权限 需要权限: project:read """ return { "message": "项目读取权限验证成功", "required_scope": AppClientScopes.PROJECT_READ, "client_scopes": AppClientScopeChecker.get_client_scopes(request), } @router.get("/test/scopes/project-write", auth=api_key_auth) @require_app_client_scopes([AppClientScopes.PROJECT_WRITE]) def test_project_write_scope(request: HttpRequest): """测试项目写入权限 需要权限: project:write """ return { "message": "项目写入权限验证成功", "required_scope": AppClientScopes.PROJECT_WRITE, "client_scopes": AppClientScopeChecker.get_client_scopes(request), } @router.get("/test/scopes/multiple", auth=api_key_auth) @require_app_client_scopes([AppClientScopes.PROJECT_READ, AppClientScopes.USER_READ]) def test_multiple_scopes(request: HttpRequest): """测试多个权限要求 需要权限: project:read AND user:read """ return { "message": "多权限验证成功", "required_scopes": [AppClientScopes.PROJECT_READ, AppClientScopes.USER_READ], "client_scopes": AppClientScopeChecker.get_client_scopes(request), } @router.get("/test/scopes/any", auth=api_key_auth) def test_any_scope(request: HttpRequest): """测试任意权限检查 检查是否具有项目或用户相关的任意权限 """ has_project_access = AppClientScopeChecker.has_any_scope( request, [AppClientScopes.PROJECT_READ, AppClientScopes.PROJECT_WRITE] ) has_user_access = AppClientScopeChecker.has_any_scope( request, [AppClientScopes.USER_READ, AppClientScopes.USER_WRITE] ) return { "message": "权限检查完成", "has_project_access": has_project_access, "has_user_access": has_user_access, "client_scopes": AppClientScopeChecker.get_client_scopes(request), } @router.get("/info", auth=api_key_auth) def get_client_info(request: HttpRequest): """获取当前认证的AppClient信息""" app_client = request.auth return { "app_id": app_client.app_id, "app_name": app_client.app_name, "status": app_client.status, "scopes": app_client.get_scopes_list(), "allowed_ips": app_client.get_allowed_ips_list(), "create_time": app_client.create_time, } @router.get("/clients", auth=api_key_auth) @require_app_client_scopes([AppClientScopes.SUPER_ADMIN]) def list_app_clients(request: HttpRequest) -> List[dict]: """列出所有AppClient(需要超级管理员权限) 需要权限: * """ clients = AppClient.objects.filter(is_deleted=False) return [ { "app_id": client.app_id, "app_name": client.app_name, "status": client.status, "scopes": client.get_scopes_list(), "create_time": client.create_time, } for client in clients ]
管理命令
为了方便使用,我又添加了俩命令
顾名思义,创建新的 AppClient 和 列出所有
src/apps/app/management/commands/create_app_client.pysrc/apps/app/management/commands/list_app_clients.py
限于篇幅原因,无法贴出全部代码,有兴趣的同学可以在公众号后台回复「API鉴权」获取完整代码
核心逻辑
关于鉴权的核心逻辑,我单独创建一个 core package 来存放
目录结构如下
core ├─ exceptions │ ├─ __init__.py │ └─ permissions.py ├─ authentication │ ├─ __init__.py │ ├─ README.md │ ├─ permissions.py │ ├─ app.py │ └─ api_key.py └─ __init__.py
最关键的代码就是 src/core/authentication/app.py 中的这几个认证实例
代码中有详细注释,一看就能理解。
如果觉得这个还比较复杂,后续我再介绍一个极简的实现方式。
class AppClientAuthMixin: """AppClient认证混入类,提供通用的认证逻辑""" def _get_client_ip(self, request: HttpRequest) -> str: """获取客户端真实IP地址""" x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR') if x_forwarded_for: ip = x_forwarded_for.split(',')[0].strip() else: ip = request.META.get('REMOTE_ADDR') return ip def _validate_ip_access(self, app_client: AppClient, request: HttpRequest) -> bool: """验证IP访问权限""" allowed_ips = app_client.get_allowed_ips_list() if not allowed_ips: # 如果没有设置IP限制,则允许所有IP return True client_ip = self._get_client_ip(request) for allowed_ip in allowed_ips: try: # 支持单个IP和CIDR网段 if '/' in allowed_ip: network = ipaddress.ip_network(allowed_ip, strict=False) if ipaddress.ip_address(client_ip) in network: return True else: if client_ip == allowed_ip: return True except (ipaddress.AddressValueError, ValueError): # 忽略无效的IP格式 continue return False def _authenticate_app_client(self, request: HttpRequest, key: str) -> Optional[AppClient]: """认证AppClient的通用逻辑""" try: app_client = AppClient.objects.get(secret_key=key) # 检查应用是否处于活跃状态 if not app_client.is_active(): return None # 验证IP访问权限 if not self._validate_ip_access(app_client, request): return None # 将权限范围添加到request中,供后续权限检查使用 request.app_client_scopes = app_client.get_scopes_list() return app_client except AppClient.DoesNotExist: return None class AppClientApiKey(AppClientAuthMixin, APIKeyQuery): """基于查询参数的AppClient API Key认证""" param_name = "api_key" def authenticate(self, request: HttpRequest, key: str) -> Optional[AppClient]: return self._authenticate_app_client(request, key) class AppClientApiKeyHeader(AppClientAuthMixin, APIKeyHeader): """基于请求头的AppClient API Key认证""" param_name = "X-API-Key" def authenticate(self, request: HttpRequest, key: str) -> Optional[AppClient]: return self._authenticate_app_client(request, key) class AppClientBearer(AppClientAuthMixin, HttpBearer): """基于Bearer Token的AppClient认证""" def authenticate(self, request: HttpRequest, token: str) -> Optional[AppClient]: return self._authenticate_app_client(request, token) # 创建认证实例,可以在API中直接使用 api_key_auth = AppClientApiKey() api_key_header_auth = AppClientApiKeyHeader() api_key_bearer_auth = AppClientBearer()
极简实现方式
这套完善的鉴权逻辑,如果你觉得太复杂了
没关系,我这还做了个极简版本,去掉了权限、IP限制等乱七八糟的东西,只校验 api-key 是否有效
实现代码在 src/core/authentication/api_key.py
自定义的 ApiKey 类继承自 ninja.security.APIKeyQuery 类,表明 api_key 参数需要放在 GET query params 里
from ninja.security import APIKeyQuery from apps.app.models import AppClient class ApiKey(APIKeyQuery): param_name = "api_key" def authenticate(self, request, key): try: c = AppClient.objects.get(secret_key=key) print(f'api key authenticated: {c.secret_key}, app id: {c.app_id}') return c except AppClient.DoesNotExist: pass
使用也很简单
from apps.app.models import AppClient from core.authentication.api_key import ApiKey @router.get("/data", auth=ApiKey()) def get_data(request): assert isinstance(request.auth, AppClient) return {"data": "success"}
客户端请求的时候需要在 GET query params 里带上 AppClient 的 secret_key
校验通过的话,ninja 会自动在 request.auth 添加 AppClient 实例,在接口代码里可以直接获取这个实例。
小结
这篇文章居然写了两万多字🕶
这还是在删减了很多代码的情况下…
再啰嗦一次,有兴趣的同学可以在公众号后台回复「API鉴权」获取完整代码
这么长的文章,大概不会有人有耐心看到最后吧~
溜了,溜了~