在 django-ninja 中实现类似腾讯阿里云的应用鉴权机制

前言

本文章介绍如何使用基于 AppClient 模型的 Django-Ninja API 鉴权机制。

这也是上次说的中台项目衍生物

中台项目相关的文章,我大概还会再写一篇

这个系列的文章注定是没什么人看的,毕竟还是小众了一些

不过我还是得写,没有读者也要记录,以后需要的时候就能用上

PS: 本文基于使用 DjangoStarter 框架,默认读者具备 DjangoStarter 3.0 以上版本的项目结构、基础设施

限于篇幅关系,本文无法贴出全部代码,有兴趣的同学可以在公众号后台回复「API鉴权」获取完整代码

概述

这套鉴权机制提供了以下功能:

  • 多种认证方式:支持查询参数、请求头和 Bearer Token 三种方式
  • IP 白名单:支持单个 IP 和 CIDR 网段限制
  • 权限范围控制:基于 scopes 的细粒度权限管理
  • 状态检查:自动验证 AppClient 的启用状态
  • 安全性:包含完整的认证和授权流程

搭配官方文档食用更佳哦~

https://django-ninja.dev/guides/authentication/

使用方法

为了能有个直观的体验

我先介绍使用方法

后面再来讲实现

1. 创建 AppClient

首先在 Django Admin 或通过代码创建一个 AppClient:

