Celery 学习笔记

Celery 是一个强大的异步任务调度框架.

概念

类似于 生产/消费模型 中的生产者和消费者, 在 celery 中也有这样的几个角色, 分别是 broker backend work 和 task

broker 是用于传输消息的中间件, 当应用程序调用异步任务的时候, 会向 broker 传递消息, 之后 worker 取出任务并执行

RabbitMQ Redis 可作为 broker

backend 存储 worker 执行任务后返回的结果和状态

RabbitMQ Redis 甚至 Databases 都可作为 backend

worker 就是 task 的工作者, 从 broker 中取出任务并执行

顾名思义, 你想执行的任务

工作流程

1
2
3
4
5
6
7
Async Task   Schedule Task
       \       /
         Broker
       /   |   \
  Worker Worker Worker
      \    |    /
        backend

初步上手

使用 celery.Celery 创建应用

1
2
3
from celery import Celery

app = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='reids://127.0.0.1:6379/0')

celery 提供了装饰器便于创建任务

1
2
3
4
@app.task
def add(x, y):
    print('hello world')
    return x + y

同目录下运行 celery -A task worker -l info -n %h.add

-A 指定应用名, -l 指定不同的 loglevel

-n 用于指定当前的节点名, 其中 %h 包含当前的域名和主机名, %n 只包含主机名, %d 只包含域名

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
In [1]: from tasks import add

In [2]: result = add.delay('2','3')

In [3]: result.status
Out[3]: 'SUCCESS'

In [4]: result.ready()
Out[4]: True

In [5]: result.get()
Out[5]: '23'

一般使用 delay(args) 触发任务, ready() 则说明任务是否执行成功

get() 获取返回结果, status 表示当前任务的状态

1
2
3
4
5
6
PENDING 待执行
STARTED 开始执行
SUCCESS 执行成功
FAILURE 执行失败
REVOKED 撤销
RETRY 重试

配置文件

创建 config.py

1
2
3
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
BROKER_URL = 'redis://127.0.0.1:6379/0'
TIMEZONE = 'Asia/Shanghai'

CELERY_RESULT_BACKEND 指定 backend

BROKER_URL 指定 broker

TIMEZONE 指定当前时区

config.py 中导入配置

1
2
app = Celery()
app.config_from_object('config')

队列路由

默认情况下所有任务的信息都保存在 celery 队列中, 很容易造成阻塞

通过创建不同的队列, 使用路由将不同的任务的信息存储到指定到队列中

创建三个任务 taskA taskB add

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@app.task
def taskA(m):
	print('taskA',m)
	return m

@app.task
def taskB(m):
	print('taskB',m)
	return m

@app.task
def add(m):
	print('add',m)
	return m

编辑 config.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
from kombu import Exchange,Queue

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
BROKER_URL = 'redis://127.0.0.1:6379/0'
TIMEZONE = 'Asia/Shanghai'

CELERY_QUEUES = (
	Queue('default',Exchange('default'),routing_key='default'),
	Queue('taskA',Exchange('taskA'),routing_key='taskA'),
	Queue('taskB',Exchange('taskB'),routing_key='taskB'),
	Queue('Add',Exchange('Add'),routing_key='Add')
	)

CELERY_ROUTES = {
	'tasks.taskA':{'queue':'taskA','routing_key':'taskA'},
	'tasks.taskB':{'queue':'taskB','routing_key':'taskB'},
	'tasks.add':{'queue':'Add','routing_key':'Add'}
}

CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'

CELERY_QUEUES 中添加 Queue 对象创建队列

Queue(QUEUE_NAME, EXCHANGE, ROUTING_KEY)

Exchange 通过 Routing_Key 把信息路由到不同的 Queue

通过在 CELERY_ROUTES 中建立对应的映射关系指定任务使用的队列

OBJECT.TASK_NAME: {'queue':QUEUE_NAME,'routeing_key':ROUTING_KEY}

最后三行指定默认的存储队列, 当然也可以不写, 这样默认队列就为 celery

依次开启三个窗口, 执行以下命令

