celery worker 多线程执行完后卡住假死 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
hanssx
V2EX    Python

celery worker 多线程执行完后卡住假死

  •  
  •   hanssx 2019-09-02 12:11:45 +08:00 7805 次点击
    这是一个创建于 2233 天前的主题,其中的信息可能已经有所发展或是发生改变。

    celery 任务代码大致如下,此任务未加 soft_time_limit 或 time_limit 其中 Session 是根据以下四行代码得来:

    SQLALCHEMY_DATABASE_URI = 'mysql://xx' some_engine = create_engine(SQLALCHEMY_DATABASE_URI, echo=False, pool_pre_ping=True) session_factory = sessionmaker(autocommit=False, bind=some_engine) # autoflush=False, Session = scoped_session(session_factory) 

    celery 任务大致代码:

    ''' class NmapThread_(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): while True: try: ip = self.queue.get(block=False) except SoftTimeLimitExceeded as e: raise except Exception as e: logging.error(e, exc_info=True) # 记录线程退出 break try: ns = nmap.scan(ip) # 使用 subprocess.Popen()调用外部 nmap 扫描该 ip 并将结果入库 ns.run() except SoftTimeLimitExceeded as e: raise except Exception as e: logging.error(e, exc_info=True) finally: Session.remove() self.queue.task_done() class IpScan: def __init__(self): self.queue = Queue() self.thread_count = 2 def run(self): # 此处入队列的 IP 个数非常多,大概需要多线程运行 1 周 self.queue.put('1.1.1.1') self.queue.put('2.2.2.2') self.queue.put('3.3.3.3') self.queue.put('4.4.4.4') self.queue.put('5.5.5.5') self.queue.put('6.6.6.6') self.scan_start() def scan_start(self): for i in range(self.thread_count): t = NmapThread_(self.queue) t.setDaemon(True) t.start() self.queue.join() def web_fingerprint_discern(*args, **kwargs): print('web_fingerprint_discern begin!') print(args) print(kwargs) @ce.task(name='default.test4queue', bind=True) def test4queue(self): ips = IpScan() ips.run() web_fingerprint_discern() ''' 

    现在的问题是多线程运行期间一直没有问题,直到最后多线程执行完(2019-08-31),抛出异常 break 跳出各线程之后,celry worker 就卡住假死了(2019-08-31),具体表现在:

    • celery worker 卡住假死,没有再执行接下来的 web_fingerprint_discern(),这个函数里面其实只有 3 个 print()
    • celery flower 显示 celery worker 已离线
    • rabbitMQ 显示队列中的任务还在执行之中

    然后我今天(2019-09-02)上班的时候强制 ctrl+c 之后,输出了一些日志, nC3UzR.md.png

    大家可以看到, web_fingerprint_discern()的 3 条 print 语句,第 1 条发生 2019-08-31,后 2 条发生在 2019-09-02 我 ctrl+c 的时候,其他附加表现:

    • 最后报错了 broken pipe
    • rabbitMQ 队列中的任务已经回滚到 Ready 阶段,也就是说如果重启 celery worker 任务会重新执行

    一直不能解决这个 celery 问题,由于时间原因也不方便换其他类 celery 架构,而且调试发现如果不是扫描任务就不会假死(当然测试时间肯定没有一周那么长,只有几分钟,所以如果完整模拟测试非常耗时,想寻找可能出现的问题点修复后再行测试)

    求各位大佬帮忙解决,若能解决,50 红包奉上以表谢意。

    17 条回复    2019-09-09 11:33:48 +08:00
    hanssx
        1
    hanssx  
    OP
       2019-09-02 12:26:44 +08:00
    celery 版本
    ```
    (asset) [root@VM_9_196_centos asset]# celery --verson
    4.3.0 (rhubarb)
    ```

    多线程应该是结束了,我在日志中收到了 break 线程的 while 循环之前,打印出的日志,一共 30 个日志,我也启了 30 个线程。
    ```
    [2019-08-31 22:47:31] [ERROR] - (asscan.py:38)
    Traceback (most recent call last):
    File "/root/python/asset/scan/asscan.py", line 34, in run
    task_host = self.queue.get(block=False)
    File "/usr/local/python3/lib/python3.7/queue.py", line 167, in get
    raise Empty
    _queue.Empty
    [2019-08-31 22:48:12] [ERROR] - (asscan.py:38)
    Traceback (most recent call last):
    File "/root/python/asset/scan/asscan.py", line 34, in run
    task_host = self.queue.get(block=False)
    File "/usr/local/python3/lib/python3.7/queue.py", line 167, in get
    raise Empty
    _queue.Empty
    [2019-08-31 22:48:12] [WARNING] - web_fingerprint_discern begin! (log.py:235)
    [2019-09-02 10:21:21] [WARNING] - () (log.py:235)
    [2019-09-02 10:21:21] [WARNING] - {'task_uuid': '95c3dda4-cb04-11e9-a344-52540
    ```
    neoblackcap
        2
    neoblackcap  
       2019-09-02 12:38:10 +08:00   1
    没记错的话,celery 自身实现是对 fork 之类有限制的,所以你不应该在任务里面进行类似 fork 之类的操作,线程 pthread_create 同理了。
    而且线程的支持我记得已经被 celery 自身抛弃的,所以应该是有缺陷的,建议不使用线程。

    根据我以前的做法,我一般都是将网络 IO 与逻辑处理分离。celery 对 gevent 跟进程支持都相当好,因此我会选用个 gevent 处理所有网络 IO (网络 IO,通过 IO 复用,几百万个任务都可以轻松搞定,前提是不能有任何 CPU 密集型处理)。然后通过跟进程型任务结合,组成流水线,在 celery 对应 chain 操作。那么就可以稳定地运行。

    因为 gevent 是处理网络是不堵塞的,所以你还是可以继续发任务给该 worker

    可以参考一下
    hanssx
        3
    hanssx  
    OP
       2019-09-02 13:26:56 +08:00
    @neoblackcap 谢谢 neoblackcap 师父指点,我还有几点想请教师父,
    1. 总体而言就是不使用线程而使用进程或 gevent,是吧?
    2. 之前每个线程执行的内容是使用 subprocess.Popen()调用外部 nmap 扫描该 ip 并将结果入库,主要是两个方面:一是调用 busprocess.Popen(),也就是终究还是会使用子进程;二是会有入库操作,这个应该不算是 CPU 密集型处理?怎么界定 CPU 密集型处理呢?
    3. 我这个 nmap 扫描应属于网络 IO 密集型,是不是使用 gevent 比较好?
    4. 我对 gevent 不熟悉,不知改动量大不大,师父能提供一些更改的方法吗?
    sazima
        4
    sazima  
       2019-09-02 13:27:31 +08:00
    用 task.apply_async() 可以吗, 同样是异步的.
    hanssx
        5
    hanssx  
    OP
       2019-09-02 13:33:38 +08:00
    @sazima 嗯,本身触发任务的时候,就是用的 apply_async(),应该和这个关系不大。
    neoblackcap
        6
    neoblackcap  
       2019-09-02 16:30:44 +08:00   1
    @hanssx cpu 密集型是相对的,关键是你的任务类型不能堵塞整个处理逻辑,凡是耗时长的,不需要 IO 的任务都是 IO 密集型

    看了一下你用 subprocess.Popen 去调用 nmap,你如果要改的话,请使用 gevent 的网络接口实现你 nmap 的功能,如果不会的话,此方法无解,你还是另寻他法吧。
    hanssx
        7
    hanssx  
    OP
       2019-09-02 16:47:12 +08:00
    @neoblackcap 谢谢,gevent 实现 nmap 的功能基本不可能,必须得使用 subprocess.Popen 去调用 nmap,这种情况下,我使用多进程代替多线程可以吗?你之前说进程或者 gevent 都可以。
    neoblackcap
        8
    neoblackcap  
       2019-09-02 18:25:30 +08:00 via iPhone
    @hanssx 不可以,可以的前提是你改得动网络请求的部分
    lovedebug
        9
    lovedebug  
       2019-09-02 18:39:57 +08:00 via Android
    我碰到的卡住都是程序自己的 bug,异常没抓住。
    hanssx
        10
    hanssx  
    OP
       2019-09-02 18:46:11 +08:00
    @neoblackcap 我不明白为啥不可以,我先试试用多进程,你也说了 celery 对线程支持有缺陷,网络请求的阻塞是必然的。
    xixijun
        11
    xixijun  
       2019-09-03 10:19:33 +08:00   1
    我之前也写过 celery 调度 nmap 的扫描器,答案是可以的。
    celery task 里面用 subprocess.Popen 调用 nmap
    celery 的启动用默认的 execute pool 即 prefork。
    还要注意 soft time limit 的设置,超时时需要手动 kill nmap 子进程,防止孤儿进程和僵尸进程
    hanssx
        12
    hanssx  
    OP
       2019-09-03 11:08:28 +08:00
    @xixijun 感谢 xixijun 师父的回答,请问师父你说的可以,是指使用多进程来代替多线程吗?我这边扫的是公司全网,就是扫完之后 celery worker 就卡住假死了,具体详情可查看一下问题描述。
    hanssx
        13
    hanssx  
    OP
       2019-09-03 16:15:57 +08:00
    hanssx
        14
    hanssx  
    OP
       2019-09-04 22:49:49 +08:00
    多进程可以解决这个问题,之前 @崔庆才师父说可能是 logging 死锁的问题,很有可能,待下一步确定。
    hanssx
        15
    hanssx  
    OP
       2019-09-08 10:27:04 +08:00
    已确定为 logging 死锁问题,50 块钱由崔庆才师父和 @neoblackcap 师父平分,
    @neoblackcap 师父,加我一下扣扣 9 六 14 六 2392,把支付宝账号发我即可。
    neoblackcap
        16
    neoblackcap  
       2019-09-08 12:53:43 +08:00 via iPhone   1
    @hanssx 现在我想起来了,我建议你还用 nmap 的 Python 封装库,而不是直接用 subprocess,这样就比较少一些问题,好像叫 Python-nmap,搜一下就可以了。钱就不需要了
    hanssx
        17
    hanssx  
    OP
       2019-09-09 11:33:48 +08:00
    @neoblackcap 嗯,我使用得是你说的 nmap 的 Python 封装库,源码里面使用得也是 subprocess.Popen(),额,需要时可加我扣扣,随时欢迎师父加我。
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     1045 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 28ms UTC 18:29 PVG 02:29 LAX 11:29 JFK 14:29
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86