什么是Celery
Celery是一个基于分布式消息传递实现的异步任务队列/作业队列。它的设计目标是用于分布式系统中的任务调度,异步处理等等。
Celery由三部分组成:消息中间件、任务执行单元、任务执行结果状态维护。
其中消息中间件为单独的进程,负责分发任务和返回任务执行结果。
任务执行单元为较轻的进程,负责执行任务和将结果返回给消息中间件。
任务执行结果状态维护为数据库或者缓存等,负责存储任务的执行结果和状态。
Celery的使用场景
Celery适用于有大量同类任务,同时任务可拆分为较小的子任务的场景。比如说,网站上的数据分析任务、定时任务等。
Celery的优点在于它可以帮助我们完成异步处理任务,做到任务与请求相分离,提高我们产品的效率和性能。
通过Celery实现分布式任务调度的步骤:
步骤1:安装Celery
使用pip命令安装celery:
pip install celery
步骤2:创建任务
下面是一个简单的任务示例,它将会被Celery装饰器修饰,以便Celery可以识别并进行调度。
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
在上面的代码中,我们创建了一个名为add的任务,装饰器@app.task使得该函数可以在Celery中使用。
步骤3:启动Celery
Celery的启动有两种方式,一种是通过代码启动,另一种是通过命令行启动。在这里,我们展示通过代码启动Celery的方式。
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
if __name__ == '__main__':
app.start()
步骤4:运行任务
下面是运行任务的方式:
from tasks import add
result = add.delay(4, 4)
print(result.get())
代码中,我们首先导入了任务add,然后通过add.delay方法来异步地启动任务,这里的参数4和4是任务需要的参数。最后,我们通过task_result.get()方法获得了任务返回的结果。
任务调度的优化
并发调度
Celery支持并发处理任务,通过设置任务的并发数,可以提高任务的处理效率。
我们可以在启动任务时,设置并发数,示例如下:
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task(bind=True)
def process_file(self, filename):
for i in range(0, 100):
self.update_state(state='PROGRESS', meta={'current': i, 'total': 100})
process_item(filename[i])
在上面的代码中,我们通过bind=True参数来使得任务成为bound任务,这样我们就可以使用self参数来跟踪任务的执行进度。
当我们使用worker命令启动多个worker时,每个worker都可以并发地处理任务,提高任务的处理效率。
celery -A tasks worker --concurrency=4
任务重试
在任务调度中,一些任务会因为网络波动或者其他异常情况而没有成功执行,此时 Celery就提供了一个任务重试的功能。我们可以设置任务的最大重试次数和重试的时间间隔。
app.conf.update(
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='Asia/Shanghai',
enable_utc=True,
task_ignore_result=False,
task_acks_late=True,
task_reject_on_worker_lost=True,
task_default_exchange='tasks',
task_default_queue='tasks',
task_default_routing_key='task.process',
task_default_retry_delay=datetime.timedelta(minutes=1),
task_max_retries=10)
在上面的代码中,我们设置了默认的任务队列和路由规则,同时也将任务的最大重试次数设定为10次,每次重试的时间间隔为1分钟。这样,当任务因为异常情况而导致失败时,Celery会将任务重新放入任务队列,进行重试执行。
任务结果状态维护
Celery除了调度任务,还提供了结果状态的维护机制。比如说,我们可以将任务结果存储到数据库中,以便查询任务执行状态。
下面是使用Django ORM作为结果状态维护的示例代码:
app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.update(
BROKER_TRANSPORT_OPTIONS={'max_retries': 100},
CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend',
CELERY_TASK_RESULT_EXPIRES=3600,
CELERY_RESULT_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],
CELERY_ROUTES=routes)
在这里,我们将任务结果存储到了数据库中,同时将result_backend的值设定为djcelery.backends.database:DatabaseBackend,CELERY_TASK_RESULT_EXPIRES为结果的失效时间,这里设定为1小时。
总结
我们介绍了Celery的基本概念和使用步骤。同时,我们探讨了如何通过并发调度、任务重试、任务状态维护机制来优化任务调度,提升任务的处理效率。
Celery作为一个强大的分布式任务调度框架,在大型应用系统中发挥了举足轻重的作用,通过Celery,可以将应用系统中的任务处理变得更加高效、可控。