Please enable Javascript to view the contents

初试Celery

 ·  ☕  2 分钟

Celery 的架构

image

  • Celery Beat:任务调度器,Beat 进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
  • Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
  • Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。
  • Producer:调用了 Celery 提供的 API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
  • Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery 默认已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。

一个比较“合适”的方案

  • 选择RabbitMQ作为消息代理 pip install librabbitmq
  • 选择Msgpack做序列化 pip install msgpack
  • 选择Redis做结果存储 pip install redis

参考文章

🍳 烹饪 Celery

tree proj项目结构:

proj
├── __init__.py
├── app.py
├── celeryconfig.py
└── tasks.py

app.py:

1
2
3
4
5
6
7
from celery import Celery

app = Celery('proj', include=['proj.tasks'])
app.config_from_object('proj.celeryconfig')

if __name__ == '__main__':
    app.start()

tasks.py:

1
2
3
4
5
from proj.app import app

@app.task
def add(x, y):
    return x + y

celertconifg.py

1
2
3
4
5
6
BROKER_URL = 'redis://localhost:6379/0' # 本地我就使用Redis作为消息代理了
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1' # 把任务结果存在了Redis
CELERY_TASK_SERIALIZER = 'msgpack' # 任务序列化和反序列化使用msgpack方案
CELERY_RESULT_SERIALIZER = 'json' # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的内容类型

Run worker

1
2
cd ../
celery --app=proj.app.app worker -l info

进入 Python shell 发送任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
>>> from proj.tasks import add
>>> r = add.delay(1, 3)
>>> r
<AsyncResult: f4379dd4-c8eb-4854-971b-e8770c4f604f>
>>> r.result
4
>>> r.status
'SUCCESS'
>>> r.successful()
True
>>> r.backend
<celery.backends.redis.RedisBackend object at 0x10fff1e80>
>>> r.id
'f4379dd4-c8eb-4854-971b-e8770c4f604f'
>>> from celery.result import AsyncResult
>>> AsyncResult(r.id).status
'SUCCESS'
>>> AsyncResult(r.id).get()
4

任务配置 🎛

cat proj/tasks.py:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import time
from celery.utils.log import get_task_logger
...
logger = get_task_logger(__name__)
...
@app.task(bind=True)
def div(self, x, y):
    logger.info(
        'Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(self.request))
    time.sleep(60)
    try:
        result = x / y
    except ZeroDivisionError as e:
        raise self.retry(exc=e, countdown=5, max_retries=3)
    return result
  • 任务绑定bind=True:将div绑定为方法,通过self获取任务的上下文。
  • 任务日志logger = get_task_logger(__name__) 获取任务日志
  • 任务重试 self.retry(exc=e, countdown=5, max_retries=3):每 5 秒就会重试一次,一共重试 3 次(默认重复 3 次),然后抛出异常。
目录