介绍
 最近工作要异步执行任务,需要用到Celery故简单记录一下使用过程。
 Celery常用于实时处理任务队列或任务调度,Celery适用于以下场景:
- 异步执行任务:当需要发送邮件或执行耗时操作时,程序需要等待执行结果才能继续。若将这些任务交给Celery异步执行,可大大提高效率。
- 定时任务:Celery支持定时任务调度,可用于实现crontab之类的功能。
基础概念
 Celery一般的工作流程如下所示:1
| Producer (Python) | ---> | Broker (Redis/RabbitMQ) | ---> | Celery Worker (1..N) | ---> | Backend (Redis/RabbitMQ) |
- Producer:任务生产者。这里使用的是Python,通过Python调用Celery的API生产任务。
- Broker:消息代理。接收Producer生产的任务,存入任务队列后依序将任务分派给Celery Worker。目前Celery支持的Broker有:Celery Broker。最常用的就是RabbitMQ和Redis
- Celery Worker:任务消费者。用于执行任务,通常需要使用- celery -A [celery.task] worker -l INFO命令运行Celery Worker。
- Backend:任务结果存储。存储任务执行的结果以便查询,默认支持和Broker差不多,常用的也是RabbitMQ和Redis。是否存储结果是可选的,若不设置Backend则不存储任务结果。
安装
 安装使用pip即可,这里使用的Broker为Redis所以在安装Celery时一起将相关依赖给装上1
2
3
4
5#Redis
pip install celery[redis]
#RabbitMQ
pip install celery[librabbitmq,msgpack]
应用实例
配置
要使用Celery先要告知Celery使用什么Broker、Backend等一系列信息,故需要进行相关配置。配置大致可分两种形式,一种是直接将Broker等信息写到代码中,一种是将信息先写入配置文件,然后在代码中加载配置。为了方便管理配置,我习惯使用后者。
- 配置文件——celeryconfig.py
 配置文件中有些涉及到定时任务调度,后面会提到可先忽略。1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20from celery.schedules import crontab 
 #Redis作为Broker
 BROKER_URL = 'redis://localhost:6379/0'
 #Redis作为Backend
 CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
 #任务结果过期时间(seconds)
 CELERY_TASK_RESULT_EXPIRES = 60 * 60
 #BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 144000}
 #CELERY_IGNORE_RESULT = True
 #CELERYD_MAX_TASKS_PER_CHILD = 100
 CELERY_TASK_SERIALIZER = 'json'
 CELERY_RESULT_SERIALIZER = 'json'
 CELERY_ACCEPT_CONTENT = ['json']
 #CELERY_TASK_SERIALIZER = 'msgpack'
 #CELERY_ACCEPT_CONTENT = ['json', 'msgpack']
 CELERY_TIMEZONE = 'Asia/Shanghai'
 CELERY_ENABLE_UTC = False
创建任务(task)
使用Celery的API创建任务非常简单,两步即可完成:
- 实例化Celery对象,命名为celery
- 使用**`@celery.task`**装饰器装饰任务函数
- 任务程序—— - tasks.py
 默认Celery Worker是不允许以root用户启动,若需要以root用户启动则需配置- platforms.C_FORCE_ROOT = True- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20- from celery import Celery, platforms 
 import paramiko
 #实例化Celery对象
 celery = Celery(__name__)
 #从配置文件加载配置
 celery.config_from_object('celeryconfig')
 #允许root启动
 #platforms.C_FORCE_ROOT = True
 @celery.task
 def add(x, y):
 return x + y
 #设置重试
 @celery.task(bind=True, default_retry_delay=300, max_retries=5)
 def add_retry(x, y)
 return x + y
- 调用任务—— - test.py
 调用已写好的任务程序产生任务
 - add.delay(1, 1)其实是- add.apply_async(1, 1)的简写。- 1 
 2
 3- from tasks import add 
 add.delay(1, 1)
- 运行Celery Worker 
 需要运行Celery Worker去执行任务,可结合Supervisor- 1 
 2- cd your_project_dir 
 celery -A tasks worker -l INFO
定时任务
Celery支持任务调度,使用Celery Beat执行定时任务
- 配置文件—— - crontab_celeryconfig.py- 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17- from celery.schedules import crontab 
 BROKER_URL = 'redis://localhost:6379/2'
 CELERY_RESULT_BACKEND = 'redis://localhost:6379/3'
 CELERY_TASK_SERIALIZER = 'json'
 CELERY_RESULT_SERIALIZER = 'json'
 CELERY_ACCEPT_CONTENT = ['json']
 CELERY_TIMEZONE = 'Asia/Shanghai'
 CELERY_ENABLE_UTC = False
 CELERYBEAT_SCHEDULE = {
 "add": {
 "task": "crontab_tasks.sendmail",
 "schedule": crontab(minute='*/1')
 }
 }
- 任务程序——crontab_tasks.py - 1 
 2
 3
 4
 5
 6
 7
 8- from celery import Celery 
 crontab_celery = Celery(__name__)
 crontab_celery.config_from_object('crontab_celeryconfig')
 @crontab_celery.task
 def sendmail():
 print "Celery Crontab Tast Test"
- Celery Worker & Beat 
 启动Celery的Worker和Beat,每分钟定时执行- sendmail()函数- 1 - celery -A crontab_tasks worker -B -l INFO -s /tmp/celerybeat-schedule 
后续
目前只是简单的使用了Celery的基础功能,更多高级功能以后用到再继续补上
- 手动指定路由(Route)和队列(Queue) 
 当使用RabbitMQ作为Broker时,可手动指定任务的队列和路由。Celery默认所有任务都到- celery的队列中。详细参考以下文章:
- flower 
 flower提供Web界面对Celery进行监控