异步任务神器 Celery 简明笔记 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
iMmatrix
V2EX    Python

异步任务神器 Celery 简明笔记

  •  9
     
  •   iMmatrix 2016-12-13 09:58:37 +08:00 9363 次点击
    这是一个创建于 3227 天前的主题,其中的信息可能已经有所发展或是发生改变。

    Celery

    在程序的运行过程中,我们经常会碰到一些耗时耗资源的操作,为了避免它们阻塞主程序的运行,我们经常会采用多线程或异步任务。比如,在 Web 开发中,对新用户的注册,我们通常会给他发一封激活邮件,而发邮件是个 IO 阻塞式任务,如果直接把它放到应用当中,就需要等邮件发出去之后才能进行下一步操作,此时用户只能等待再等待。更好的方式是在业务逻辑中触发一个发邮件的异步任务,而主程序可以继续往下运行。

    Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务( async task )和定时任务( crontab )。它的架构组成如下图:

    Celery_framework

    可以看到, Celery 主要包含以下几个模块:

    • 任务模块

      包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列

    • 消息中间件 Broker

      Broker ,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。 Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

    • 任务执行单元 Worker

      Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它

    • 任务结果存储 Backend

      Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。

    异步任务

    使用 Celery 实现异步任务主要包含三个步骤:

    1. 创建一个 Celery 实例
    2. 启动 Celery Worker
    3. 应用程序调用异步任务

    快速入门

    为了简单起见,对于 Broker 和 Backend ,这里都使用 redis 。在运行下面的例子之前,请确保 redis 已正确安装,并开启 redis 服务,当然, celery 也是要安装的。可以使用下面的命令来安装 celery 及相关依赖:

    $ pip install 'celery[redis]' 

    创建 Celery 实例

    将下面的代码保存为文件 tasks.py

    # -*- coding: utf-8 -*- import time from celery import Celery broker = 'redis://127.0.0.1:6379' backend = 'redis://127.0.0.1:6379/0' app = Celery('my_task', broker=broker, backend=backend) @app.task def add(x, y): time.sleep(5) # 模拟耗时操作 return x + y 

    上面的代码做了几件事:

    • 创建了一个 Celery 实例 app ,名称为 my_task
    • 指定消息中间件用 redis , URL 为 redis://127.0.0.1:6379
    • 指定存储用 redis , URL 为 redis://127.0.0.1:6379/0
    • 创建了一个 Celery 任务 add,当函数被 @app.task 装饰后,就成为可被 Celery 调度的任务;

    启动 Celery Worker

    在当前目录,使用如下方式启动 Celery Worker :

    $ celery worker -A tasks --loglevel=info 

    其中:

    • 参数 -A 指定了 Celery 实例的位置,本例是在 tasks.py 中, Celery 会自动在该文件中寻找 Celery 对象实例,当然,我们也可以自己指定,在本例,使用 -A tasks.app
    • 参数 --loglevel 指定了日志级别,默认为 warning ,也可以使用 -l info 来表示;

    在生产环境中,我们通常会使用 Supervisor 来控制 Celery Worker 进程。

    启动成功后,控制台会显示如下输出:

    celery

    调用任务

    现在,我们可以在应用程序中使用 delay()apply_async() 方法来调用任务。

    在当前目录打开 Python 控制台,输入以下代码:

    >>> from tasks import add >>> add.delay(2, 8) <AsyncResult: 2272ddce-8be5-493f-b5ff-35a0d9fe600f> 

    在上面,我们从 tasks.py 文件中导入了 add 任务对象,然后使用 delay() 方法将任务发送到消息中间件( Broker ), Celery Worker 进程监控到该任务后,就会进行执行。我们将窗口切换到 Worker 的启动窗口,会看到多了两条日志:

    [2016-12-10 12:00:50,376: INFO/MainProcess] Received task: tasks.add[2272ddce-8be5-493f-b5ff-35a0d9fe600f] [2016-12-10 12:00:55,385: INFO/PoolWorker-4] Task tasks.add[2272ddce-8be5-493f-b5ff-35a0d9fe600f] succeeded in 5.00642602402s: 10 

    这说明任务已经被调度并执行成功。

    另外,我们如果想获取执行后的结果,可以这样做:

    >>> result = add.delay(2, 6) >>> result.ready() # 使用 ready() 判断任务是否执行完毕 False >>> result.ready() False >>> result.ready() True >>> result.get() # 使用 get() 获取任务结果 8 

    在上面,我们是在 Python 的环境中调用任务。事实上,我们通常在应用程序中调用任务。比如,将下面的代码保存为 client.py:

    # -*- coding: utf-8 -*- from tasks import add # 异步任务 add.delay(2, 8) print 'hello world' 

    运行命令 $ python client.py,可以看到,虽然任务函数 add 需要等待 5 秒才返回执行结果,但由于它是一个异步任务,不会阻塞当前的主程序,因此主程序会往下执行 print 语句,打印出结果。

    使用配置

    在上面的例子中,我们直接把 Broker 和 Backend 的配置写在了程序当中,更好的做法是将配置项统一写入到一个配置文件中,通常我们将该文件命名为 celeryconfig.py。 Celery 的配置比较多,可以在官方文档查询每个配置项的含义。

    下面,我们再看一个例子。项目结构下:

    celery_demo # 项目根目录 ├── celery_app # 存放 celery 相关文件 │ ├── __init__.py │ ├── celeryconfig.py # 配置文件 │ ├── task1.py # 任务文件 1 │ └── task2.py # 任务文件 2 └── client.py # 应用程序 

    __init__.py 代码如下:

    # -*- coding: utf-8 -*- from celery import Celery app = Celery('demo') # 创建 Celery 实例 app.config_from_object('celery_app.celeryconfig') # 通过 Celery 实例加载配置模块 

    celeryconfig.py 代码如下:

    BROKER_URL = 'redis://127.0.0.1:6379' # 指定 Broker CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 Backend CELERY_TIMEZOnE='Asia/Shanghai' # 指定时区,默认是 UTC # CELERY_TIMEZOnE='UTC' CELERY_IMPORTS = ( # 指定导入的任务模块 'celery_app.task1', 'celery_app.task2' ) 

    task1.py 代码如下:

    import time from celery_app import app @app.task def add(x, y): time.sleep(2) return x + y 

    task2.py 代码如下:

    import time from celery_app import app @app.task def multiply(x, y): time.sleep(2) return x * y 

    client.py 代码如下:

    # -*- coding: utf-8 -*- from celery_app import task1 from celery_app import task2 task1.add.apply_async(args=[2, 8]) # 也可用 task1.add.delay(2, 8) task2.multiply.apply_async(args=[3, 7]) # 也可用 task2.multiply.delay(3, 7) print 'hello world' 

    现在,让我们启动 Celery Worker 进程,在项目的根目录下执行下面命令:

    celery_demo $ celery -A celery_app worker --loglevel=info 

    接着,运行 $ python client.py,它会发送两个异步任务到 Broker ,在 Worker 的窗口我们可以看到如下输出:

    [2016-12-10 13:51:58,939: INFO/MainProcess] Received task: celery_app.task1.add[9ccffad0-aca4-4875-84ce-0ccfce5a83aa] [2016-12-10 13:51:58,941: INFO/MainProcess] Received task: celery_app.task2.multiply[64b1f889-c892-4333-bd1d-ac667e677a8a] [2016-12-10 13:52:00,948: INFO/PoolWorker-3] Task celery_app.task1.add[9ccffad0-aca4-4875-84ce-0ccfce5a83aa] succeeded in 2.00600231002s: 10 [2016-12-10 13:52:00,949: INFO/PoolWorker-4] Task celery_app.task2.multiply[64b1f889-c892-4333-bd1d-ac667e677a8a] succeeded in 2.00601326401s: 21 

    delay 和 apply_async

    在前面的例子中,我们使用 delay()apply_async() 方法来调用任务。事实上,delay 方法封装了 apply_async,如下:

    def delay(self, *partial_args, **partial_kwargs): """Shortcut to :meth:`apply_async` using star arguments.""" return self.apply_async(partial_args, partial_kwargs) 

    也就是说,delay 是使用 apply_async 的快捷方式。apply_async 支持更多的参数,它的一般形式如下:

    apply_async(args=(), kwargs={}, route_name=None, **options) 

    apply_async 常用的参数如下:

    • countdown :指定多少秒后执行任务
    task1.apply_async(args=(2, 3), countdown=5) # 5 秒后执行任务 
    • eta (estimated time of arrival):指定任务被调度的具体时间,参数类型是 datetime
    from datetime import datetime, timedelta # 当前 UTC 时间再加 10 秒后执行任务 task1.multiply.apply_async(args=[3, 7], eta=datetime.utcnow() + timedelta(secOnds=10)) 
    • expires :任务过期时间,参数类型可以是 int ,也可以是 datetime
    task1.multiply.apply_async(args=[3, 7], expires=10) # 10 秒后过期 

    更多的参数列表可以在官方文档中查看。

    定时任务

    Celery 除了可以执行异步任务,也支持执行周期性任务( Periodic Tasks ),或者说定时任务。 Celery Beat 进程通过读取配置文件的内容,周期性地将定时任务发往任务队列。

    让我们看看例子,项目结构如下:

    celery_demo # 项目根目录 ├── celery_app # 存放 celery 相关文件 ├── __init__.py ├── celeryconfig.py # 配置文件 ├── task1.py # 任务文件 └── task2.py # 任务文件 

    __init__.py 代码如下:

    # -*- coding: utf-8 -*- from celery import Celery app = Celery('demo') app.config_from_object('celery_app.celeryconfig') 

    celeryconfig.py 代码如下:

    # -*- coding: utf-8 -*- from datetime import timedelta from celery.schedules import crontab # Broker and Backend BROKER_URL = 'redis://127.0.0.1:6379' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # Timezone CELERY_TIMEZOnE='Asia/Shanghai' # 指定时区,不指定默认为 'UTC' # CELERY_TIMEZOnE='UTC' # import CELERY_IMPORTS = ( 'celery_app.task1', 'celery_app.task2' ) # schedules CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'celery_app.task1.add', 'schedule': timedelta(secOnds=30), # 每 30 秒执行一次 'args': (5, 8) # 任务函数参数 }, 'multiply-at-some-time': { 'task': 'celery_app.task2.multiply', 'schedule': crontab(hour=9, minute=50), # 每天早上 9 点 50 分执行一次 'args': (3, 7) # 任务函数参数 } } 

    task1.py 代码如下:

    import time from celery_app import app @app.task def add(x, y): time.sleep(2) return x + y 

    task2.py 代码如下:

    import time from celery_app import app @app.task def multiply(x, y): time.sleep(2) return x * y 

    现在,让我们启动 Celery Worker 进程,在项目的根目录下执行下面命令:

    celery_demo $ celery -A celery_app worker --loglevel=info 

    接着,启动 Celery Beat 进程,定时将任务发送到 Broker ,在项目根目录下执行下面命令:

    celery_demo $ celery beat -A celery_app celery beat v4.0.1 (latentcall) is starting. __ - ... __ - _ LocalTime -> 2016-12-11 09:48:16 Configuration -> . broker -> redis://127.0.0.1:6379// . loader -> celery.loaders.app.AppLoader . scheduler -> celery.beat.PersistentScheduler . db -> celerybeat-schedule . logfile -> [stderr]@%WARNING . maxinterval -> 5.00 minutes (300s) 

    之后,在 Worker 窗口我们可以看到,任务 task1 每 30 秒执行一次,而 task2 每天早上 9 点 50 分执行一次。

    在上面,我们用两个命令启动了 Worker 进程和 Beat 进程,我们也可以将它们放在一个命令中:

    $ celery -B -A celery_app worker --loglevel=info 

    Celery 周期性任务也有多个配置项,可参考官方文档

    参考资料


    本文由 funhacks 发表于个人博客,采用 Creative Commons BY-NC-ND 4.0 (自由转载-保持署名-非商用-禁止演绎)协议发布。

    非商业转载请注明作者及出处。商业转载请联系作者本人。

    本文标题为: 异步任务神器 Celery 简明笔记

    本文链接为: https://funhacks.net/2016/12/13/celery/

    29 条回复    2021-09-07 16:47:29 +08:00
    stamaimer
        1
    stamaimer  
       2016-12-13 10:05:32 +08:00 via iPhone   4
    大家都会看文档谢谢。
    chuyik
        2
    chuyik  
       2016-12-13 10:06:07 +08:00
    不用 Twisted?
    knightdf
        3
    knightdf  
       2016-12-13 10:17:08 +08:00
    @chuyik Twisted 跟这个搭不上边
    wwqgtxx
        4
    wwqgtxx  
       2016-12-13 10:31:32 +08:00 via iPhone   10
    @stamaimer 大家都会说废话谢谢
    fxxkgw
        5
    fxxkgw  
       2016-12-13 10:35:50 +08:00
    celery 文档貌似还没汉化 当时用的时候发现的。。。
    尝试了下 无奈英文不好 放弃了
    saggit
        6
    saggit  
       2016-12-13 10:36:25 +08:00
    分享有风险。
    JhZ7z587cYROBgVQ
        7
    JhZ7z587cYROBgVQ  
       2016-12-13 10:37:46 +08:00
    楼主分享一下使用经验的蛮好的啊,支持
    thinker3
        8
    thinker3  
       2016-12-13 11:38:05 +08:00
    谢谢
    playniuniu
        9
    playniuniu  
       2016-12-13 11:39:28 +08:00
    支持楼主,虽然有官方文档,但楼主这篇写的不错
    dofine
        10
    dofine  
       2016-12-13 11:49:55 +08:00
    差点把楼主的域名看成 f**k 。。(捂脸
    echovaio
        11
    echovaio  
       2016-12-13 12:07:08 +08:00 via Android
    写得挺好的
    dreampuf
        12
    dreampuf  
       2016-12-13 13:51:10 +08:00
    支持自己写 Tutorials
    recall704
        13
    recall704  
       2016-12-13 14:00:27 +08:00
    先收藏一个,虽然已经会用了。
    neoblackcap
        14
    neoblackcap  
       2016-12-13 14:14:52 +08:00
    Celery Beat 不能跟 gevent 类型额 worker 同时使用
    neoblackcap
        15
    neoblackcap  
       2016-12-13 14:18:31 +08:00   1
    broker 请尽量使用 rabbitmq , redis 有丢任务的风险, rabbitmq 跟 redis 虽然都是官方支持,但是显然是 rabbitmq 的支持更好,而且 amqp 协议在协议层使得 Broker 可以感知消息消费的情况。当你们上 celery 的时候,请尽量使用 rabbitmq ,那会少很多坑,而且各种监控也比较完善
    dirk41
        16
    dirk41  
       2016-12-13 16:12:08 +08:00
    celery_demo $ celery -A celery_app worker --loglevel=info
    执行这个的时候, "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/importlib/__init__.py", line 37, in import_module
    __import__(name)
    ImportError: No module named celery
    NaVient
        17
    NaVient  
       2016-12-13 17:35:08 +08:00
    收藏一下 写得挺好的
    julyclyde
        18
    julyclyde  
       2016-12-13 20:35:37 +08:00
    @neoblackcap beat 和 gevent 一起用了很多年了,我还没发现有啥问题
    iMmatrix
        19
    iMmatrix  
    OP
       2016-12-13 20:42:47 +08:00
    @neoblackcap ,是的。
    iMmatrix
        20
    iMmatrix  
    OP
       2016-12-13 20:43:08 +08:00
    @dirk41 ,你还没装 celery 吧。
    pypy
        21
    pypy  
       2016-12-13 21:48:28 +08:00
    感谢分享。受益匪浅。
    neoblackcap
        22
    neoblackcap  
       2016-12-13 22:06:52 +08:00
    @julyclyde 是加-B 吗?
    julyclyde
        23
    julyclyde  
       2016-12-13 22:33:31 +08:00
    @neoblackcap 我这里日常是单独一个 beat ;-B 是 beat 和 worker 混合运行的,我只用过几次,并没有注意到有异常的日志哦
    linxzh1989
        24
    linxzh1989  
       2016-12-13 22:55:54 +08:00
    非常感谢楼主的分享。不要脸的 收下了
    Landarky
        25
    Landarky  
       2016-12-13 23:17:54 +08:00
    这个分享不错
    unsuey
        26
    unsuey  
       2016-12-14 01:13:06 +08:00 via iPhone
    感谢分享
    jyf
        27
    jyf  
       2016-12-14 11:12:44 +08:00
    beanstalkd 也不错
    xavierskip
        28
    xavierskip  
       2016-12-14 14:11:16 +08:00
    不是不提倡全文装载的吗?自己的 blog 那就更没必要了。
    fxtaoo
        29
    fxtaoo  
       2021-09-07 16:47:29 +08:00
    结合例子,通俗易懂,赞一个
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2997 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 29ms UTC 13:45 PVG 21:45 LAX 06:45 JFK 09:45
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86