Django+Celery实现动态配置定时任务的方法
在实际业务中,经常会涉及到定时任务的执行,而且这些任务的执行时间可能会根据需求动态变化。为了解决这个问题,我们可以使用Django和Celery来实现动态配置定时任务的功能。
1. 首先,什么是Django和Celery?
Django是一个基于Python的Web应用程序框架,它可以帮助我们快速地构建 Web 应用程序。
Celery是一个基于Python的分布式任务队列,它可以协调多个任务的执行。
2. 安装和配置Celery
首先,我们需要使用pip来安装Celery。
pip install Celery
接下来,在Django项目的settings.py文件中进行Celery的配置。
# CELERY配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
这里我们使用了Redis作为消息代理和结果存储的后端。
3. 动态配置定时任务
接下来,我们需要定义Celery任务。在本例中,我们将使用Celery Beat,它是一个Celery的扩展,可以用来处理定期执行的任务。 它通过设置间隔时间和固定时间来执行任务。
我们可以在 Django 的应用程序中创建 tasks.py 文件,以创建各种定时任务功能。
from datetime import timedelta
from celery.schedules import crontab
from celery.task import task, periodic_task
from celery.decorators import periodic_task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@periodic_task(run_every=timedelta(seconds=60))
def my_task():
logger.info('Hello World')
上面的代码定义了一个名为 my_task 的任务。该任务将在每分钟执行一次,记录一条“Hello World”消息。
4. 动态配置任务间隔和执行时间
现在,我们已经成功地定义了任务,但是该任务执行的间隔时间和执行时间是固定的,在实际业务需求中,它们可能会发生变化。 因此,我们需要为任务动态配置间隔时间和执行时间。
为此,我们可以在数据库中创建一个新表(例如,TaskSchedule),并为每个任务定义表中的时间间隔和执行时间。 然后,我们可以通过查找表中的数据来动态配置定时任务。
首先,我们需要为任务创建一个模型。
from django.db import models
class TaskSchedule(models.Model):
name = models.CharField(max_length=255)
task = models.CharField(max_length=255)
interval_unit = models.CharField(max_length=10)
interval_value = models.IntegerField()
start_at = models.DateTimeField()
end_at = models.DateTimeField(null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
def __unicode__(self):
return self.name
然后,我们可以创建一个辅助函数来执行任务并从表中获取任务间隔和执行时间。
from celery.decorators import periodic_task
from datetime import datetime, timedelta
from .models import TaskSchedule
@periodic_task(run_every=timedelta(seconds=60))
def check_task_schedule():
logger.info('Checking for new scheduled tasks...')
tasks = TaskSchedule.objects.all()
for task in tasks:
interval = timedelta(**{task.interval_unit: task.interval_value})
start = task.start_at
end = task.end_at or None
# 如果当前时间在任务的执行时间范围内,则执行该任务
if start <= datetime.now() and (not end or end > datetime.now()):
exec_task.apply_async(args=[task.task], eta=start, expires=end)
# 更新任务间隔和执行时间
if exec_task.request.id in [x.id for x in TaskSchedule.objects.all()]:
task = TaskSchedule.objects.get(exec_task.request.id)
task.start_at += interval
task.save()
@task(name="execute_task")
def exec_task(task_name):
logger.info('Executing task %s' % task_name)
在上面的代码中,我们使用Celery的periodic_task装饰器来使任务周期性地运行。 check_task_schedule函数检查数据库中的所有任务,并为每个任务设置间隔时间和执行时间。 如果当前时间在任务的执行时间范围内,则执行该任务,并在下一个执行时间更新任务的下一个开始时间。
5. 总结
通过本文,我们学习了如何使用Django和Celery实现动态配置定时任务的功能。我们使用了Celery Beat来执行定期执行的任务,并创建了一个TaskSchedule模型来存储任务间隔和执行时间。最后,我们将创建的任务添加到任务队列中,并根据任务的下一次执行时间将任务从队列中删除。