解压 zlib 数据流,困扰了一天多了没能解决 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
rekulas
V2EX    Go 编程语言

解压 zlib 数据流,困扰了一天多了没能解决

  •  
  •   rekulas
    del-xiong 2024-02-21 15:01:21 +08:00 2477 次点击
    这是一个创建于 679 天前的主题,其中的信息可能已经有所发展或是发生改变。

    对接 discord 的 zlib 数据流,它是分段传输的,只有第一个片段包含头部信息,后续片段不包含头部信息。解压后面的片段需要依赖第一个段的 header
    python 里很好实现

    deobj = zlib.decompressobj() 

    然后只要用 deobj 按顺序解压每个片段就行了
    deobj.decompress(数据流)

    改写到 go 里,发现 zlib.NewReader()只能用于第一个片段,后续片段无法解压(会报 invalid header 错误) 感觉 go 的 zlib 没有提供一个能保存上下文状态的对象,每次解压都要从头开始,导致后续不完整的 zlib 流无法解压

    reader := bytes.NewReader(bin1) zlib.NewReader(reader) reader = bytes.NewReader(bin2) zlib.NewReader(reader) // 会报错 binmerge := append(bin1, bin2...) reader = bytes.NewReader(binmerge) zlib.NewReader(reader) // 正常 

    如果把 bin1 bin2 合并起来是可以解压的,但显然这样不合理,内存和资源占用会越来越大
    所以我想只依赖 bin1 的 header 信息,然后用 bin2 的数据流来解压
    除此之外还尝试过讲 bin1 的头 N 个字节放到 bin2 前面,也没成功,似乎 header 不是单纯拷贝字节那么简单,可能涉及到其他计算
    问过 gpt ,没能解决。
    搜过 google 等各种资料,也没有解决。
    不熟悉 zlib 格式,想问问各位有没有办法实现

    2 个对应的测试数据我打包放在这里了 https://drive.google.com/file/d/1zdAbgWVWewqcovaHxRq3ZhPObPnr3m-5/view?usp=sharing

    第 1 条附言    2024-02-23 13:35:27 +08:00
    python 的测试代码
    ```
    import zlib,time


    f = open('1.bin', 'rb')
    z1 = f.read()
    f.close()
    f = open('2.bin', 'rb')
    z2 = f.read()

    deobj = zlib.decompressobj()
    print(deobj.decompress(z1).decode('utf-8'))
    time.sleep(3)
    print(deobj.decompress(z2).decode('utf-8'))

    ```
    MoYi123
        1
    MoYi123  
       2024-02-21 15:51:30 +08:00
    你可以写一个比较复杂的 reader 吧
    用一个 queue 来实现 reader,
    Read(p []byte) (n int, err error) 可以知道已经读了多少, 可以释放队列头部已使用的压缩数据
    到 zlib.NewReader 读不出数据的时候, 再往 queue 里添加新的压缩数据

    没试, 应该是可以的.
    rekulas
        2
    rekulas  
    OP
       2024-02-21 16:13:19 +08:00
    @MoYi123 目前考虑也是自己写,看了下 zlib 源码 打算复制过来魔改试试,还没成功
    boboliu
        3
    boboliu  
       2024-02-21 20:12:08 +08:00
    r := io.MultiReader(bin1, bin2, bin3)
    zlib.NewReader(r)
    rekulas
        4
    rekulas  
    OP
       2024-02-21 21:32:55 +08:00
    @boboliu 这相当于合并起来了 会引起资源问题
    lance6716
        5
    lance6716  
       2024-02-21 21:41:32 +08:00 via Android
    按照你的描述,它本身就是“一个”流,你直接把流的 reader 传过去就行啊,为啥要拆成“两个”bin1 bin2 呢
    rekulas
        6
    rekulas  
    OP
       2024-02-21 22:58:48 +08:00
    @lance6716 并不是我要拆开,而且平台给的数据就是这样的
    比如第 5 秒的时候,平台给我一个流 1, 然后继续处理任务
    第 50 秒,平台发给我流 2,通知我任务状态

    我要流式处理,肯定得这样的, js python 里面都很简单, go 居然没能成功
    boboliu
        7
    boboliu  
       2024-02-22 01:54:36 +08:00
    @rekulas 没听懂,什么叫资源问题

    就 #6 而言,chunk 当然是你自己负责整成流,优雅的方案就是开一个 buffer
    rekulas
        8
    rekulas  
    OP
       2024-02-22 10:17:25 +08:00
    @boboliu 看我上面发的啊,这种情况下你要解压就要把所有流合并到一起,处理第 200 个数据要把 1-200 全部处理一遍,你觉得合理?
    bv
        9
    bv  
       2024-02-22 10:45:11 +08:00
    是这样吗?

    input := new(bytes.Buffer)
    output, _ := zlib.NewReader(input)
    // 压缩的数据流往 input 里面写入。
    // 从 output 读取解压后的数据流。
    guonaihong
        10
    guonaihong  
       2024-02-22 10:53:44 +08:00
    把 chan 包装成一个 io.Reader, 收数据的地方直接并发 chan , 读的地方 select chan 就行。

    type myReader struct {
    c chan []byte
    }

    func (m *myReader) Read(p []byte) (n int, err error) {

    copy()
    }
    guonaihong
        11
    guonaihong  
       2024-02-22 10:57:49 +08:00
    忽略我上一个回答,直接用 io.Pipe 。然后 zlib 解决套下 io.Pipe 的 reader 对象。另外收 gzip 数据的地方并发写就行。
    https://pkg.go.dev/io#Pipe
    rekulas
        12
    rekulas  
    OP
       2024-02-22 12:48:26 +08:00
    @guonaihong pipe 我也试过没成功 ,也可能用法不对 空了我再试试
    bv
        13
    bv  
       2024-02-22 13:32:01 +08:00
    和 zlib 无关,只是流式解析没处理好而已。

    package main

    import (
    "bytes"
    "compress/zlib"
    "fmt"
    "io"
    "os"
    "sync"
    )

    //goland:noinspection GoUnhandledErrorResult
    func main() {
    bin1, _ := os.Open("1.bin")
    defer bin1.Close()
    bin2, _ := os.Open("2.bin")
    defer bin2.Close()

    input := new(bytes.Buffer)
    input.ReadFrom(bin1)
    zr, err := zlib.NewReader(input)
    if err != nil {
    fmt.Printf("zlib error: %v\n", err)
    return
    }

    wg := new(sync.WaitGroup)
    wg.Add(1)
    go func() {
    defer wg.Done()
    io.Copy(os.Stderr, zr)
    zr.Close()
    fmt.Printf("\noutput over\n")
    }()

    input.ReadFrom(bin2)

    wg.Wait()
    fmt.Printf("main over\n")
    }

    输出结果:
    {"t":null,"s":null,"op":10,"d":{"heartbeat_interval":41250,"_trace":["[\"gateway-prd-us-east1-c-0bwh\",{\"micros\":0.0}]"]}}{"t":null,"s":null,"op":11,"d":null}
    output over
    main over
    bv
        14
    bv  
       2024-02-22 13:48:13 +08:00
    如果是 HTTP 客户端可以改造的更简单一些,例如:

    func Get(u string) (*http.Response, error) {
    resp, err := http.Get(u)
    if err != nil {
    return nil, err
    }

    encoding := resp.Header.Get("Content-Encoding")
    if encoding == "deflate" { // 代表 body 使用了 zlib 压缩
    body := resp.Body
    rc, exx := zlib.NewReader(body)
    if exx != nil {
    _ = body.Close()
    return nil, exx
    }
    resp.Body = rc
    }

    return resp, nil
    }
    guonaihong
        15
    guonaihong  
       2024-02-22 14:28:44 +08:00
    @rekulas 有一个简单的方法验证, 如果对端传过来的 gzip 包,都缓存到 bytes.Buffer ,完毕可以解出来。那就说明你的 io.Pipe 的用法不对。
    rekulas
        16
    rekulas  
    OP
       2024-02-22 16:40:18 +08:00
    @bv 感谢测试 但是这样也是合并到一起解压的吧 并没能实现下一个包延迟处理的效果
    bv
        17
    bv  
       2024-02-22 17:56:46 +08:00
    我大概理解了你的想法:就好比分卷压缩,只要得到第一个压缩包(第一个压缩包内含有元数据),跳过任意个块包,也照样可以解压后面的任何一块压缩包。

    这应该实现不了吧,块与块之间大概率是存在依赖关系的,环环相扣,一环缺失就会导致后面数据无效。不太了解 zlib 的二进制格式,OP 要想深入研究可自行查阅资料。
    比如:ts (MPEG2-TS) 这种分块格式是经过设计的,每一块内都含有元数据,不依赖前后 ts 数据块,每一块都单独可用。

    还有:压缩包只是个容器,里面的数据才是有用的,一段数据被分块压缩后,怎么知道想要的数据被压缩分块到了哪个块区?这也是一个问题。
    rekulas
        18
    rekulas  
    OP
       2024-02-22 21:49:10 +08:00
    @bv 有一点点小区别 并不是跳过任意块包,只要按顺序 1,2,3,4....依次解压即可
    在 py 和 js 中确实是可以实现的, 只是 go 里面我不清楚怎么实现, 感觉它的 zlib 接口还比较简陋
    lesismal
        19
    lesismal  
       2024-03-16 00:12:10 +08:00
    上接: t/1024087#reply11


    package main

    import (
    "bytes"
    "compress/zlib"
    "fmt"
    "os"
    )

    func main() {
    bin1, _ := os.Open("1.bin")
    defer bin1.Close()
    bin2, _ := os.Open("2.bin")
    defer bin2.Close()

    input := new(bytes.Buffer)
    input.ReadFrom(bin1)
    zr, err := zlib.NewReader(input)
    if err != nil {
    fmt.Printf("zlib error: %v\n", err)
    return
    }

    defer zr.Close()

    buf := make([]byte, 1024)
    n1, err := zr.Read(buf)
    fmt.Println("read 1 over:", n1, err)
    fmt.Println("buf 1:", string(buf[:n1]))
    input.ReadFrom(bin2)
    n2, err := zr.Read(buf[n1:])
    fmt.Println("read 2 over:", n2, err)
    fmt.Println("buf 2:", string(buf[n1:n1+n2]))
    }


    output:

    read 1 over: 124 <nil>
    buf 1: {"t":null,"s":null,"op":10,"d":{"heartbeat_interval":41250,"_trace":["[\"gateway-prd-us-east1-c-0bwh\",{\"micros\":0.0}]"]}}
    read 2 over: 36 <nil>
    buf 2: {"t":null,"s":null,"op":11,"d":null}
    lesismal
        20
    lesismal  
       2024-03-16 00:14:05 +08:00
    BTW ,OP 自己的 python 代码里用的就是同一个 deobj = zlib.decompressobj(),go 里用了不同的 zlib reader 读取两个片段、第二个片段没有 header 、当然就出错了
    rekulas
        21
    rekulas  
    OP
       2024-03-16 08:26:45 +08:00
    @lesismal 感谢大佬帮忙, 白天有点事 晚上回来测测
    body007
        22
    body007  
       2024-03-16 09:25:49 +08:00
    测试没问题,就像你用 python 一样,只需要创建一个 zlib.NewReader ,使用 io.Pipe 就可以了。

    ```go

    package main

    import (
    "compress/zlib"
    "errors"
    "io"
    "os"
    )

    func main() {
    err := test()
    if err != nil {
    panic(err)
    }
    }

    func test() error {
    var (
    ir, iw = io.Pipe()
    dOne= errors.New("done")
    )

    go func() {
    list := []string{"1.bin", "2.bin"}
    for _, f := range list {
    fr, err := os.Open(f)
    if err != nil {
    iw.CloseWithError(err)
    return
    }

    _, err = io.Copy(iw, fr)
    if err1 := fr.Close(); err == nil {
    err = err1
    }

    if err != nil {
    iw.CloseWithError(err)
    return
    }
    }
    iw.CloseWithError(done)
    }()

    fw, err := os.Create("dst.txt")
    if err != nil {
    return err
    }
    defer fw.Close()

    zr, err := zlib.NewReader(ir)
    if err != nil {
    return err
    }

    _, err = io.Copy(fw, zr)
    if err1 := zr.Close(); err == nil {
    err = err1
    }

    if errors.Is(done, err) {
    err = nil
    }
    return err
    }

    ```
    rekulas
        23
    rekulas  
    OP
       2024-03-16 14:00:31 +08:00
    @lesismal 非常感谢 lesismal 大佬提供的方案, 测试确实可行, 我相信这也应该是最佳方案了, 也谢谢楼上其他的小伙伴们

    @body007 我也试过 pipe, 这样确实可以解密但并不符合我想实现的目标, 因为在执行 zr, err := zlib.NewReader(ir)之前, 已经把 2 个 bin 文件数据合并到一起了, 而真实情况下 1 和 2 是分别解压的, 当然也可能我对 pipe 的理解不到位, 不清楚是否还有其他通过 pipe 方式实现流式解压的
    lesismal
        24
    lesismal  
       2024-03-16 15:18:04 +08:00
    Welcome!
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2154 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 27ms UTC 16:00 PVG 00:00 LAX 08:00 JFK 11:00
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86