1. 引言
Celery是一个分布式任务队列/调度系统,它使用Python编写。它可以轻松地处理大规模的并发任务,使得一些耗时任务不会阻塞Web请求的处理。同时,Django是一个流行的Web框架,它提供了丰富的功能和强大的ORM。在本文,我们将介绍如何使用Celery和Django处理异步任务。
2. 安装和配置Celery
2.1 安装
我们可以通过pip命令来安装Celery:
pip install celery
2.2 配置
我们需要在Django项目的settings.py文件中添加如下配置:
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
上述代码中,我们使用Redis作为消息代理和结果后端。我们还指定了消息和结果的序列化方式。
3. 创建异步任务
我们可以在Django项目的任何地方创建异步任务,比如app中的tasks.py文件中。下面是一个简单的示例任务:
from celery import shared_task
@shared_task
def add(x, y):
return x + y
我们只需要在函数上添加@shared_task
装饰器,就可以将该函数变为Celery任务。
4. 调用异步任务
4.1 同步调用
我们可以在Django项目的任何地方直接调用任务函数来执行该任务:
from .tasks import add
result = add.delay(2, 3)
print(result.get())
上述代码中,我们首先导入了异步任务add
,然后使用delay
方法来调用该任务,并传入参数。最后,我们通过get
方法来获取任务执行结果。
4.2 异步调用
使用同步调用会阻塞Web请求的处理,所以我们通常会使用异步调用。在Django中,我们可以使用AsyncResult
类来获取异步任务的结果。
首先,我们需要在视图中启动异步任务:
from .tasks import add
result = add.delay(2, 3)
return render(request, 'home.html', {'task_id': result.id})
上述代码中,我们将异步任务的ID传递给了模板。
然后,在模板中,我们可以使用JavaScript轮询AsyncResult
对象,以获取异步任务的结果:
function check_result(task_id) {
$.getJSON('/check-result/', {'task_id': task_id}, function(data) {
if (data.result) {
alert('Result: ' + data.result);
} else {
setTimeout(function() {
check_result(task_id);
}, 1000);
}
});
}
check_result('{{ task_id }}');
上述JavaScript代码中,我们使用$.getJSON
方法向服务器发送异步请求,以获取任务结果。如果请求返回了结果,我们就弹出结果窗口。否则,我们就每隔1秒钟轮询一次。
最后,在Django项目中,我们需要定义一个视图来返回异步任务的结果:
from celery.result import AsyncResult
from django.http import JsonResponse
def check_result(request):
task_id = request.GET.get('task_id')
result = AsyncResult(task_id).result
return JsonResponse({'result': result})
上述代码中,我们首先获取异步任务的ID,然后使用AsyncResult
类来获取任务的结果。
5. 高级功能
5.1 定时任务
使用Celery,我们可以轻松地创建定时任务。首先,我们需要在Django项目的settings.py文件中添加如下配置:
CELERY_BEAT_SCHEDULE = {
'task-name': {
'task': 'task-path',
'schedule': timedelta(minutes=1),
},
}
上述代码中,我们使用timedelta
指定了任务的执行间隔。
然后,我们需要在Django项目的任何地方创建定时任务,比如app中的tasks.py文件中:
from celery import shared_task
@shared_task
def send_email():
# send email for every minute
pass
上述代码中,我们创建了一个定时任务send_email
,每隔一分钟发送一封邮件。
最后,在Django项目中,我们需要启动Celery Beat来运行定时任务:
celery -A project-name beat
5.2 分布式任务
使用Celery,我们可以轻松地创建分布式任务。我们只需要在Django项目的settings.py文件中添加如下配置:
CELERY_ROUTES = {
'task-name': {'queue': 'queue-name'},
}
上述代码中,我们使用queue
指定了任务要发送到哪个队列。
然后,我们需要在Django项目的任何地方创建分布式任务,比如app中的tasks.py文件中:
from celery import shared_task
@shared_task
def send_email():
# send email
pass
上述代码中,我们创建了一个分布式任务send_email
,任务不在本地执行,而是被发送到指定的队列中。
最后,在Django项目中运行Celery Worker来执行分布式任务:
celery -A project-name worker -Q queue-name
6. 总结
Celery和Django是两个非常好的Python库。使用它们,我们可以轻松地处理异步任务,包括定时任务和分布式任务。本文介绍了如何安装和配置Celery,创建和调用异步任务,以及使用Celery的高级功能。希望这篇文章对你有所帮助。