from apps.app.models import AppClient  app_client = AppClient.objects.create(     app_id="my-app",     app_name="我的应用",     secret_key="your-secret-key-here",     allowed_ips="192.168.1.0/24,10.0.0.1",  # 可选:IP 限制     scopes="project:read,project:write,user:read",  # 可选:权限范围     status=AppClient.Status.ACTIVE ) 

2. 在 API 中使用认证

from ninja import Router from core.authentication import (    api_key_auth,    api_key_header_auth,    api_key_bearer_auth,    require_app_client_scopes,    AppClientScopes, )  router = Router()   # 使用查询参数认证 @router.get("/data", auth=api_key_auth) def get_data(request):    app_client = request.auth    return {"data": "success", "client": app_client.app_name}   # 使用请求头认证 @router.get("/info", auth=api_key_header_auth) def get_info(request):    return {"info": "success"}   # 使用 Bearer Token 认证 @router.get("/status", auth=api_key_bearer_auth) def get_status(request):    return {"status": "success"} 

3. 添加权限控制

# 需要特定权限 @router.get("/projects", auth=app_client_api_key) @require_app_client_scopes([AppClientScopes.PROJECT_READ]) def list_projects(request):     return {"projects": []}  # 需要多个权限 @router.post("/projects", auth=app_client_api_key) @require_app_client_scopes([AppClientScopes.PROJECT_WRITE, AppClientScopes.USER_READ]) def create_project(request):     return {"message": "项目创建成功"}  # 手动检查权限 @router.get("/custom-check", auth=app_client_api_key) def custom_permission_check(request):     from core.authentication import AppClientScopeChecker          if AppClientScopeChecker.has_any_scope(request, [AppClientScopes.PROJECT_READ, AppClientScopes.PROJECT_WRITE]):         return {"access": "granted"}     else:         return {"access": "denied"} 

认证方式

1. 查询参数认证 (APIKeyQuery)

使用方式:

GET /api/endpoint?api_key=your-secret-key 

代码示例:

from core.authentication import api_key_auth   @router.get("/data", auth=api_key_auth) def get_data(request):    return {"data": "success"} 

2. 请求头认证 (APIKeyHeader)

使用方式:

GET /api/endpoint X-API-Key: your-secret-key 

代码示例:

from core.authentication import api_key_header_auth   @router.get("/data", auth=api_key_header_auth) def get_data(request):    return {"data": "success"} 

3. Bearer Token 认证 (HttpBearer)

使用方式:

GET /api/endpoint Authorization: Bearer your-secret-key 

代码示例:

from core.authentication import api_key_bearer_auth   @router.get("/data", auth=api_key_bearer_auth) def get_data(request):    return {"data": "success"} 

权限范围 (Scopes) 系统

事先说明,scopes 方式只是实现起来最简单,但不代表最合适

毕竟在 Django 框架中,本来就有一套权限体系了,我也有想过接入那套权限体系,可以很方便地在后台编辑每个应用的权限

不过因为实现起来会稍微麻烦一点点,为了快速上线,我就先自己搞了个 scopes 系统了~

后续肯定是要试试 接入 Django 权限体系 的~!

权限格式

权限范围使用 resource:action 格式,例如:

  • project:read - 项目读取权限
  • project:write - 项目写入权限
  • user:* - 用户相关所有权限
  • * - 超级管理员权限

预定义权限常量

from core.authentication import AppClientScopes  # 项目权限 AppClientScopes.PROJECT_READ     # "project:read" AppClientScopes.PROJECT_WRITE    # "project:write" AppClientScopes.PROJECT_DELETE   # "project:delete" AppClientScopes.PROJECT_ALL      # "project:*"  # 用户权限 AppClientScopes.USER_READ        # "user:read" AppClientScopes.USER_WRITE       # "user:write" AppClientScopes.USER_DELETE      # "user:delete" AppClientScopes.USER_ALL         # "user:*"  # 超级权限 AppClientScopes.SUPER_ADMIN      # "*" 

权限检查方法

1. 装饰器方式(推荐)

from core.authentication import require_app_client_scopes, AppClientScopes  @router.get("/projects", auth=app_client_api_key) @require_app_client_scopes([AppClientScopes.PROJECT_READ]) def list_projects(request):     return {"projects": []} 

2. 手动检查方式

from core.authentication import AppClientScopeChecker  @router.get("/data", auth=app_client_api_key) def get_data(request):     # 检查是否具有所有指定权限     if AppClientScopeChecker.check_scopes(request, [AppClientScopes.PROJECT_READ]):         return {"data": "success"}          # 检查是否具有任意一个权限     if AppClientScopeChecker.has_any_scope(request, [AppClientScopes.PROJECT_READ, AppClientScopes.PROJECT_WRITE]):         return {"data": "partial access"}          # 获取客户端所有权限     scopes = AppClientScopeChecker.get_client_scopes(request)     return {"error": "权限不足", "your_scopes": scopes} 

IP 白名单

配置 IP 限制

在 AppClient 的 allowed_ips 字段中配置允许的 IP 地址:

# 单个 IP app_client.allowed_ips = "192.168.1.100"  # 多个 IP app_client.allowed_ips = "192.168.1.100,10.0.0.1,203.0.113.5"  # CIDR 网段 app_client.allowed_ips = "192.168.1.0/24,10.0.0.0/8"  # 混合配置 app_client.allowed_ips = "192.168.1.100,10.0.0.0/8,203.0.113.0/24"  # 不限制 IP(留空或 None) app_client.allowed_ips = None 

IP 获取逻辑

系统会按以下优先级获取客户端 IP:

  1. HTTP_X_FORWARDED_FOR 头部的第一个 IP(适用于代理/负载均衡环境)
  2. REMOTE_ADDR(直接连接的 IP)

错误处理

认证失败

当认证失败时,会返回 HTTP 401 错误:

{     "detail": "Unauthorized" } 

权限不足

当权限检查失败时,会返回 HTTP 403 错误:

{     "detail": "缺少权限: project:write" } 

自定义错误处理

from core.authentication import AppClientPermissionError  @router.get("/custom", auth=app_client_api_key) def custom_endpoint(request):     if not some_condition:         raise AppClientPermissionError("自定义权限错误信息")     return {"data": "success"} 

测试 API

项目提供了完整的测试 API,可以用来验证认证和权限功能:

# 测试 API Key 认证 GET /api/app/test/api-key?api_key=your-secret-key  # 测试请求头认证 GET /api/app/test/api-key-header X-API-Key: your-secret-key  # 测试 Bearer Token 认证 GET /api/app/test/bearer Authorization: Bearer your-secret-key  # 测试权限范围 GET /api/app/test/scopes/project-read?api_key=your-secret-key GET /api/app/test/scopes/project-write?api_key=your-secret-key GET /api/app/test/scopes/multiple?api_key=your-secret-key 

最佳实践

以下是一些最佳实践的建议

无论是使用什么技术、框架,实际开发中都可以参考一下

1. 密钥管理

  • 使用强密钥(建议 32 字符以上)
  • 定期轮换密钥
  • 不要在代码中硬编码密钥
  • 使用环境变量或安全的配置管理系统

2. 权限设计

  • 遵循最小权限原则
  • 使用细粒度的权限范围
  • 定期审查和清理不必要的权限
  • 为不同的应用场景创建不同的 AppClient

3. IP 限制

  • 在生产环境中启用 IP 白名单
  • 使用 CIDR 网段而不是单个 IP(便于扩展)
  • 考虑 CDN 和代理的 IP 转发

4. 监控和日志

import logging  logger = logging.getLogger(__name__)  @router.get("/sensitive-data", auth=app_client_api_key) @require_app_client_scopes([AppClientScopes.PROJECT_READ]) def get_sensitive_data(request):     app_client = request.auth     logger.info(f"AppClient {app_client.app_id} accessed sensitive data")     return {"data": "sensitive information"} 

集成到现有 API

添加路由

config/apis.py 中添加 app 路由:

from apps.app.apis import router as app_router  api.add_router('app', app_router) 

混合认证

这是 django-ninja 提供的新功能

可以在同一个 API 中同时支持多种认证方式:

from ninja.security import django_auth from core.authentication import api_key_auth   # 同时支持 Django 用户认证和 AppClient 认证 @router.get("/user-data", auth=[django_auth, api_key_auth]) def get_data(request):    return {"user": request.user.username} 

更新主 API 配置

除此之外,还可以在主 API 配置全局认证和其他功能

from django.contrib.admin.views.decorators import staff_member_required  api = NinjaAPI(     title=f'{settings.DJANGO_STARTER["project_info"]["name"]} APIs',     description=settings.DJANGO_STARTER["project_info"]["description"],     renderer=JSONRespRenderer(),     csrf=True,     auth=[django_auth, api_key_auth, api_key_header_auth, api_key_bearer_auth],     urls_namespace='api',     docs=Swagger(settings={"persistAuthorization": True}),     docs_decorator=staff_member_required, ) 

这里简单介绍一下

我这段配置:

  • 使用 auth=[django_auth, api_key_auth, api_key_header_auth, api_key_bearer_auth] 支持多种认证方式(其实还有一个JWT的,不过我这里没考虑)
  • 使用 docs_decorator=staff_member_required, 配置了访问swagger文档必须是已登录的 staff_member

实现

OK,最后再介绍一下具体实现 (应该没人会坚持看到这么后面吧…)

限于篇幅关系,本文无法贴出全部代码,有兴趣的同学可以在公众号后台回复「API鉴权」获取完整代码

模型

首先是 AppClient 模型,代码位于 src/apps/app/models.py

除了字段定义,还添加了一些方法用于 获取权限范围列表获取允许的 IP 地址列表 之类的功能

按照DDD思想来讲,这个模型算是一个 充血模型 😃

import uuid from django.db import models from django.core.validators import RegexValidator from django_starter.db.models import ModelExt  class AppClient(ModelExt):     """应用客户端模型,用于管理第三方应用的访问权限"""      class Status(models.TextChoices):         ACTIVE = 'active', '启用'         INACTIVE = 'inactive', '禁用'      id = models.BigAutoField(         primary_key=True,         verbose_name='主键ID',         help_text='系统自动生成的唯一标识符',         db_comment='物理主键,自增ID'     )      guid = models.UUIDField(         default=uuid.uuid4,         unique=True,         editable=False,         verbose_name='全局唯一标识',         help_text='系统自动生成的UUID,用作逻辑主键',         db_comment='逻辑主键,GUID'     )      app_id = models.CharField(         max_length=100,         unique=True,         verbose_name='应用标识',         help_text='用于标识调用方系统的唯一ID,建议使用有意义的命名',         db_comment='应用标识,用于标识调用方系统',         validators=[             RegexValidator(                 regex=r'^[a-zA-Z0-9_-]+$',                 message='应用标识只能包含字母、数字、下划线和连字符'             )         ]     )      app_name = models.CharField(         max_length=200,         verbose_name='应用名称',         help_text='调用方应用的显示名称或备注信息',         db_comment='应用名称,调用方的名称或备注信息'     )      secret_key = models.CharField(         max_length=100,         verbose_name='密钥',         help_text='用于API签名验证的密钥,请妥善保管',         db_comment='密钥,用于签名验证',     )      allowed_ips = models.TextField(         blank=True,         null=True,         verbose_name='允许访问的IP',         help_text='允许访问的IP地址列表,多个IP用逗号分隔,留空表示不限制',         db_comment='允许访问的 IP 列表,逗号分隔'     )      scopes = models.TextField(         blank=True,         null=True,         verbose_name='权限范围',         help_text='应用的权限范围,格式如:project:read,project:write',         db_comment='权限范围,如 project:read,project:write'     )      status = models.CharField(         max_length=10,         choices=Status.choices,         default=Status.ACTIVE,         verbose_name='状态',         help_text='应用的启用状态',         db_comment='启用/禁用状态'     )      class Meta:         db_table = 'app_client'         verbose_name = '应用客户端'         verbose_name_plural = '应用客户端'         ordering = ['-create_time']         indexes = [             models.Index(fields=['app_id']),             models.Index(fields=['status']),             models.Index(fields=['create_time']),         ]      def __str__(self):         return f'{self.app_name} ({self.app_id})'      def is_active(self):         """检查应用是否处于活跃状态"""         # todo 为什么这个 is_deleted 会变成 str 啊??         # return self.status == self.Status.ACTIVE and not self.is_deleted         return self.status == self.Status.ACTIVE      def get_scopes_list(self):         """获取权限范围列表"""         if not self.scopes:             return []         return [scope.strip() for scope in self.scopes.split(',') if scope.strip()]      def get_allowed_ips_list(self):         """获取允许的IP地址列表"""         if not self.allowed_ips:             return []         return [ip.strip() for ip in self.allowed_ips.split(',') if ip.strip()]      def save(self, *args, **kwargs):         """重写save方法,添加自定义逻辑"""         # 确保app_name不为空         if not self.app_name:             self.app_name = self.app_id          super().save(*args, **kwargs) 

测试接口

添加一些测试接口,方便后续调试

代码位于 src/apps/app/apis/client/apis.py

from typing import List from ninja import Router from django.http import HttpRequest from core.authentication import (     api_key_auth,     api_key_header_auth,     api_key_bearer_auth,     require_app_client_scopes,     AppClientScopes,     AppClientScopeChecker, ) from apps.app.models import AppClient from .schemas import AppClientSchema, AppClientCreateSchema  router = Router(tags=['AppClient API'])  @router.get("/test/api-key", auth=api_key_auth) def test_api_key_auth(request: HttpRequest):     """测试API Key认证(查询参数方式)          使用方式: GET /api/app/test/api-key?api_key=your_secret_key     """     app_client = request.auth     return {         "message": "API Key认证成功",         "app_id": app_client.app_id,         "app_name": app_client.app_name,         "scopes": app_client.get_scopes_list(),     }   @router.get("/test/api-key-header", auth=api_key_header_auth) def test_api_key_header_auth(request: HttpRequest):     """测试API Key认证(请求头方式)          使用方式: GET /api/app/test/api-key-header     Headers: X-API-Key: your_secret_key     """     app_client = request.auth     return {         "message": "API Key Header认证成功",         "app_id": app_client.app_id,         "app_name": app_client.app_name,         "scopes": app_client.get_scopes_list(),     }   @router.get("/test/bearer", auth=api_key_bearer_auth) def test_bearer_auth(request: HttpRequest):     """测试Bearer Token认证          使用方式: GET /api/app/test/bearer     Headers: Authorization: Bearer your_secret_key     """     app_client = request.auth     return {         "message": "Bearer Token认证成功",         "app_id": app_client.app_id,         "app_name": app_client.app_name,         "scopes": app_client.get_scopes_list(),     }   @router.get("/test/scopes/project-read", auth=api_key_auth) @require_app_client_scopes([AppClientScopes.PROJECT_READ]) def test_project_read_scope(request: HttpRequest):     """测试项目读取权限          需要权限: project:read     """     return {         "message": "项目读取权限验证成功",         "required_scope": AppClientScopes.PROJECT_READ,         "client_scopes": AppClientScopeChecker.get_client_scopes(request),     }   @router.get("/test/scopes/project-write", auth=api_key_auth) @require_app_client_scopes([AppClientScopes.PROJECT_WRITE]) def test_project_write_scope(request: HttpRequest):     """测试项目写入权限          需要权限: project:write     """     return {         "message": "项目写入权限验证成功",         "required_scope": AppClientScopes.PROJECT_WRITE,         "client_scopes": AppClientScopeChecker.get_client_scopes(request),     }   @router.get("/test/scopes/multiple", auth=api_key_auth) @require_app_client_scopes([AppClientScopes.PROJECT_READ, AppClientScopes.USER_READ]) def test_multiple_scopes(request: HttpRequest):     """测试多个权限要求          需要权限: project:read AND user:read     """     return {         "message": "多权限验证成功",         "required_scopes": [AppClientScopes.PROJECT_READ, AppClientScopes.USER_READ],         "client_scopes": AppClientScopeChecker.get_client_scopes(request),     }   @router.get("/test/scopes/any", auth=api_key_auth) def test_any_scope(request: HttpRequest):     """测试任意权限检查          检查是否具有项目或用户相关的任意权限     """     has_project_access = AppClientScopeChecker.has_any_scope(         request, [AppClientScopes.PROJECT_READ, AppClientScopes.PROJECT_WRITE]     )     has_user_access = AppClientScopeChecker.has_any_scope(         request, [AppClientScopes.USER_READ, AppClientScopes.USER_WRITE]     )          return {         "message": "权限检查完成",         "has_project_access": has_project_access,         "has_user_access": has_user_access,         "client_scopes": AppClientScopeChecker.get_client_scopes(request),     }   @router.get("/info", auth=api_key_auth) def get_client_info(request: HttpRequest):     """获取当前认证的AppClient信息"""     app_client = request.auth     return {         "app_id": app_client.app_id,         "app_name": app_client.app_name,         "status": app_client.status,         "scopes": app_client.get_scopes_list(),         "allowed_ips": app_client.get_allowed_ips_list(),         "create_time": app_client.create_time,     }   @router.get("/clients", auth=api_key_auth) @require_app_client_scopes([AppClientScopes.SUPER_ADMIN]) def list_app_clients(request: HttpRequest) -> List[dict]:     """列出所有AppClient(需要超级管理员权限)          需要权限: *     """     clients = AppClient.objects.filter(is_deleted=False)     return [         {             "app_id": client.app_id,             "app_name": client.app_name,             "status": client.status,             "scopes": client.get_scopes_list(),             "create_time": client.create_time,         }         for client in clients     ] 

管理命令

为了方便使用,我又添加了俩命令

顾名思义,创建新的 AppClient列出所有

  • src/apps/app/management/commands/create_app_client.py
  • src/apps/app/management/commands/list_app_clients.py

限于篇幅原因,无法贴出全部代码,有兴趣的同学可以在公众号后台回复「API鉴权」获取完整代码

核心逻辑

关于鉴权的核心逻辑,我单独创建一个 core package 来存放

目录结构如下

 core  ├─ exceptions  │  ├─ __init__.py  │  └─ permissions.py  ├─ authentication  │  ├─ __init__.py  │  ├─ README.md  │  ├─ permissions.py  │  ├─ app.py  │  └─ api_key.py  └─ __init__.py 

最关键的代码就是 src/core/authentication/app.py 中的这几个认证实例

代码中有详细注释,一看就能理解。

如果觉得这个还比较复杂,后续我再介绍一个极简的实现方式。

class AppClientAuthMixin:     """AppClient认证混入类,提供通用的认证逻辑"""          def _get_client_ip(self, request: HttpRequest) -> str:         """获取客户端真实IP地址"""         x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')         if x_forwarded_for:             ip = x_forwarded_for.split(',')[0].strip()         else:             ip = request.META.get('REMOTE_ADDR')         return ip          def _validate_ip_access(self, app_client: AppClient, request: HttpRequest) -> bool:         """验证IP访问权限"""         allowed_ips = app_client.get_allowed_ips_list()         if not allowed_ips:  # 如果没有设置IP限制,则允许所有IP             return True                  client_ip = self._get_client_ip(request)                  for allowed_ip in allowed_ips:             try:                 # 支持单个IP和CIDR网段                 if '/' in allowed_ip:                     network = ipaddress.ip_network(allowed_ip, strict=False)                     if ipaddress.ip_address(client_ip) in network:                         return True                 else:                     if client_ip == allowed_ip:                         return True             except (ipaddress.AddressValueError, ValueError):                 # 忽略无效的IP格式                 continue                  return False          def _authenticate_app_client(self, request: HttpRequest, key: str) -> Optional[AppClient]:         """认证AppClient的通用逻辑"""         try:             app_client = AppClient.objects.get(secret_key=key)                          # 检查应用是否处于活跃状态             if not app_client.is_active():                 return None                          # 验证IP访问权限             if not self._validate_ip_access(app_client, request):                 return None                          # 将权限范围添加到request中,供后续权限检查使用             request.app_client_scopes = app_client.get_scopes_list()                          return app_client                      except AppClient.DoesNotExist:             return None   class AppClientApiKey(AppClientAuthMixin, APIKeyQuery):     """基于查询参数的AppClient API Key认证"""     param_name = "api_key"      def authenticate(self, request: HttpRequest, key: str) -> Optional[AppClient]:         return self._authenticate_app_client(request, key)   class AppClientApiKeyHeader(AppClientAuthMixin, APIKeyHeader):     """基于请求头的AppClient API Key认证"""     param_name = "X-API-Key"      def authenticate(self, request: HttpRequest, key: str) -> Optional[AppClient]:         return self._authenticate_app_client(request, key)   class AppClientBearer(AppClientAuthMixin, HttpBearer):     """基于Bearer Token的AppClient认证"""      def authenticate(self, request: HttpRequest, token: str) -> Optional[AppClient]:         return self._authenticate_app_client(request, token)   # 创建认证实例,可以在API中直接使用 api_key_auth = AppClientApiKey() api_key_header_auth = AppClientApiKeyHeader() api_key_bearer_auth = AppClientBearer() 

极简实现方式

这套完善的鉴权逻辑,如果你觉得太复杂了

没关系,我这还做了个极简版本,去掉了权限、IP限制等乱七八糟的东西,只校验 api-key 是否有效

实现代码在 src/core/authentication/api_key.py

自定义的 ApiKey 类继承自 ninja.security.APIKeyQuery 类,表明 api_key 参数需要放在 GET query params

from ninja.security import APIKeyQuery from apps.app.models import AppClient   class ApiKey(APIKeyQuery):     param_name = "api_key"      def authenticate(self, request, key):         try:             c = AppClient.objects.get(secret_key=key)             print(f'api key authenticated: {c.secret_key}, app id: {c.app_id}')             return c         except AppClient.DoesNotExist:             pass 

使用也很简单

from apps.app.models import AppClient from core.authentication.api_key import ApiKey  @router.get("/data", auth=ApiKey()) def get_data(request):    assert isinstance(request.auth, AppClient)    return {"data": "success"} 

客户端请求的时候需要在 GET query params 里带上 AppClientsecret_key

校验通过的话,ninja 会自动在 request.auth 添加 AppClient 实例,在接口代码里可以直接获取这个实例。

小结

这篇文章居然写了两万多字🕶

这还是在删减了很多代码的情况下…

再啰嗦一次,有兴趣的同学可以在公众号后台回复「API鉴权」获取完整代码

这么长的文章,大概不会有人有耐心看到最后吧~

溜了,溜了~

发表评论

评论已关闭。

相关文章