Django+Celery 进阶:动态定时任务的添加、修改与智能调度实战

一、Celery定时任务

Celery Beat 介绍

Celery Beat 是 Celery 框架的一个内置组件,专门用于定时任务调度。它可以按照预设的时间规则(如固定间隔、特定时间点、CRON 表达式等)自动触发 Celery 任务,广泛应用于需要周期性执行的场景(如定时数据备份、日志清理、报表生成等)。

工作原理

  • Beat 进程:独立运行的调度进程,负责解析定时规则并生成任务消息。
  • 任务发送:当到达预设时间,Beat 进程将任务发送到 Celery 的消息队列(如 Redis)。
  • 任务执行:Celery Worker 进程从队列中获取任务并执行。

简单来说,Celery Beat 是 “定时发令枪”,而 Worker 是 “执行者”。

Celery Beat 配置持久化

默认情况下,任务配置存储在内存中,重启后会丢失。需要通过后端存储(如数据库)实现持久化,确保任务配置不丢失。

项目名称 说明
django-celery-beat 通过数据库实现任务配置持久化
django-celery-results 通过数据库实现任务结果持久化
django-celery 只支持Celery 3.X版本(不推荐)

二、Celery Beat与Django集成

安装配置

安装

pip install django-celery-beat django-celery-results 

在Django项目settings.py中添加

INSTALLED_APPS = [     ...     'django_celery_beat',     'django_celery_results' ]  ### Celery 配置 CELERY_BROKER_URL = f"{REDIS_URL}/{REDIS_DB}"  # 使用Redis作为消息代理 CELERY_RESULT_BACKEND = "django-db"  # 使用数据库存储结果 CELERY_BEAT_SCHEDULER = (     "django_celery_beat.schedulers:DatabaseScheduler"  # 使用数据库保存定时任务 ) CELERY_TIMEZONE = "Asia/Shanghai" CELERY_ENABLE_UTC = True CELERY_RESULT_EXTENDED = True  # 启用后才会记录 task_name、date_started 等字段 CELERY_TASK_TRACK_STARTED = True  # 记录任务开始时间 

定义 Celery 实例:创建文件mysitemysitecelery.py

"""定义和配置 Celery 实例"""  import os from celery import Celery from django.conf import settings   os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings") # 创建 Celery 实例 app = Celery("mysite") # 加载配置文件中的 Celery 配置 app.config_from_object("django.conf:settings", namespace="CELERY") # 自动发现并加载任务 app.autodiscover_tasks(["myapp_infra", "myapp_system"] + settings.MY_APPS, force=True) 

配置 Django 启动时会加载应用:修改文件mysitemysite__init__.py

"""Django 启动时加载Celery实例"""  from .celery import app as celery_app  __all__ = ("celery_app",) 

数据库迁移

执行数据库迁移,创建相关数据库表。其中:

  • django_celery_beat_periodictask:用于存储任务名称、任务路径、任务参数等元数据。
  • django_celery_beat_crontabschedule:用于存储CRON表达式。
# 在Django项目根目录(包括manage.py的目录)执行 python manage.py migrate django_celery_beat python manage.py migrate django_celery_results 

Django+Celery 进阶:动态定时任务的添加、修改与智能调度实战

三、Celery Beat项目实战

定义Celery任务

发现任务:Celery 将自动从所有已安装的应用APP中发现任务,需要遵守以下目录结构

- myapp_system/     - tasks.py     - models.py - myapp_infra/     - tasks.py     - models.py 

定义任务:创建文件myapp_infra/tasks.py,使用@shared_task装饰器定义 Celery 任务

"""定义 Celery 任务"""  from time import sleep from celery import shared_task from django.utils import timezone   @shared_task def send_daily_report():     # 示例:发送日报     print(f"开始发送日报,现在时间:{timezone.now()}")     sleep(30)     print("发送成功")     return "发送成功"   @shared_task def cleanup_expired_data():     # 示例:清理过期数据     print("清理过期数据")     sleep(15)     print("清理完成")     return "清理完成" 

通过视图集动态管理定时任务

下面是通过 DRF 视图集,动态管理定时任务示例,实现对定时任务的增删改查、手动触发、开启暂停等操作

  • 定义视图:myapp_infrajobviews.py
