关于 Python 协程的 event loop 与 future - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
sgld
V2EX    Python

关于 Python 协程的 event loop 与 future

  •  
  •   sgld 170 天前 2750 次点击
    这是一个创建于 170 天前的主题,其中的信息可能已经有所发展或是发生改变。

    最近在看这个视频

    Python AsyncIO 从入门到放弃

    前面 生成器、yield from 、native coroutine 还理解的上,到了后面两节,event loop 与 future 的时候就有点懵。 event loop 还好,轮循检查有没有待执行任务,然后执行任务

    但是结合了 future 以后,感觉脑子就转不过来了,啥时候让出权限,啥时候任务加进时间循环,啥时候恢复执行。

    单独看片段还理解,连一起有点晕了。

    想问问大佬们有没有纯文字 + 代码版的相关解释或者博文、教程之类的。

    21 条回复    2025-06-24 15:25:01 +08:00
    cyaki
        1
    cyaki  
       170 天前 via Android
    写 curio 的作者油管上有不少关于这块的
    lxy42
        2
    lxy42  
       170 天前 via Android
    500 Lines or Less
    A Web Crawler With asyncio Coroutines

    A. Jesse Jiryu Davis and Guido van Rossum

    https://aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html
    thevita
        3
    thevita  
       170 天前
    分享一下我的看法,我理解这些概念不太喜欢先深入细节,有全局视角再看细节

    无栈协程的核心就是 把顺序代码变成一种状态机,不同语言的实现差异很大,但逻辑差不多

    (其实我们如果不用 coroutine, 写事件驱动应用 就是手写这个状态机)

    await 就是状态转移点,从一个 await 完成 到下一个代码路径上的 await 就是一次状态转移

    将这一小段代码封装起来 就叫 task, 这就是 事件循环执行的基本单元(不同语言实现也不一样,python 应该是依靠 生成器状态机来实现,rust ,c++ 则靠编译)

    future/awaitable 作用是管理 task 之间的依赖关系,在某个 task 的 future done 的时候,将依赖它的 task 放进就绪队列等待执行(不同实现也不一样,比如 直接通过 callback )

    所以:


    - 啥时候让出权限: 一个 task 完成的时候
    - 啥时候任务加进事件循环: 这个任务的依赖 future done 的时候 (实现可以都不一样,单实践效果一定是这样的)
    - 啥时候恢复执行: 进如 ready 队列了,就等待执行了,自于啥时候执行,就是 队列和 调度器的实现了,也都不一样

    ----


    正好前段时间看了 foundationdb ,他们自己实现了一个 叫 flow 的语言,在 < c++20 上实现了无栈协程,它的编译器会把 flow 的代码编译成 C++ 的状态机,可以清晰的看到怎么把代码转成状态机
    009694
        4
    009694  
       170 天前
    你执行 await 语句的时候,就会出让执行权。也就是说,如果你在一组代码里面没有任何 await 语句的话,这段代码是完全同步的。 当你 await 的 io 事件发生之后,执行权就回到你的下一行代码了。 future 对象就是一个预留的桩子 告诉你“你委托我执行的异步代码,等下就用这个 future 对象获取结果”。
    Trim21
        5
    Trim21  
       170 天前
    曾经尝试过搓一个 eventloop ,然后因为懒得像 uvloop 一样搓到跟 asyncio 100%兼容所以就放弃了。

    如果你不考虑网络 IO 的话,事件循环本身是非常简单的,实际上就是 event loop 上面的 call_soon 、call_at 以及 call_later 三个方法... 你可以继承一下现有的事件循环,然后在这三个方法上打 log ,然后写一段简单的 async/await 程序,就能看到你生成 future 之类的对象到底干了什么。

    python 的 Future 对象有一个 c 版本,也有一个 python 版本的,你可以直接去看源码。看看他什么时候调用我前面说的几个方法。
    songray
        7
    songray  
       170 天前
    没那么复杂,比如我们有一个 async 函数 foo ,代码执行到 await 的时候,控制权就从 foo 函数让出到别的代码块了,同时向待完成列表里插入 foo 。
    等到 foo 的 await 任务完成后,就会向 eventloop 中插入类似于 “foo 已经完成啦,你应该继续 foo 的后续操作”的 task 。
    等到 eventloop 循环到这个 task ,就会恢复上下文(也可以说是状态)到 foo ,这也就是为啥无栈协程也可以看做是一种状态机。
    希望我的解释比较明朗。
    sgld
        8
    sgld  
    OP
       170 天前
    @thevita 感谢大佬,我理解一下
    sgld
        9
    sgld  
    OP
       170 天前
    @Trim21 源码太复杂了,有涉及到很多情况,现在在看上面视频里面 up 写的简易版本的,
    sgld
        10
    sgld  
    OP
       170 天前
    用 ai 辅助修改了下代码,然后加了一些 print ,直观的了解了下执行流程

    import time
    import random
    from collections import deque
    from itertools import count
    import heapq

    count_id = count(1)

    class Future:
    def __init__(self):
    self._dOne= False
    self._result = None
    self._callbacks = []
    self._cancelled = False
    self._id = f'Future-{next(count_id)}'

    def add_done_callback(self, fn):
    if self._done:
    fn(self)
    else:
    self._callbacks.append(fn)

    def set_result(self, result):
    self._result = result
    self._dOne= True
    for cb in self._callbacks:
    cb(self)

    def __await__(self):
    if not self._done:
    print(f"Future {self._id} is not done, waiting...")
    # 这里的 self 是一个 Future 对象, 需要在调用时使用 await 关键字
    yield self
    return self._result

    class Task(Future):
    def __init__(self, coro):
    super().__init__()
    self.coro = coro
    print(f"任务初始化, 任务 ID: {self._id}")
    loop.call_soon(self.run)

    def run(self):
    try:
    result = self.coro.send(None)
    except StopIteration as e:
    """执行"""
    print(f"任务 {self._id} 执行完毕, 结果: {e.value}")
    self.set_result(e.value)
    else:
    if isinstance(result, Future):
    result.add_done_callback(self._wakeup)
    print(f"Task {self._id} is waiting for Future {result._id}")
    def _wakeup(self, future: Future):
    """
    This method is called when the future is done.
    It schedules the task to run again.
    """
    print(f"等待完成, 唤醒任务 {self._id}, 结果: {future._result}")
    loop.call_soon(self.run)

    class EventLoop:
    def __init__(self):
    self._ready = deque()
    self._scheduled = []
    self._stopped = False

    def call_soon(self, callback, *args):
    self._ready.append((callback, args))

    def call_later(self, delay, callback, *args):
    heapq.heappush(self._scheduled,
    (time.time() + delay, callback, args))

    def stop(self):
    self._stopped = True

    def create_task(self, coro):
    return Task(coro)

    def run_forever(self):
    while not self._stopped:
    self.run_once()

    def run_once(self):
    now = time.time()
    while self._scheduled and self._scheduled[0][0] <= now:
    _, cb, args = heapq.heappop(self._scheduled)
    self._ready.append((cb, args))

    num = len(self._ready)
    for _ in range(num):

    # 取出一个任务, 执行它
    cb, args = self._ready.popleft()
    print(f"----> 执行 {cb}({args}) ---->")
    cb(*args)

    async def smallrun():
    print("Start smallrun")

    # 创建一个 Future 对象
    # 代表一个将来的结果, 但是现在还不知道结果是什么
    fut = Future()
    print(f"Future {fut._id} created")
    # 功能模拟 --- 随机延迟, 模拟 IO 操作
    # IO 结束以后, 调用 fut.set_result(None)
    delay = random.random()
    loop.call_later(delay, fut.set_result, None)

    await fut
    print("End smallrun after", delay)
    return delay

    async def bigrun():
    print("Start bigrun")
    delay = await smallrun()
    print("End bigrun with", delay*10)
    return delay * 10

    async def main_task():
    print("Main task start")
    result = await bigrun()
    print("Final result:", result)

    if __name__ == "__main__":
    loop = EventLoop()
    loop.create_task(main_task())

    # 2.1 秒后停止事件循环
    loop.call_later(2.2, loop.stop)
    loop.run_forever()
    sgld
        11
    sgld  
    OP
       170 天前
    @lxy42 好的好的,我看看这个,感谢资源
    sgld
        12
    sgld  
    OP
       170 天前
    @cyaki 是哪位呀
    cyaki
        13
    cyaki  
       169 天前
    @sgld David Beazley, 在 youtube 上搜就行
    iorilu
        14
    iorilu  
       168 天前
    有本书很好 Python Concurrency with asyncio

    当然这书内容太多, 不需要都看, 我也只看了一小部分足够了

    除非是想独自开发 asyncio 相关的框架, 一般人了解下就行了
    kivmi
        15
    kivmi  
       166 天前
    @thevita CV 大神
    kivmi
        16
    kivmi  
       166 天前
    @009694 future + callback 更好理解吧
    kivmi
        17
    kivmi  
       166 天前
    @songray 正解
    UN2758
        18
    UN2758  
       122 天前
    @sgld #10 你这不加个代码缩进,直接看头痛
    pyKane
        19
    pyKane  
       112 天前
    那习惯了异步,再也回不去了。
    以前 PY 做高并发想都不敢想,
    现在轻轻松松。很多 SDK 没有异步,就自已去写。
    Neonyuyang
        20
    Neonyuyang  
       108 天前
    @coldear 打不开,老师
    Neonyuyang
        21
    Neonyuyang  
       108 天前
    @coldear 可以打开了
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5283 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 25ms UTC 08:18 PVG 16:18 LAX 01:18 JFK 04:18
    Do have faith in what you're doing.
    ubao snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86