
python 版本:3.10.1 或 3.10.2
代码:
def main(): log_listener = setup_logging(log_filename) e = asyncio.Event() cOnsumer= Consumer(e) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) for sig in signals: loop.add_signal_handler( sig, lambda s=sig: asyncio.create_task(shutdown(s, loop, consumer)) ) tasks = consumer.run() try: for name, task in tasks.items(): loop.create_task(task, name=name) loop.run_forever() finally: loop.close() log_listener.stop() consumer.run() 方法会返回一个 Dict[str, Coroutine] 类型的字典。最初以为是自己的 coroutine 实现有问题导致 high CPU 。然后将 tasks 中的 coroutine 一个个移除,最后 tasks 返回空的情况下也是 100% CPU 。
用 cProfile 看了下:
Ordered by: cumulative time List reduced from 3038 to 10 due to restriction <10> ncalls tottime percall cumtime percall filename:lineno(function) 423/1 0.005 0.000 44.862 44.862 {built-in method builtins.exec} 1 0.000 0.000 44.862 44.862 myscript.py:1(<module>) 1 0.000 0.000 44.219 44.219 myscript.py:54(main) 1 0.000 0.000 44.017 44.017 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/asyncio/base_events.py:582(run_forever) 3 0.000 0.000 44.016 14.672 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/asyncio/base_events.py:1806(_run_once) 3 0.000 0.000 43.983 14.661 /home/xxx/.pyenv/versions/3.10.1/lib/python3.10/selectors.py:452(select) 3 43.983 14.661 43.983 14.661 {method 'poll' of 'select.epoll' objects} 484/4 0.006 0.000 0.642 0.161 <frozen importlib._bootstrap>:1022(_find_and_load) 483/4 0.003 0.000 0.642 0.161 <frozen importlib._bootstrap>:987(_find_and_load_unlocked) 447/5 0.003 0.000 0.641 0.128 <frozen importlib._bootstrap>:664(_load_unlocked) 有 v 友遇到类似情况的吗?还是说我的用法有问题。
def _start_kafka_client(self) -> None: logging.debug(f"[_start_kafka_client] id(event)={id(self._event)}") i = 0 try: while not self._event.is_set(): msg_packs = self._kafka_client.poll( timeout_ms=1000, max_records=5000, ) if not msg_packs: continue # msgs is of type list containerd with ConsumerRecords tp: kafka.TopicPartition msgs: List[ConsumerRecord] for tp, msgs in msg_packs.items(): self._data_queue.put_nowait(msgs) i += len(msgs) if i % 1000 == 0: logging.info(f"count of msgs: {i}") except asyncio.QueueFull: logging.debug("data queue is full") time.sleep(1) except Exception as e: logging.error(f"error: {e}") finally: logging.debug("_start_kafka_client terminates") 1 netcan 2022-04-07 17:20:05 +08:00 tasks 为空的话,`loop.run_forever()`直接就返回了,建议上完整代码,例如 Consumer |
2 Nitroethane OP @netcan #1 已添加。就是从 kafka 接收数据,解析之后写到 es 里 |
3 makerbi 2022-04-07 18:24:09 +08:00 应该是 while 里没有 sleep 的原因吧 time.sleep(0.1)也好过完全没有 sleep |
4 jenlors 2022-04-07 18:32:26 +08:00 从 kafka 读取消息的时候 block 设置为 True ,直接阻塞循环 |
5 Nitroethane OP |
6 Richard14 2022-04-10 07:35:22 +08:00 问题太长,且缺乏最小实现,1L 代码里很多不明实现的东西,实在是不想看。而且问题给人感觉很像 AB 问题 |
7 Nitroethane OP @Richard14 #6 我寻思也没强制你看呀,不想看就别回复呀 :) |
8 lolizeppelin 2022-04-12 23:35:44 +08:00 不建议 sleep 0.1 ,sleep 0.001 都不适合。 一般来说都是通过监听事件 fd 来实现 sleep 的同时能及时响应 |