写了个 Go 库解决 LLM 流式输出断线重连的问题 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
Junian

写了个 Go 库解决 LLM 流式输出断线重连的问题

  •  
  •   Junian 10 天前 1574 次点击
    最近在做一个项目,后端 Go ,前端 SSE 推流 LLM 的输出。遇到一个很烦的问题:用户刷新页面或者网络抖一下,流就断了,但后端还在跑,token 照烧不误。

    更麻烦的是我们的 LLM worker 和 HTTP handler 不在同一个实例上,负载均衡一转发,重连过来的请求根本找不到原来那个流。

    JS/TS 那边有 Vercel 的 resumable-stream 可以用,但 Go 这边翻了一圈啥也没有,就自己撸了一个:

    https://github.com/gtoxlili/streamhub

    思路不复杂:
    - Redis Streams 存 chunk ,断线重连的订阅者先 replay 历史再接实时数据
    - Redis Pub/Sub 传 cancel 信号,用户在 A 节点点停止,B 节点上的生成就能收到
    - 每个 producer 有个 generation ID 做 fencing token ,防止旧 producer 写脏数据
    - 同一个 session 只允许一个 producer 注册,不会重复调 LLM

    代码大概长这样:

    ```go
    // 生产端
    stream, created, err := hub.Register("chat:123", cancelFunc)
    if !created {
    return // 已经有人在跑了
    }
    defer stream.Close()
    stream.Publish("hello")

    // 消费端(任意实例)
    chunks, unsub := hub.Get("chat:123").Subscribe(128)
    defer unsub()
    for chunk := range chunks {
    // 先 replay 再 live
    fmt.Fprint(w, chunk)
    }
    ```

    目前还比较早期,API 可能还会改。做类似场景的同学可以看看,有想法欢迎提 issue 。
    11 条回复    2026-04-17 16:04:46 +08:00
    bv
    &nsp;   1
    bv  
       9 天前
    给你提示一点:req.Context()
    raycheung
        2
    raycheung  
       9 天前
    DefoliationM
        3
    DefoliationM  
       9 天前 via Android   1
    哥们基础不牢呀,用 context 就行了。
    cryptovae
        4
    cryptovae  
       9 天前   1
    为了解决 A 又引入了 B ,我真是服了

    简单事简单做,LLM 流式输出,管它前端断不断,最终结果肯定要入库,等前端用户刷新的时候自己去轮训这个 message 结果就行了
    yoshiyuki
        5
    yoshiyuki  
       9 天前
    @cryptovae 用户不会像你想象的用的,他断了就会去新开会话,当前这个 token 就算白烧了
    boolean1135
        6
    boolean1135  
       9 天前 via Android
    mark 一下,到时候回家学习学习 AI 开发
    cryptovae
        7
    cryptovae  
       9 天前
    @yoshiyuki 你得的回答完全不切合主题,题主要的是异常断开后,能继续恢复,你给我说浪费 token 的事,在没有用户主动点击取消的情况下,用户只会觉得你凭什么不给我回答了,比如 deepseek ,chatgpt 都会在刷新页面自动接着回答
    Charlie17Li
        8
    Charlie17Li  
       9 天前
    @yoshiyuki 如果是我我会刷新一下,不想多打一点字
    Junian
        9
    Junian  
    OP
       9 天前
    谢谢各位反馈,统一回一下。

    @bv @DefoliationM req.Context() 确实能感知客户端断开,这个我清楚。但我要解决的不是"知道客户端走了",是断开之后的事:

    1. 重连后怎么把之前的 chunk 补回去( replay )
    2. 生产者和消费者不在同一个进程甚至不在同一台机器
    3. 用户在 A 节点点取消,B 节点的生成要能停

    单靠 context 搞不定这几个。streamhub 底下该用 context 的地方也在用,上面多了一层跨实例的状态管理。

    @raycheung durable-streams 是一个协议规范,定义 HTTP 层 offset-based streaming 的标准。streamhub 是直接用 Redis Streams 做存储的 Go 库,层级不太一样,但确实解决的问题有重叠。

    @cryptovae 最终结果入库+轮询在生成完之后确实没问题。流式输出的点在于生成过程中就要实时推,不是攒完再返回。刷新后自动接上( deepseek/chatgpt 就是这么做的)才是这个库在解决的场景。
    Junian
        10
    Junian  
    OP
       9 天前
    @raycheung 哦刚仔细看了下 durable-streams ,确实比我之前说的要大不少,不只是协议规范,人家有完整的多语言客户端和 server 实现,还集成了 Vercel AI SDK

    不过定位还是不太一样。durable-streams 是一个通用的持久化流协议,偏基础设施层,你需要跑他的 server 。streamhub 就是一个库,直接用你现有的 Redis 就行,不引入新的依赖,适合已经有 Redis 的项目快速接入
    voidmnwzp
        11
    voidmnwzp  
       7 天前 via iPhone
    这不是典型的简单问题复杂化吗 httphanlder 传个 context 给 llmhandler ,如果 http 这边有 err 了,直接 cancel 掉不就完了。。。
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     898 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 37ms UTC 22:10 PVG 06:10 LAX 15:10 JFK 18:10
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86