介绍
最近工作要异步执行任务,需要用到Celery故简单记录一下使用过程。
Celery常用于实时处理任务队列或任务调度,Celery适用于以下场景:
- 异步执行任务:当需要发送邮件或执行耗时操作时,程序需要等待执行结果才能继续。若将这些任务交给Celery异步执行,可大大提高效率。
- 定时任务:Celery支持定时任务调度,可用于实现crontab之类的功能。
基础概念
Celery一般的工作流程如下所示:1
| Producer (Python) | ---> | Broker (Redis/RabbitMQ) | ---> | Celery Worker (1..N) | ---> | Backend (Redis/RabbitMQ) |
Producer
:任务生产者。这里使用的是Python,通过Python调用Celery的API生产任务。Broker
:消息代理。接收Producer生产的任务,存入任务队列后依序将任务分派给Celery Worker。目前Celery支持的Broker有:Celery Broker。最常用的就是RabbitMQ和RedisCelery Worker
:任务消费者。用于执行任务,通常需要使用celery -A [celery.task] worker -l INFO
命令运行Celery Worker。Backend
:任务结果存储。存储任务执行的结果以便查询,默认支持和Broker差不多,常用的也是RabbitMQ和Redis。是否存储结果是可选的,若不设置Backend则不存储任务结果。
安装
安装使用pip即可,这里使用的Broker为Redis所以在安装Celery时一起将相关依赖给装上1
2
3
4
5#Redis
pip install celery[redis]
#RabbitMQ
pip install celery[librabbitmq,msgpack]
应用实例
配置
要使用Celery先要告知Celery使用什么Broker、Backend等一系列信息,故需要进行相关配置。配置大致可分两种形式,一种是直接将Broker等信息写到代码中,一种是将信息先写入配置文件,然后在代码中加载配置。为了方便管理配置,我习惯使用后者。
- 配置文件——
celeryconfig.py
配置文件中有些涉及到定时任务调度,后面会提到可先忽略。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20from celery.schedules import crontab
#Redis作为Broker
BROKER_URL = 'redis://localhost:6379/0'
#Redis作为Backend
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
#任务结果过期时间(seconds)
CELERY_TASK_RESULT_EXPIRES = 60 * 60
#BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 144000}
#CELERY_IGNORE_RESULT = True
#CELERYD_MAX_TASKS_PER_CHILD = 100
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
#CELERY_TASK_SERIALIZER = 'msgpack'
#CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
创建任务(task)
使用Celery的API创建任务非常简单,两步即可完成:
- 实例化Celery对象,命名为
celery
- 使用**`@celery.task`**装饰器装饰任务函数
任务程序——
tasks.py
默认Celery Worker是不允许以root用户启动,若需要以root用户启动则需配置platforms.C_FORCE_ROOT = True
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20from celery import Celery, platforms
import paramiko
#实例化Celery对象
celery = Celery(__name__)
#从配置文件加载配置
celery.config_from_object('celeryconfig')
#允许root启动
#platforms.C_FORCE_ROOT = True
@celery.task
def add(x, y):
return x + y
#设置重试
@celery.task(bind=True, default_retry_delay=300, max_retries=5)
def add_retry(x, y)
return x + y调用任务——
test.py
调用已写好的任务程序产生任务
add.delay(1, 1)
其实是add.apply_async(1, 1)
的简写。1
2
3from tasks import add
add.delay(1, 1)运行Celery Worker
需要运行Celery Worker去执行任务,可结合Supervisor1
2cd your_project_dir
celery -A tasks worker -l INFO
定时任务
Celery支持任务调度,使用Celery Beat执行定时任务
配置文件——
crontab_celeryconfig.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17from celery.schedules import crontab
BROKER_URL = 'redis://localhost:6379/2'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/3'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
CELERYBEAT_SCHEDULE = {
"add": {
"task": "crontab_tasks.sendmail",
"schedule": crontab(minute='*/1')
}
}任务程序——crontab_tasks.py
1
2
3
4
5
6
7
8from celery import Celery
crontab_celery = Celery(__name__)
crontab_celery.config_from_object('crontab_celeryconfig')
@crontab_celery.task
def sendmail():
print "Celery Crontab Tast Test"Celery Worker & Beat
启动Celery的Worker和Beat,每分钟定时执行sendmail()
函数1
celery -A crontab_tasks worker -B -l INFO -s /tmp/celerybeat-schedule
后续
目前只是简单的使用了Celery的基础功能,更多高级功能以后用到再继续补上
手动指定路由(Route)和队列(Queue)
当使用RabbitMQ作为Broker时,可手动指定任务的队列和路由。Celery默认所有任务都到celery
的队列中。详细参考以下文章:flower
flower提供Web界面对Celery进行监控