1. 什么是Django-celery-beat
Django-celery-beat 是 Celery 在 Django 中的定时任务模块。Celery 主要用于支持异步任务队列或分布式任务队列。Django-celery-beat 则用于管理 Celery 的定时任务,可以让我们方便调度定期运行的任务,而不需要每次手动调用。
2. 安装Django-celery-beat模块
在使用 Django-celery-beat 前,需要先安装 Celery 和 Django-celery-beat 库。
pip install Celery
pip install django-celery-beat
3. 集成Django-celery-beat
3.1 配置Celery
首先,需要在 Django 项目的 settings.py 中添加 Celery 配置。
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_BEAT_SCHEDULE = {
'run-task-every-30-seconds': {
'task': 'myapp.tasks.run_task',
'schedule': 30.0,
},
}
这里的配置包括 Celery 的 broker 和 result_backend 配置。在这里我使用 Redis 作为 broker 和 result_backend。如果你使用其他的消息队列,可以将 URL 换成对应的链接。
Celery 的定时任务配置需要添加在 CELERY_BEAT_SCHEDULE 参数中。这个参数是一个字典,指定每个被定时执行的任务的配置信息。每个任务需要提供 task(任务)、schedule(执行时间间隔) 等参数。
示例中的配置表示每隔 30 秒执行一次 myapp.tasks.run_task() 任务。run_task() 是我自定义的一个任务,可以在 myapp/tasks.py 中定义这个任务的具体逻辑。
3.2 配置Django-celery-beat
配置好 Celery 后,在 Django 的 settings.py 中的 INSTALLED_APPS 参数中添加 django_celery_beat。
# settings.py
INSTALLED_APPS = [
...,
'django_celery_beat',
]
然后在项目的 urls.py 中加入配置项,把与 celery 处理相关的任务集成到 Django 中。这个需要用 celery_app 应用对象来调用,代码如下:
# urls.py
from django.urls import path, include
from celery import Celery
app_name = 'myapp'
# 指定 celery 配置文件,指定要初始化的任务和应用
celery_app = Celery('myapp')
celery_app.config_from_object('django.conf:settings', namespace='CELERY')
celery_app.autodiscover_tasks()
urlpatterns = [
...,
path('celery/', include('django_celery_beat.urls')),
]
在这里我将 celery_app 配置为 myapp,你可以根据自己的情况替换这个名称。
最后,在项目根目录下运行下面的命令启动 Celery worker 和 Celery beat。
celery -A myproject worker -l info
celery -A myproject beat --scheduler django_celery_beat.schedulers:DatabaseScheduler -l debug
4. 动态添加任务
在上面的步骤中我们已经配置好了 Celery 的定时任务。但有时我们希望实现动态添加任务,这样可以更灵活地进行任务管理,而不需要像之前一样一次性把所有的任务统统配置好。
为了实现动态添加任务,我们可以调用 Django-celery-beat 提供的 api,来添加新的任务。下面是具体的实现方式。
4.1 添加新任务
在 Django 的 views.py 中添加新的视图函数,用于接收客户端发送的消息,在接收到新任务数据时,会将这些数据写入数据库(默认使用 SQLite 数据库),从而动态的创建新的任务。
# views.py
import json
from django.http import JsonResponse
from django_celery_beat.models import PeriodicTask, IntervalSchedule
def add_task_view(request, *args, **kwargs):
schedule, created = IntervalSchedule.objects.get_or_create(
every=request.POST.get('every'),
period=IntervalSchedule.SECONDS
)
task = PeriodicTask(
name=request.POST.get('task_name'),
task=request.POST.get('task'),
interval=schedule,
args=json.dumps(request.POST.get('args', [])),
)
task.save()
response_data = {'success': True}
return JsonResponse(response_data)
这个视图函数会接收 POST 请求,解析数据后将数据存入数据库。
这里需要注意的是,需要先创建一个 IntervalSchedule 对象,指定任务的周期。这个周期的配置需在前端传递。args 参数是任务参数,可以在执行任务时传递。在这里我将 args 存为 json 字符串,你也可以根据自己的需求将这个参数存储为其他数据类型。
4.2 前端调用
我们需要前端来完成任务的添加动作,可以在前端页面中使用 Ajax 发送 POST 请求,将需要新建的任务信息发送到服务端。下面是示例代码:
function createNewTask(taskName, taskPath, every, args) {
$.ajax({
url: '/add_task/',
type: 'POST',
dataType: 'json',
data: {
task_name: taskName,
task: taskPath,
every: every,
args: args
},
success: function (data) {
console.log(data);
},
error: function (xhr, errmsg, err) {
console.log(xhr.status + ': ' + xhr.responseText);
}
});
}
这里的 createNewTask() 函数会向 /add_task/ 的 url 发送 POST 请求,其中发送的数据包含任务名、任务路径、周期、参数等信息,这些信息会在后端视图函数中被接收和处理。
4.3 完整示例
下面是一个完整的示例程序,可以将这个程序作为模板来快速实现 Django-celery-beat 的动态任务添加功能。
本示例程序包含 views.py、urls.py、templates 文件夹、tasks.py 等文件。
views.py:
import json
from django.http import JsonResponse
from django.shortcuts import render
from django_celery_beat.models import PeriodicTask, IntervalSchedule
def index(request):
return render(request, 'index.html')
def add_task_view(request, *args, **kwargs):
schedule, created = IntervalSchedule.objects.get_or_create(
every=request.POST.get('every'),
period=IntervalSchedule.SECONDS
)
task = PeriodicTask(
name=request.POST.get('task_name'),
task=request.POST.get('task'),
interval=schedule,
args=json.dumps(request.POST.get('args', [])),
)
task.save()
response_data = {'success': True}
return JsonResponse(response_data)
urls.py:
from django.urls import path
from django.views.decorators.csrf import csrf_exempt
from celery import Celery
from myapp.tasks import run_task
app_name = 'myapp'
celery_app = Celery('myapp')
celery_app.config_from_object('django.conf:settings', namespace='CELERY')
celery_app.autodiscover_tasks()
urlpatterns = [
path('', views.index, name='index'),
path('add_task/', views.add_task_view, name='add_task_view'),
path('celery/', include('django_celery_beat.urls')),
]
templates/index.html:
{% extends 'base.html' %}
{% block content %}
$('#create_task_button').on('click', function () {
var taskName = $('input[name=task_name]').val();
var taskPath = $('input[name=task]').val();
var every = $('input[name=every]').val();
var args = $('textarea[name=args]').val();
createNewTask(taskName, taskPath, every, args);
});
function createNewTask(taskName, taskPath, every, args) {
$.ajax({
url: '/add_task/',
type: 'POST',
dataType: 'json',
data: {
task_name: taskName,
task: taskPath,
every: every,
args: args
},
success: function (data) {
console.log(data);
},
error: function (xhr, errmsg, err) {
console.log(xhr.status + ': ' + xhr.responseText);
}
});
}
{% endblock %}
tasks.py:
import time
from celery import shared_task
@shared_task
def run_task():
print('Task running...')
time.sleep(5)
print('Task finished.')
这个示例程序基本上涵盖了 Django-celery-beat 动态添加任务的全部流程。可以通过这个程序来学习和使用 Django-celery-beat。