Celery 是一个强大的异步任务调度框架.
类似于 生产/消费模型 中的生产者和消费者, 在 celery 中也有这样的几个角色, 分别是 broker backend work 和 task
broker 是用于传输消息的中间件, 当应用程序调用异步任务的时候, 会向 broker 传递消息, 之后 worker 取出任务并执行
RabbitMQ Redis 可作为 broker
backend 存储 worker 执行任务后返回的结果和状态
RabbitMQ Redis 甚至 Databases 都可作为 backend
worker 就是 task 的工作者, 从 broker 中取出任务并执行
顾名思义, 你想执行的任务
1
2
3
4
5
6
7
|
Async Task Schedule Task
\ /
Broker
/ | \
Worker Worker Worker
\ | /
backend
|
使用 celery.Celery
创建应用
1
2
3
|
from celery import Celery
app = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='reids://127.0.0.1:6379/0')
|
celery 提供了装饰器便于创建任务
1
2
3
4
|
@app.task
def add(x, y):
print('hello world')
return x + y
|
同目录下运行 celery -A task worker -l info -n %h.add
-A
指定应用名, -l
指定不同的 loglevel
-n
用于指定当前的节点名, 其中 %h
包含当前的域名和主机名, %n
只包含主机名, %d
只包含域名
1
2
3
4
5
6
7
8
9
10
11
12
|
In [1]: from tasks import add
In [2]: result = add.delay('2','3')
In [3]: result.status
Out[3]: 'SUCCESS'
In [4]: result.ready()
Out[4]: True
In [5]: result.get()
Out[5]: '23'
|
一般使用 delay(args)
触发任务, ready()
则说明任务是否执行成功
get()
获取返回结果, status
表示当前任务的状态
1
2
3
4
5
6
|
PENDING 待执行
STARTED 开始执行
SUCCESS 执行成功
FAILURE 执行失败
REVOKED 撤销
RETRY 重试
|
创建 config.py
1
2
3
|
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
BROKER_URL = 'redis://127.0.0.1:6379/0'
TIMEZONE = 'Asia/Shanghai'
|
CELERY_RESULT_BACKEND
指定 backend
BROKER_URL
指定 broker
TIMEZONE
指定当前时区
从 config.py
中导入配置
1
2
|
app = Celery()
app.config_from_object('config')
|
默认情况下所有任务的信息都保存在 celery
队列中, 很容易造成阻塞
通过创建不同的队列, 使用路由将不同的任务的信息存储到指定到队列中
创建三个任务 taskA
taskB
add
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@app.task
def taskA(m):
print('taskA',m)
return m
@app.task
def taskB(m):
print('taskB',m)
return m
@app.task
def add(m):
print('add',m)
return m
|
编辑 config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
from kombu import Exchange,Queue
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
BROKER_URL = 'redis://127.0.0.1:6379/0'
TIMEZONE = 'Asia/Shanghai'
CELERY_QUEUES = (
Queue('default',Exchange('default'),routing_key='default'),
Queue('taskA',Exchange('taskA'),routing_key='taskA'),
Queue('taskB',Exchange('taskB'),routing_key='taskB'),
Queue('Add',Exchange('Add'),routing_key='Add')
)
CELERY_ROUTES = {
'tasks.taskA':{'queue':'taskA','routing_key':'taskA'},
'tasks.taskB':{'queue':'taskB','routing_key':'taskB'},
'tasks.add':{'queue':'Add','routing_key':'Add'}
}
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'
|
在 CELERY_QUEUES
中添加 Queue
对象创建队列
Queue(QUEUE_NAME, EXCHANGE, ROUTING_KEY)
Exchange
通过 Routing_Key
把信息路由到不同的 Queue
中
通过在 CELERY_ROUTES
中建立对应的映射关系指定任务使用的队列
OBJECT.TASK_NAME: {'queue':QUEUE_NAME,'routeing_key':ROUTING_KEY}
最后三行指定默认的存储队列, 当然也可以不写, 这样默认队列就为 celery
依次开启三个窗口, 执行以下命令
1
2
3
|
celery -A tasks worker -l info -Q taskA -n %h.taskA
celery -A tasks worker -l info -Q taskB -n %h.taskB
celery -A tasks worker -l info -Q add -n %h.add
|
使用 -Q
参数指定队列, 主机仅接受当前队列的任务, 这样不同队列的任务就会被分发到其它节点执行
celery 支持定时任务和计划任务, 通过 beat
节点向 worker 发送任务
编辑 config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
from celery.schedules import crontab
from datetime import timedelta
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
BROKER_URL = 'redis://127.0.0.1:6379/0'
TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULE = {
'schedule_task': {
'task': 'tasks.add',
'schedule': timedelta(seconds=5),
'args': ('hello world',)
},
}
|
在 CELERYBEAT_SCHEDULE
中添加定时任务
SCHEDULE_TASK_NAME: {'task': TASK_NAME, 'schedule': TIME, 'args': ARGS}
schedule
可为间隔秒数 timedelta(secounds=5)
或 crontab(hour=7, minute=30, day_of_week=1)
依次启动节点
1
2
|
celery -A tasks beat -l info
celery -A tasks worker -l info
|
beat
1
2
3
|
[2018-08-12 17:02:23,224: INFO/MainProcess] beat: Starting...
[2018-08-12 17:02:24,283: INFO/MainProcess] Scheduler: Sending due task schedule_task (tasks.add)
[2018-08-12 17:02:25,276: INFO/MainProcess] Scheduler: Sending due task schedule_task (tasks.add)
|
worker
1
2
3
4
5
6
7
8
|
[2018-08-12 17:02:24,296: INFO/MainProcess] Received task: tasks.add[531b5769-8919-4d59-b339-cc0e36456901]
[2018-08-12 17:02:24,298: WARNING/ForkPoolWorker-2] Add
[2018-08-12 17:02:24,298: WARNING/ForkPoolWorker-2] hello world
[2018-08-12 17:02:24,301: INFO/ForkPoolWorker-2] Task tasks.add[531b5769-8919-4d59-b339-cc0e36456901] succeeded in 0.003998230000433978s: 'hello world'
[2018-08-12 17:02:25,278: INFO/MainProcess] Received task: tasks.add[88e5ffa2-794a-4ab4-b3ee-d6fccd3bb265]
[2018-08-12 17:02:25,279: WARNING/ForkPoolWorker-2] Add
[2018-08-12 17:02:25,279: WARNING/ForkPoolWorker-2] hello world
[2018-08-12 17:02:25,279: INFO/ForkPoolWorker-2] Task tasks.add[88e5ffa2-794a-4ab4-b3ee-d6fccd3bb265] succeeded in 0.0005984580020594876s: 'hello world'
|
有时任务可能由几个子任务组成, 任务之间相互调用, 这时候应使用异步回调的方式调用任务
被调用的多个任务由 |
分隔, 默认情况下使用 s(ARGS)
调用子任务, 同时返回的结果将被作为下一个任务的参数
如果不想让结果作为参数, 可以使用 si(ARGS)
或者 s(ARGS,immutable=True)
的方式进行调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@app.task
def chainTask(m):
chain = taskA.s(m) | taskB.s() | add.s()
chain()
@app.task
def taskA(m):
print('task A',m)
return m
@app.task
def taskB(m):
print('task B',m)
return m
@app.task
def add(m):
print('Add',m)
return m
|
si()
方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@app.task
def chainTask(m):
chain = taskA.si(m) | taskB.si(m) | add.si(m)
chain()
@app.task
def taskA(m):
print('task A',m)
return m
@app.task
def taskB(m):
print('task B',m)
return m
@app.task
def add(m):
print('Add',m)
return m
|
调用单个任务
1
2
3
4
5
6
7
8
9
|
@app.task
def chainTask(m):
chain = Add.s(m)
chain()
@app.task
def Add(m):
print('Add',m)
return m
|