Celery知识点总结

1.什么是Celery?

Celery是一个基于Python的用于处理分布式任务的开源框架。它可以帮助开发者将复杂且耗时的任务拆分成小的、单独的任务单元,然后在多台机器或多个进程之间并行处理。这种方式可以大大提高任务的完成速度,也可以提高系统的可靠性。

Celery主要由以下4部分组成:

Broker:Celery使用中间件来处理任务(例如RabbitMQ、Redis等),Broker就是负责任务分配的消息中间件。

Workers:任务的执行者,它们从中间件中获取任务并进行执行。每个worker可以看做一个独立的进程或者线程。

Beat:Beat是Celery自带的守护进程,用于管理、调度定期执行的任务。

Client:客户端用于添加任务。

2.如何安装Celery?

2.1 安装Celery

安装celery非常简单,只需要使用pip即可:

pip install celery

2.2 安装消息中间件

Celery支持多种消息中间件,例如RabbitMQ、Redis、Beanstalk等。我们以RabbitMQ为例进行说明:

2.2.1 安装RabbitMQ服务器:

sudo apt-get install rabbitmq-server

2.2.2 安装RabbitMQ的Python库:

pip install amqp==2.5.2

如果你想使用其他的中间件,只需要将安装步骤替换成对应的命令即可。

3.如何使用Celery?

3.1 基本概念

3.1.1 创建任务

使用Celery,我们可以将任务封装成一个python函数。例如,我们有一个sum函数用于计算两个数的和:

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task

def sum(a, b):

return a + b

上述代码创建了一个名为sum的任务,其中@app.task是必需的装饰器,用于将函数注册为Celery任务。通过broker参数,我们指定消息中间件,此处使用了RabbitMQ。

3.1.2 执行任务

执行任务的方法有两种,一种是使用apply_async(异步)方式,一种是使用delay(同步)方式。例如:

#异步方式

result = sum.apply_async(args=[1,2], countdown=10)

#同步方式

result = sum.delay(1, 2)

result.get()

apply_async方法用于创建异步任务,args参数用于传递函数的参数,countdown参数用于指定任务执行的延时时间。

delay方法用于创建同步任务,get方法用于获取任务执行的结果。

3.2 可配置项

3.2.1 配置任务队列

Celery默认使用名为"celery"的队列,如果要使用其他队列,需要在tasks.py文件中添加以下代码:

app.conf.task_default_queue = 'my_queue'

3.2.2 配置任务结果

Celery默认使用RabbitMQ作为任务结果的存储方式,但我们也可以将结果存储到数据库或者其他的缓存中(如Redis):

app.conf.result_backend = 'db+postgresql://user:password@localhost/dbname'

4.实战案例

下面我们举个例子来说明Celery的具体使用方法:假设我们有一个爬虫程序,需要定期爬取某个网站的数据,并将数据存储到数据库中。

4.1 创建任务

我们创建一个名为crawl的任务,用于爬取数据。

@app.task

def crawl(url):

#复杂的爬虫代码

return data

4.2 使用定时器调度任务

我们使用Celery提供的Beat来调度任务。下面的代码会在每5秒钟调度一次任务:

from celery import Celery

from celery.schedules import crontab

app = Celery('tasks', broker='pyamqp://guest@localhost//')

app.conf.beat_schedule = {

'crawl-every-5-seconds': {

'task': 'tasks.crawl',

'schedule': 5.0,

'args': ('http://example.com',)

},

}

4.3 将数据存储到数据库

假设我们使用MongoDB作为数据存储的后端,下面的代码会将数据存储到MongoDB中:

from pymongo import MongoClient

@app.task

def crawl_and_save(url):

data = crawl(url)

client = MongoClient()

db = client['test']

collection = db['data']

collection.insert_one(data)

最终的完整代码如下:

from pymongo import MongoClient

from celery import Celery

from celery.schedules import crontab

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task

def crawl(url):

#复杂的爬虫代码

return data

@app.task

def crawl_and_save(url):

data = crawl(url)

client = MongoClient()

db = client['test']

collection = db['data']

collection.insert_one(data)

app.conf.beat_schedule = {

'crawl-every-5-seconds': {

'task': 'tasks.crawl_and_save',

'schedule': 5.0,

'args': ('http://example.com',)

},

}

5.总结

Celery是一个非常强大的Python库,可以帮助我们处理分布式任务。本文主要介绍了Celery的安装、使用方法以及一些常见的配置项,同时还通过一个实战案例来演示了Celery的具体使用方法。如果你需要处理复杂的耗时任务,建议学习并使用Celery。

后端开发标签