1.什么是Celery?
Celery是一个基于Python的用于处理分布式任务的开源框架。它可以帮助开发者将复杂且耗时的任务拆分成小的、单独的任务单元,然后在多台机器或多个进程之间并行处理。这种方式可以大大提高任务的完成速度,也可以提高系统的可靠性。
Celery主要由以下4部分组成:
Broker:Celery使用中间件来处理任务(例如RabbitMQ、Redis等),Broker就是负责任务分配的消息中间件。
Workers:任务的执行者,它们从中间件中获取任务并进行执行。每个worker可以看做一个独立的进程或者线程。
Beat:Beat是Celery自带的守护进程,用于管理、调度定期执行的任务。
Client:客户端用于添加任务。
2.如何安装Celery?
2.1 安装Celery
安装celery非常简单,只需要使用pip即可:
pip install celery
2.2 安装消息中间件
Celery支持多种消息中间件,例如RabbitMQ、Redis、Beanstalk等。我们以RabbitMQ为例进行说明:
2.2.1 安装RabbitMQ服务器:
sudo apt-get install rabbitmq-server
2.2.2 安装RabbitMQ的Python库:
pip install amqp==2.5.2
如果你想使用其他的中间件,只需要将安装步骤替换成对应的命令即可。
3.如何使用Celery?
3.1 基本概念
3.1.1 创建任务
使用Celery,我们可以将任务封装成一个python函数。例如,我们有一个sum函数用于计算两个数的和:
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def sum(a, b):
return a + b
上述代码创建了一个名为sum的任务,其中@app.task是必需的装饰器,用于将函数注册为Celery任务。通过broker参数,我们指定消息中间件,此处使用了RabbitMQ。
3.1.2 执行任务
执行任务的方法有两种,一种是使用apply_async(异步)方式,一种是使用delay(同步)方式。例如:
#异步方式
result = sum.apply_async(args=[1,2], countdown=10)
#同步方式
result = sum.delay(1, 2)
result.get()
apply_async方法用于创建异步任务,args参数用于传递函数的参数,countdown参数用于指定任务执行的延时时间。
delay方法用于创建同步任务,get方法用于获取任务执行的结果。
3.2 可配置项
3.2.1 配置任务队列
Celery默认使用名为"celery"的队列,如果要使用其他队列,需要在tasks.py文件中添加以下代码:
app.conf.task_default_queue = 'my_queue'
3.2.2 配置任务结果
Celery默认使用RabbitMQ作为任务结果的存储方式,但我们也可以将结果存储到数据库或者其他的缓存中(如Redis):
app.conf.result_backend = 'db+postgresql://user:password@localhost/dbname'
4.实战案例
下面我们举个例子来说明Celery的具体使用方法:假设我们有一个爬虫程序,需要定期爬取某个网站的数据,并将数据存储到数据库中。
4.1 创建任务
我们创建一个名为crawl的任务,用于爬取数据。
@app.task
def crawl(url):
#复杂的爬虫代码
return data
4.2 使用定时器调度任务
我们使用Celery提供的Beat来调度任务。下面的代码会在每5秒钟调度一次任务:
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.beat_schedule = {
'crawl-every-5-seconds': {
'task': 'tasks.crawl',
'schedule': 5.0,
'args': ('http://example.com',)
},
}
4.3 将数据存储到数据库
假设我们使用MongoDB作为数据存储的后端,下面的代码会将数据存储到MongoDB中:
from pymongo import MongoClient
@app.task
def crawl_and_save(url):
data = crawl(url)
client = MongoClient()
db = client['test']
collection = db['data']
collection.insert_one(data)
最终的完整代码如下:
from pymongo import MongoClient
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def crawl(url):
#复杂的爬虫代码
return data
@app.task
def crawl_and_save(url):
data = crawl(url)
client = MongoClient()
db = client['test']
collection = db['data']
collection.insert_one(data)
app.conf.beat_schedule = {
'crawl-every-5-seconds': {
'task': 'tasks.crawl_and_save',
'schedule': 5.0,
'args': ('http://example.com',)
},
}
5.总结
Celery是一个非常强大的Python库,可以帮助我们处理分布式任务。本文主要介绍了Celery的安装、使用方法以及一些常见的配置项,同时还通过一个实战案例来演示了Celery的具体使用方法。如果你需要处理复杂的耗时任务,建议学习并使用Celery。