import json from celery import current_app from django_celery_beat.models import PeriodicTask from drf_spectacular.utils import extend_schema from rest_framework.decorators import action from rest_framework.generics import get_object_or_404  from mars_framework.viewsets.base import CustomModelViewSetNoSimple from mars_framework.permissions.base import HasPermission from mars_framework.response.base import CommonResponse from .serializers import JobSaveSerializer, JobSerializer from .filters import JobFilter from .services import infra_job_service   @extend_schema(tags=["管理后台-infra-定时任务"]) class JobViewSet(CustomModelViewSetNoSimple):     queryset = PeriodicTask.objects.all()     serializer_class = JobSerializer     filterset_class = JobFilter     action_serializers = {         "create": JobSaveSerializer,         "update": JobSaveSerializer,     }     action_permissions = {         "create": [HasPermission("infra:job:create")],         "destroy": [HasPermission("infra:job:delete")],  # TODO 是否需要删除对应shedule         "update": [HasPermission("infra:job:update")],         "retrieve": [HasPermission("infra:job:query")],         "list": [HasPermission("infra:job:query")],         "export": [HasPermission("infra:job:export")],         "update_status": [HasPermission("infra:job:update")],         "trigger": [HasPermission("infra:job:trigger")],         "sync": [HasPermission("infra:job:create")],         "get_next_times": [HasPermission("infra:job:query")],     }     action_querysets = {         # 排除name=celery.backend_cleanup         "list": PeriodicTask.objects.exclude(name="celery.backend_cleanup"),         "export": PeriodicTask.objects.exclude(name="celery.backend_cleanup"),     }      export_name = "定时任务列表"     export_fields_labels = {         "id": "任务编号",         "name": "任务名称",         "task": "处理器名字",         "kwargs": "处理器参数",         "cron_expression": "CRON表达式",         "status": "任务状态",     }     export_data_map = {         "status": {1: "开启", 2: "暂停"},     }      @extend_schema(summary="新增")     def create(self, request, *args, **kwargs):         """创建定时任务"""         serializer = self.get_serializer(data=request.data)         serializer.is_valid(raise_exception=True)         # CRON表达式         cron_expression = serializer.validated_data.pop("cron_expression")         schedule = infra_job_service.get_or_create_crontab_schedule(cron_expression)         # 创建任务         task_data = {             "name": serializer.validated_data.get("name"),             "task": serializer.validated_data.get("task"),             "kwargs": serializer.validated_data.get("kwargs"),             "crontab": schedule,             "enabled": False,  # 默认禁用         }         PeriodicTask.objects.create(**task_data)         return CommonResponse.success()      @extend_schema(summary="更新")     def update(self, request, *args, **kwargs):         """更新定时任务"""         instance = self.get_object()         serializer = self.get_serializer(instance, data=request.data)         serializer.is_valid(raise_exception=True)         # 任务CRON表达式         cron_expression = serializer.validated_data.pop("cron_expression")         schedule = infra_job_service.get_or_create_crontab_schedule(cron_expression)         # 更新任务         task_data = {             "name": serializer.validated_data.get("name"),             "task": serializer.validated_data.get("task"),             "kwargs": serializer.validated_data.get("kwargs"),             "crontab": schedule,         }         PeriodicTask.objects.filter(id=instance.id).update(**task_data)         return CommonResponse.success()      @extend_schema(summary="触发定时任务")     @action(         methods=["put"],         detail=True,         url_path="trigger",     )     def trigger(self, request, *args, **kwargs):         """触发定时任务"""         instance = self.get_object()         # 获取任务函数并手动触发         task_name = instance.task  # 任务路径如 "myapp_infra.tasks.send_daily_report"         task_kwargs = json.loads(instance.kwargs or "{}")          try:             # 动态加载任务函数             task = current_app.tasks[task_name]             task.delay(**task_kwargs)             return CommonResponse.success()         except KeyError:             return CommonResponse.error(                 code=121101,                 msg=f"找不到 {task_name}  任务,或该任务未注册",             )         except Exception as e:             return CommonResponse.error(                 code=121102,                 msg=f"触发任务 {task_name} 失败,错误信息:{e}",             )      @extend_schema(summary="更新定时任务状态")     @action(         methods=["put"],         detail=True,         url_path="status",     )     def update_status(self, request, *args, **kwargs):         """更新定时任务状态"""         status = request.query_params.get("status")         if status is None or status not in ["1", "2"]:  # 1:开启 2:暂停             return CommonResponse.error(code=121104, msg="任务状态值错误")         instance = get_object_or_404(PeriodicTask, pk=kwargs.get("pk"))         instance.enabled = status == "1"         instance.save()         return CommonResponse.success()      @extend_schema(summary="获取定时任务的下 n 次执行时间")     @action(         methods=["get"],         detail=True,         url_path="next-times",     )     def get_next_times(self, request, *args, **kwargs):         """获取定时任务的下 n 次执行时间"""         count = int(request.query_params.get("count", 5))         task = self.get_object()         # 生成CORN 表达式         crontab = task.crontab         cron_expression = f"{crontab.minute} {crontab.hour} {crontab.day_of_month} {crontab.month_of_year} {crontab.day_of_week}"         try:             data = infra_job_service.get_next_times(cron_expression, count)         except Exception as e:             return CommonResponse.error(code=121102, msg=str(e))         return CommonResponse.success(data=data) 
  • 配置路由:myapp_infraurls.py
from .job.views import JobViewSet  # 管理后台 - 定时任务 router.register(r"job", JobViewSet, basename="job") 

启动Celery Beat

  • 启动Celery Worker和Celery Beat调度器
# 在项目目录(与manage.py同级),启动Celery Worker celery -A mysite worker -l info -P solo  # 新建另一个终端窗口,在项目目录(与manage.py同级),启动Celery Beat celery -A mysite beat -l info -S django_celery_beat.schedulers:DatabaseScheduler  # 新建另一个终端窗口,在项目目录(与manage.py同级),启动Django python manage.py runserver 

四、实战效果

通过上面定义的DRF视图集API,配合 Vue3 前端界面实现效果

  • 定时任务的增、删、开启暂停功能

Django+Celery 进阶:动态定时任务的添加、修改与智能调度实战

  • 定时任务的修改功能
    • 处理器名字:填写定义任务的全路径名称
    • CRON表达式:填写标准的CRON表达式

Django+Celery 进阶:动态定时任务的添加、修改与智能调度实战

  • 定时任务执行结果查询功能:能看到定时任务的执行时间、状态、返回结果等信息

Django+Celery 进阶:动态定时任务的添加、修改与智能调度实战

点击查看完整代码


您正在阅读的是《Django从入门到实战》专栏!关注不迷路~

发表评论

评论已关闭。

相关文章