关于Django使用 django-celery-beat动态添加定时任务的方法

1. 什么是Django-celery-beat

Django-celery-beat 是 Celery 在 Django 中的定时任务模块。Celery 主要用于支持异步任务队列或分布式任务队列。Django-celery-beat 则用于管理 Celery 的定时任务,可以让我们方便调度定期运行的任务,而不需要每次手动调用。

2. 安装Django-celery-beat模块

在使用 Django-celery-beat 前,需要先安装 Celery 和 Django-celery-beat 库。

pip install Celery

pip install django-celery-beat

3. 集成Django-celery-beat

3.1 配置Celery

首先,需要在 Django 项目的 settings.py 中添加 Celery 配置。

# settings.py

CELERY_BROKER_URL = 'redis://localhost:6379/0'

CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

CELERY_BEAT_SCHEDULE = {

'run-task-every-30-seconds': {

'task': 'myapp.tasks.run_task',

'schedule': 30.0,

},

}

这里的配置包括 Celery 的 broker 和 result_backend 配置。在这里我使用 Redis 作为 broker 和 result_backend。如果你使用其他的消息队列,可以将 URL 换成对应的链接。

Celery 的定时任务配置需要添加在 CELERY_BEAT_SCHEDULE 参数中。这个参数是一个字典,指定每个被定时执行的任务的配置信息。每个任务需要提供 task(任务)、schedule(执行时间间隔) 等参数。

示例中的配置表示每隔 30 秒执行一次 myapp.tasks.run_task() 任务。run_task() 是我自定义的一个任务,可以在 myapp/tasks.py 中定义这个任务的具体逻辑。

3.2 配置Django-celery-beat

配置好 Celery 后,在 Django 的 settings.py 中的 INSTALLED_APPS 参数中添加 django_celery_beat。

# settings.py

INSTALLED_APPS = [

...,

'django_celery_beat',

]

然后在项目的 urls.py 中加入配置项,把与 celery 处理相关的任务集成到 Django 中。这个需要用 celery_app 应用对象来调用,代码如下:

# urls.py

from django.urls import path, include

from celery import Celery

app_name = 'myapp'

# 指定 celery 配置文件,指定要初始化的任务和应用

celery_app = Celery('myapp')

celery_app.config_from_object('django.conf:settings', namespace='CELERY')

celery_app.autodiscover_tasks()

urlpatterns = [

...,

path('celery/', include('django_celery_beat.urls')),

]

在这里我将 celery_app 配置为 myapp,你可以根据自己的情况替换这个名称。

最后,在项目根目录下运行下面的命令启动 Celery worker 和 Celery beat。

celery -A myproject worker -l info

celery -A myproject beat --scheduler django_celery_beat.schedulers:DatabaseScheduler -l debug

4. 动态添加任务

在上面的步骤中我们已经配置好了 Celery 的定时任务。但有时我们希望实现动态添加任务,这样可以更灵活地进行任务管理,而不需要像之前一样一次性把所有的任务统统配置好。

为了实现动态添加任务,我们可以调用 Django-celery-beat 提供的 api,来添加新的任务。下面是具体的实现方式。

4.1 添加新任务

在 Django 的 views.py 中添加新的视图函数,用于接收客户端发送的消息,在接收到新任务数据时,会将这些数据写入数据库(默认使用 SQLite 数据库),从而动态的创建新的任务。

# views.py

import json

from django.http import JsonResponse

from django_celery_beat.models import PeriodicTask, IntervalSchedule

def add_task_view(request, *args, **kwargs):

schedule, created = IntervalSchedule.objects.get_or_create(

every=request.POST.get('every'),

period=IntervalSchedule.SECONDS

)

task = PeriodicTask(

name=request.POST.get('task_name'),

task=request.POST.get('task'),

interval=schedule,

args=json.dumps(request.POST.get('args', [])),

)

task.save()

response_data = {'success': True}

return JsonResponse(response_data)

这个视图函数会接收 POST 请求,解析数据后将数据存入数据库。

这里需要注意的是,需要先创建一个 IntervalSchedule 对象,指定任务的周期。这个周期的配置需在前端传递。args 参数是任务参数,可以在执行任务时传递。在这里我将 args 存为 json 字符串,你也可以根据自己的需求将这个参数存储为其他数据类型。

4.2 前端调用

我们需要前端来完成任务的添加动作,可以在前端页面中使用 Ajax 发送 POST 请求,将需要新建的任务信息发送到服务端。下面是示例代码:

function createNewTask(taskName, taskPath, every, args) {

$.ajax({

url: '/add_task/',

type: 'POST',

dataType: 'json',

data: {

task_name: taskName,

task: taskPath,

every: every,

args: args

},

success: function (data) {

console.log(data);

},

error: function (xhr, errmsg, err) {

console.log(xhr.status + ': ' + xhr.responseText);

}

});

}

这里的 createNewTask() 函数会向 /add_task/ 的 url 发送 POST 请求,其中发送的数据包含任务名、任务路径、周期、参数等信息,这些信息会在后端视图函数中被接收和处理。

4.3 完整示例

下面是一个完整的示例程序,可以将这个程序作为模板来快速实现 Django-celery-beat 的动态任务添加功能。

本示例程序包含 views.py、urls.py、templates 文件夹、tasks.py 等文件。

views.py:

import json

from django.http import JsonResponse

from django.shortcuts import render

from django_celery_beat.models import PeriodicTask, IntervalSchedule

def index(request):

return render(request, 'index.html')

def add_task_view(request, *args, **kwargs):

schedule, created = IntervalSchedule.objects.get_or_create(

every=request.POST.get('every'),

period=IntervalSchedule.SECONDS

)

task = PeriodicTask(

name=request.POST.get('task_name'),

task=request.POST.get('task'),

interval=schedule,

args=json.dumps(request.POST.get('args', [])),

)

task.save()

response_data = {'success': True}

return JsonResponse(response_data)

urls.py:

from django.urls import path

from django.views.decorators.csrf import csrf_exempt

from celery import Celery

from myapp.tasks import run_task

app_name = 'myapp'

celery_app = Celery('myapp')

celery_app.config_from_object('django.conf:settings', namespace='CELERY')

celery_app.autodiscover_tasks()

urlpatterns = [

path('', views.index, name='index'),

path('add_task/', views.add_task_view, name='add_task_view'),

path('celery/', include('django_celery_beat.urls')),

]

templates/index.html:

{% extends 'base.html' %}

{% block content %}

{% endblock %}

tasks.py:

import time

from celery import shared_task

@shared_task

def run_task():

print('Task running...')

time.sleep(5)

print('Task finished.')

这个示例程序基本上涵盖了 Django-celery-beat 动态添加任务的全部流程。可以通过这个程序来学习和使用 Django-celery-beat。

后端开发标签