如何使用Celery实现分布式任务调度

什么是Celery

Celery是一个基于分布式消息传递实现的异步任务队列/作业队列。它的设计目标是用于分布式系统中的任务调度,异步处理等等。

Celery由三部分组成:消息中间件、任务执行单元、任务执行结果状态维护。

其中消息中间件为单独的进程,负责分发任务和返回任务执行结果。

任务执行单元为较轻的进程,负责执行任务和将结果返回给消息中间件。

任务执行结果状态维护为数据库或者缓存等,负责存储任务的执行结果和状态。

Celery的使用场景

Celery适用于有大量同类任务,同时任务可拆分为较小的子任务的场景。比如说,网站上的数据分析任务、定时任务等。

Celery的优点在于它可以帮助我们完成异步处理任务,做到任务与请求相分离,提高我们产品的效率和性能。

通过Celery实现分布式任务调度的步骤:

步骤1:安装Celery

使用pip命令安装celery:

pip install celery

步骤2:创建任务

下面是一个简单的任务示例,它将会被Celery装饰器修饰,以便Celery可以识别并进行调度。

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task

def add(x, y):

return x + y

在上面的代码中,我们创建了一个名为add的任务,装饰器@app.task使得该函数可以在Celery中使用。

步骤3:启动Celery

Celery的启动有两种方式,一种是通过代码启动,另一种是通过命令行启动。在这里,我们展示通过代码启动Celery的方式。

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task

def add(x, y):

return x + y

if __name__ == '__main__':

app.start()

步骤4:运行任务

下面是运行任务的方式:

from tasks import add

result = add.delay(4, 4)

print(result.get())

代码中,我们首先导入了任务add,然后通过add.delay方法来异步地启动任务,这里的参数4和4是任务需要的参数。最后,我们通过task_result.get()方法获得了任务返回的结果。

任务调度的优化

并发调度

Celery支持并发处理任务,通过设置任务的并发数,可以提高任务的处理效率。

我们可以在启动任务时,设置并发数,示例如下:

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task(bind=True)

def process_file(self, filename):

for i in range(0, 100):

self.update_state(state='PROGRESS', meta={'current': i, 'total': 100})

process_item(filename[i])

在上面的代码中,我们通过bind=True参数来使得任务成为bound任务,这样我们就可以使用self参数来跟踪任务的执行进度。

当我们使用worker命令启动多个worker时,每个worker都可以并发地处理任务,提高任务的处理效率。

celery -A tasks worker --concurrency=4

任务重试

在任务调度中,一些任务会因为网络波动或者其他异常情况而没有成功执行,此时 Celery就提供了一个任务重试的功能。我们可以设置任务的最大重试次数和重试的时间间隔。

app.conf.update(

task_serializer='json',

result_serializer='json',

accept_content=['json'],

timezone='Asia/Shanghai',

enable_utc=True,

task_ignore_result=False,

task_acks_late=True,

task_reject_on_worker_lost=True,

task_default_exchange='tasks',

task_default_queue='tasks',

task_default_routing_key='task.process',

task_default_retry_delay=datetime.timedelta(minutes=1),

task_max_retries=10)

在上面的代码中,我们设置了默认的任务队列和路由规则,同时也将任务的最大重试次数设定为10次,每次重试的时间间隔为1分钟。这样,当任务因为异常情况而导致失败时,Celery会将任务重新放入任务队列,进行重试执行。

任务结果状态维护

Celery除了调度任务,还提供了结果状态的维护机制。比如说,我们可以将任务结果存储到数据库中,以便查询任务执行状态。

下面是使用Django ORM作为结果状态维护的示例代码:

app = Celery('tasks', broker='pyamqp://guest@localhost//')

app.conf.update(

BROKER_TRANSPORT_OPTIONS={'max_retries': 100},

CELERY_RESULT_BACKEND='djcelery.backends.database:DatabaseBackend',

CELERY_TASK_RESULT_EXPIRES=3600,

CELERY_RESULT_SERIALIZER='json',

CELERY_ACCEPT_CONTENT=['json'],

CELERY_ROUTES=routes)

在这里,我们将任务结果存储到了数据库中,同时将result_backend的值设定为djcelery.backends.database:DatabaseBackend,CELERY_TASK_RESULT_EXPIRES为结果的失效时间,这里设定为1小时。

总结

我们介绍了Celery的基本概念和使用步骤。同时,我们探讨了如何通过并发调度、任务重试、任务状态维护机制来优化任务调度,提升任务的处理效率。

Celery作为一个强大的分布式任务调度框架,在大型应用系统中发挥了举足轻重的作用,通过Celery,可以将应用系统中的任务处理变得更加高效、可控。

后端开发标签