1
2
3
celery -A tasks worker -l info -Q taskA -n %h.taskA
celery -A tasks worker -l info -Q taskB -n %h.taskB
celery -A tasks worker -l info -Q add -n %h.add

使用 -Q 参数指定队列, 主机仅接受当前队列的任务, 这样不同队列的任务就会被分发到其它节点执行

定时任务

celery 支持定时任务和计划任务, 通过 beat 节点向 worker 发送任务

编辑 config.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
from celery.schedules import crontab
from datetime import timedelta

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
BROKER_URL = 'redis://127.0.0.1:6379/0'
TIMEZONE = 'Asia/Shanghai'

CELERYBEAT_SCHEDULE = {
	'schedule_task': {
		'task': 'tasks.add',
		'schedule': timedelta(seconds=5),
		'args': ('hello world',)
	},
}

CELERYBEAT_SCHEDULE 中添加定时任务

SCHEDULE_TASK_NAME: {'task': TASK_NAME, 'schedule': TIME, 'args': ARGS}

schedule 可为间隔秒数 timedelta(secounds=5)crontab(hour=7, minute=30, day_of_week=1)

依次启动节点

1
2
celery -A tasks beat -l info
celery -A tasks worker -l info

beat

1
2
3
[2018-08-12 17:02:23,224: INFO/MainProcess] beat: Starting...
[2018-08-12 17:02:24,283: INFO/MainProcess] Scheduler: Sending due task schedule_task (tasks.add)
[2018-08-12 17:02:25,276: INFO/MainProcess] Scheduler: Sending due task schedule_task (tasks.add)

worker

1
2
3
4
5
6
7
8
[2018-08-12 17:02:24,296: INFO/MainProcess] Received task: tasks.add[531b5769-8919-4d59-b339-cc0e36456901]  
[2018-08-12 17:02:24,298: WARNING/ForkPoolWorker-2] Add
[2018-08-12 17:02:24,298: WARNING/ForkPoolWorker-2] hello world
[2018-08-12 17:02:24,301: INFO/ForkPoolWorker-2] Task tasks.add[531b5769-8919-4d59-b339-cc0e36456901] succeeded in 0.003998230000433978s: 'hello world'
[2018-08-12 17:02:25,278: INFO/MainProcess] Received task: tasks.add[88e5ffa2-794a-4ab4-b3ee-d6fccd3bb265]  
[2018-08-12 17:02:25,279: WARNING/ForkPoolWorker-2] Add
[2018-08-12 17:02:25,279: WARNING/ForkPoolWorker-2] hello world
[2018-08-12 17:02:25,279: INFO/ForkPoolWorker-2] Task tasks.add[88e5ffa2-794a-4ab4-b3ee-d6fccd3bb265] succeeded in 0.0005984580020594876s: 'hello world'

链式任务

有时任务可能由几个子任务组成, 任务之间相互调用, 这时候应使用异步回调的方式调用任务

被调用的多个任务由 | 分隔, 默认情况下使用 s(ARGS) 调用子任务, 同时返回的结果将被作为下一个任务的参数

如果不想让结果作为参数, 可以使用 si(ARGS) 或者 s(ARGS,immutable=True) 的方式进行调用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
@app.task
def chainTask(m):
	chain = taskA.s(m) | taskB.s() | add.s()
	chain()

@app.task
def taskA(m):
	print('task A',m)
	return m

@app.task
def taskB(m):
	print('task B',m)
	return m

@app.task
def add(m):
	print('Add',m)
	return m

si() 方式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
@app.task
def chainTask(m):
	chain = taskA.si(m) | taskB.si(m) | add.si(m)
	chain()

@app.task
def taskA(m):
	print('task A',m)
	return m

@app.task
def taskB(m):
	print('task B',m)
	return m

@app.task
def add(m):
	print('Add',m)
	return m

调用单个任务

1
2
3
4
5
6
7
8
9
@app.task
def chainTask(m):
	chain = Add.s(m)
	chain()

@app.task
def Add(m):
	print('Add',m)
	return m