asyncio: loop.run_forever() causes 100% CPU usage - V2EX

    Nitroethane    2022-04-07 16:51:46 +08:00    3128 views

    Python version: 3.10.1 or 3.10.2
    Code:

    import asyncio
    import signal

    # setup_logging, log_filename, Consumer and shutdown are defined elsewhere in the script.
    def main():
        log_listener = setup_logging(log_filename)
        e = asyncio.Event()
        consumer = Consumer(e)
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
        for sig in signals:
            # Bind sig as a default argument so each handler gets its own signal.
            loop.add_signal_handler(
                sig, lambda s=sig: asyncio.create_task(shutdown(s, loop, consumer))
            )
        tasks = consumer.run()
        try:
            for name, task in tasks.items():
                loop.create_task(task, name=name)
            loop.run_forever()
        finally:
            loop.close()
            log_listener.stop()

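    consumer.run() returns a dict of type Dict[str, Coroutine]. At first I assumed my coroutine implementations were causing the high CPU usage, so I removed the coroutines from tasks one by one. Even with tasks empty, CPU usage is still 100%.
    I took a look with cProfile: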

     Ordered by: cumulative time
     List reduced from 3038 to 10 due to restriction <10>

     ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      423/1    0.005    0.000   44.862   44.862 {built-in method builtins.exec}
          1    0.000    0.000   44.862   44.862 myscript.py:1(<module>)
          1    0.000    0.000   44.219   44.219 myscript.py:54(main)
          1    0.000    0.000   44.017   44.017 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/asyncio/base_events.py:582(run_forever)
          3    0.000    0.000   44.016   14.672 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/asyncio/base_events.py:1806(_run_once)
          3    0.000    0.000   43.983   14.661 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/selectors.py:452(select)
          3   43.983   14.661   43.983   14.661 {method 'poll' of 'select.epoll' objects}
      484/4    0.006    0.000    0.642    0.161 <frozen importlib._bootstrap>:1022(_find_and_load)
      483/4    0.003    0.000    0.642    0.161 <frozen importlib._bootstrap>:987(_find_and_load_unlocked)
      447/5    0.003    0.000    0.641    0.128 <frozen importlib._bootstrap>:664(_load_unlocked)

    Has anyone here run into something similar? Or is my usage wrong?
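    A note on reading a profile like this one: as far as I know, cProfile's default timer measures wall-clock time, so time attributed to `{method 'poll' of 'select.epoll' objects}` includes time spent blocked waiting for events, not only time spent on the CPU. The sketch below is one way to compare the two for an event loop that has no tasks. The helper name `run_idle_loop` and the 0.5 s duration are made up for illustration and are not part of the original script.

    ```python
    import asyncio
    import time


    def run_idle_loop(seconds: float) -> tuple[float, float]:
        """Run an event loop with no tasks for `seconds`; return (wall, cpu) seconds."""
        loop = asyncio.new_event_loop()
        # run_forever() only returns after stop() is called, so schedule a stop.
        loop.call_later(seconds, loop.stop)
        wall_start = time.perf_counter()
        cpu_start = time.process_time()
        try:
            loop.run_forever()
        finally:
            loop.close()
        wall = time.perf_counter() - wall_start
        cpu = time.process_time() - cpu_start
        return wall, cpu


    if __name__ == "__main__":
        wall, cpu = run_idle_loop(0.5)
        print(f"wall={wall:.3f}s cpu={cpu:.3f}s")
    ```

    If the CPU figure stays far below the wall-clock figure, the loop itself is idle while it waits, and the load is more likely to come from another thread or process.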

    Addendum 1    2022-04-07 17:31:25 +08:00
     def _start_kafka_client(self) -> None:
         logging.debug(f"[_start_kafka_client] id(event)={id(self._event)}")
         i = 0
         try:
             while not self._event.is_set():
                 msg_packs = self._kafka_client.poll(
                     timeout_ms=1000,
                     max_records=5000,
                 )
                 if not msg_packs:
                     continue
                 # msgs is a list containing ConsumerRecords
                 tp: kafka.TopicPartition
                 msgs: List[ConsumerRecord]
                 for tp, msgs in msg_packs.items():
                     self._data_queue.put_nowait(msgs)
                     i += len(msgs)
                     if i % 1000 == 0:
                         logging.info(f"count of msgs: {i}")
         except asyncio.QueueFull:
             logging.debug("data queue is full")
             time.sleep(1)
         except Exception as e:
             logging.error(f"error: {e}")
         finally:
             logging.debug("_start_kafka_client terminates")
    Addendum 2    2022-04-07 17:31:42 +08:00
    ``` python
    async def import_data(self):
        logging.debug(f"id(event)={id(self._event)}")
        while not self._event.is_set():
            try:
                if not self._template_created:
                    await asyncio.sleep(1)

                entry: List[ConsumerRecord] = self._data_queue.get_nowait()
                entries = [
                    each_entry
                    for each in entry
                    for each_entry in self.reconstruct_entries_list(each.value)
                ]

                await self._es_client.bulk(
                    index=self.get_index_name(), operations=entries
                )
            except asyncio.QueueEmpty:
                await asyncio.sleep(8)
                continue
            except Exception as e:
                logging.error(f"[*] error: {e}")
            except asyncio.CancelledError:
                logging.debug("import_data is cancelled")
                break
        logging.debug("import_data terminates")

    def run(self) -> Dict[str, Coroutine]:
        return {
            "template_task": self._check_template(),
            "kafka_task": asyncio.to_thread(self._start_kafka_client),
            "es_task": self.import_data(),
            "monitor_task": self.monitor(),
        }

    async def monitor(self):
        try:
            while not self._event.is_set():
                tasks = asyncio.all_tasks()
                for task in tasks:
                    print(f"[*] task {task.get_name()} done? {task.done()}")
                await asyncio.sleep(8)
        finally:
            logging.debug("monitor finished")
    ```
    8 replies    2022-04-12 23:35:44 +08:00
    netcan    #1    2022-04-07 17:20:05 +08:00
    If tasks is empty, `loop.run_forever()` returns immediately. I'd suggest posting the complete code, for example Consumer.
    Nitroethane (OP)    #2    2022-04-07 17:32:18 +08:00
    @netcan #1 Added. It just receives data from Kafka, parses it, and writes it to ES.
    makerbi    #3    2022-04-07 18:24:09 +08:00
    It's probably because there is no sleep inside the while loop.
    Even time.sleep(0.1) is better than no sleep at all.
    jenlors    #4    2022-04-07 18:32:26 +08:00
    When reading messages from Kafka, set block to True so that the read itself blocks the loop.
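    Nitroethane (OP)    #5    2022-04-07 19:41:39 +08:00
    @makerbi #3 Do you mean the while loop that reads the Kafka data? I assumed that setting a 1 s timeout on poll() would also count as a sleep. I'll try adding a sleep and see whether it helps.

    @jenlors #4 The reason I didn't block directly before is that I didn't know how to exit gracefully when the script is terminated. Today I found that, even when blocking, it should be possible to exit the coroutine gracefully by catching CancelledError. Thanks for the reply, I'll give it a try.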
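    The idea in the reply to #4 can be sketched roughly as follows: the coroutine awaits `queue.get()` instead of calling `get_nowait()` and sleeping, and a shutdown path cancels the task. This is only an illustration under assumptions. This `import_data` is a simplified stand-in that takes a plain `asyncio.Queue` and a list, not the OP's method, and the record strings are invented.

    ```python
    import asyncio


    async def import_data(queue: asyncio.Queue, handled: list) -> None:
        """Wait for batches without polling; exit cleanly when cancelled."""
        try:
            while True:
                # Suspends until an item arrives, so no sleep/retry loop is needed.
                batch = await queue.get()
                handled.append(batch)
                queue.task_done()
        except asyncio.CancelledError:
            # Cleanup would go here; re-raise so the task is marked as cancelled.
            raise


    async def main() -> list:
        queue: asyncio.Queue = asyncio.Queue()
        handled: list = []
        task = asyncio.create_task(import_data(queue, handled))
        await queue.put(["record-1", "record-2"])
        await queue.join()  # wait until the batch has been processed
        task.cancel()  # what a shutdown handler would do
        try:
            await task
        except asyncio.CancelledError:
            pass
        return handled


    if __name__ == "__main__":
        print(asyncio.run(main()))
    ```

    Since Python 3.8, `asyncio.CancelledError` derives from `BaseException`, so an `except Exception` clause does not swallow it.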
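    Richard14    #6    2022-04-10 07:35:22 +08:00
    The question is too long and lacks a minimal reproduction. The code in the first post relies on a lot of things whose implementation isn't shown, so I really don't feel like reading it. The question also feels a lot like an XY problem.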
    Nitroethane (OP)    #7    2022-04-10 10:28:50 +08:00
    @Richard14 #6 Well, nobody forced you to read it. If you don't want to read it, don't reply :)
    lolizeppelin    #8    2022-04-12 23:35:44 +08:00
    I wouldn't recommend sleep 0.1; even sleep 0.001 isn't appropriate.
    Generally you listen on an event fd, which gives you the effect of sleeping while still responding promptly.
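    In asyncio terms, one way to get this "sleep until woken" behaviour with a worker thread is `loop.call_soon_threadsafe`, which wakes the loop's selector instead of relying on periodic sleeps. A minimal sketch, assuming a hypothetical `producer` function that stands in for the Kafka polling thread and uses `None` as an end-of-stream sentinel:

    ```python
    import asyncio
    import threading


    def producer(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue, n: int) -> None:
        """Runs in a worker thread; stands in for the Kafka polling loop."""
        for i in range(n):
            # asyncio.Queue is not thread-safe, so hand the put over to the loop thread.
            loop.call_soon_threadsafe(queue.put_nowait, i)
        loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel: no more items


    async def main(n: int = 5) -> list:
        loop = asyncio.get_running_loop()
        queue: asyncio.Queue = asyncio.Queue()
        thread = threading.Thread(target=producer, args=(loop, queue, n))
        thread.start()
        received = []
        while True:
            item = await queue.get()  # suspended until the producer wakes the loop
            if item is None:
                break
            received.append(item)
        thread.join()  # the producer has already finished once the sentinel arrives
        return received


    if __name__ == "__main__":
        print(asyncio.run(main()))
    ```

    The consumer never sleeps for a fixed interval here. It is resumed as soon as the producer schedules a put, which is the trade-off #8 describes.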