golang 分享: 60 行代码巧妙实现一个高性能无 channel 任务队列 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
爱意满满的作品展示区。
Nazz
V2EX    分享创造

golang 分享: 60 行代码巧妙实现一个高性能无 channel 任务队列

  •  
  •   Nazz 2023-03-03 20:12:39 +08:00 5127 次点击
    这是一个创建于 960 天前的主题,其中的信息可能已经有所发展或是发生改变。

    话不多说, 先上测试数据, 在各种负载下均有良好表现:

    // small task const ( PoolSize = 16 BenchTimes = 1000 N = 1000 ) goos: darwin goarch: arm64 pkg: bench BenchmarkGwsWorkerQueue BenchmarkGwsWorkerQueue-8 3302 357841 ns/op 55977 B/op 2053 allocs/op BenchmarkGopool BenchmarkGopool-8 4426 319383 ns/op 20000 B/op 1173 allocs/op BenchmarkAnts BenchmarkAnts-8 3026 399899 ns/op 16047 B/op 1001 allocs/op BenchmarkNbio BenchmarkNbio-8 4314 259668 ns/op 48028 B/op 3000 allocs/op PASS 
    // medium task const ( PoolSize = 16 BenchTimes = 1000 N = 10000 ) goos: darwin goarch: arm64 pkg: bench BenchmarkGwsWorkerQueue BenchmarkGwsWorkerQueue-8 1491 808853 ns/op 57635 B/op 2008 allocs/op BenchmarkGopool BenchmarkGopool-8 1377 870051 ns/op 17266 B/op 1029 allocs/op BenchmarkAnts BenchmarkAnts-8 886 1324236 ns/op 16054 B/op 1001 allocs/op BenchmarkNbio BenchmarkNbio-8 1324 836092 ns/op 48000 B/op 3000 allocs/op PASS 
    // large task const ( PoolSize = 16 BenchTimes = 1000 N = 100000 ) goos: darwin goarch: arm64 pkg: bench BenchmarkGwsWorkerQueue BenchmarkGwsWorkerQueue-8 193 6026196 ns/op 58162 B/op 2004 allocs/op BenchmarkGopool BenchmarkGopool-8 178 6942255 ns/op 17108 B/op 1019 allocs/op BenchmarkAnts BenchmarkAnts-8 174 6300705 ns/op 16157 B/op 1002 allocs/op BenchmarkNbio BenchmarkNbio-8 176 7084957 ns/op 48071 B/op 2995 allocs/op PASS 

    测试代码 Benchmark

    代码实现

    package bench import ( "sync" ) type ( WorkerQueue struct { mu *sync.Mutex // 锁 q []Job // 任务队列 maxConcurrency int32 // 最大并发 curConcurrency int32 // 当前并发 } Job func() ) // NewWorkerQueue 创建一个任务队列 func NewWorkerQueue(maxConcurrency int32) *WorkerQueue { return &WorkerQueue{ mu: &sync.Mutex{}, maxConcurrency: maxConcurrency, curConcurrency: 0, } } // 获取一个任务 func (c *WorkerQueue) getJob(delta int32) Job { c.mu.Lock() defer c.mu.Unlock() c.curConcurrency += delta if c.curConcurrency >= c.maxConcurrency { return nil } if n := len(c.q); n == 0 { return nil } var result = c.q[0] c.q = c.q[1:] c.curConcurrency++ return result } // 递归地执行任务 func (c *WorkerQueue) do(job Job) { job() if nextJob := c.getJob(-1); nextJob != nil { go c.do(nextJob) } } // Push 追加任务, 有资源空闲的话会立即执行 func (c *WorkerQueue) Push(job Job) { c.mu.Lock() c.q = append(c.q, job) c.mu.Unlock() if item := c.getJob(0); item != nil { go c.do(item) } } 

    如果觉得对你有帮助, 麻烦给 gws 点个赞吧:)

    29 条回复    2023-03-23 16:32:22 +08:00
    ihciah
        1
    ihciah  
       2023-03-03 22:40:45 +08:00
    说实话这代码我是真没看懂。。
    Mitt
        2
    Mitt  
       2023-03-03 22:45:51 +08:00   5
    不要 channel 反手加了个锁可还行
    Glauben
        3
    Glauben  
       2023-03-03 22:56:00 +08:00
    感觉就是普通的做法,不太理解这样的写法有什么特别的,少的时间是从功能削减上得来的吧。
    Nazz
        4
    Nazz  
    OP
       2023-03-03 23:15:51 +08:00 via Android
    @ihciah 递归
    Nazz
        5
    Nazz  
    OP
       2023-03-03 23:20:29 +08:00 via Android
    @Glauben 加上 recover 结果也差不多
    Nazz
        6
    Nazz  
    OP
       2023-03-03 23:22:29 +08:00 via Android
    @Mitt mutex 比 channel 轻量
    littlewing
        7
    littlewing  
       2023-03-04 00:15:29 +08:00
    巧妙?
    妹想到啊 妹想到啊
    hsfzxjy
        8
    hsfzxjy  
       2023-03-04 00:32:04 +08:00 via Android   1
    建议把 q 当成循环队列,复用前面空的位置,可以减少 alloc 次数
    maocat
        9
    maocat  
       2023-03-04 00:58:38 +08:00
    @hsfzxjy 既然循环队列有了,要不再加一个 sendq 和 recvq ,直接从对应的 g 上操作 \dog
    voidmnwzp
        10
    voidmnwzp  
       2023-03-04 01:26:25 +08:00 via iPhone
    这跟 go 有啥关系啊 没了 channel 任何语言都能更轻松实现啊
    Trim21
        11
    Trim21  
       2023-03-04 01:27:30 +08:00 via Android
    只看标题我以为是没 mutex 的…
    securityCoding
        12
    securityCoding  
       2023-03-04 02:30:21 +08:00 via Android
    看看 ring buffer 无锁队列实现方式。。。
    Nazz
        13
    Nazz  
    OP
       2023-03-04 04:26:31 +08:00 via Android
    @hsfzxjy 是一个优化点
    Nazz
        14
    Nazz  
    OP
       2023-03-04 04:28:30 +08:00 via Android
    @voidmnwzp 你不妨说明白点具体是什么语言
    Nazz
        15
    Nazz  
    OP
       2023-03-04 04:51:15 +08:00 via Android
    @hsfzxjy
    @securityCoding
    其实最完美的结构是 stack. 我想到一种优化方式,先 push, 然后 swap(q[0], q[n-1]), 最后 pop
    Nazz
        16
    Nazz  
    OP
       2023-03-04 05:01:22 +08:00 via Android
    @Nazz 这种方式不能保证 fifo ,并发小的话还是 heap 好点
    rrfeng
        17
    rrfeng  
       2023-03-04 07:42:04 +08:00 via Android
    benchmark 没有对比 channel 的吗?
    Nazz
        18
    Nazz  
    OP
       2023-03-04 07:56:16 +08:00 via Android
    @rrfeng 另外几种"协程池"都用了 channel
    chuanqirenwu
        19
    chuanqirenwu  
       2023-03-04 21:31:18 +08:00
    gws 的思路是自己实现一个极简的 eventloop ,而不用 go 自带的协程机制,从而没有什么额外的开销,提高性能?
    Nazz
        20
    Nazz  
    OP
       2023-03-04 23:13:29 +08:00 via Android
    @chuanqirenwu 同步模式没开额外协程,异步模式会开非常驻的协程,执行完任务就退出, 两种模式都没使用 channel.
    MindMindMax
        21
    MindMindMax  
       2023-03-05 04:16:09 +08:00
    对于常规项目价值在哪? channel 的价值又在哪?
    Nazz
        22
    Nazz  
    OP
       2023-03-05 07:43:55 +08:00 via Android
    @MindMindMax 尽量使用 mutex 替代 chan. 很多时候保证线程安全就行了,不需要多线程通信. channel 我用得最多的地方是线程同步和超时控制.
    Nazz
        23
    Nazz  
    OP
       2023-03-05 08:18:48 +08:00
    @chuanqirenwu 确实有 EventLoop. 最开始我是模仿的 JS, 因为我认为 JS WebSocket API 比 gorilla/nhooyr 这些提供的都要清晰得多. 初版只有 Sync IO, Read=>Event Handler=>Write 循环往复. 后面在此基础上加了 Async IO, AIO 模式在每个连接上有读写两个任务队列(并发度分别是 N 和 1), 就是我分享的这个实现, 它需要足够的轻量. 两种模式压测表现都比 gorilla 好得多, 原因大概是 Parser 本身的简单高效和无额外常驻协程吧, 如果有, 协程数量会增加一倍.
    chuanqirenwu
        24
    chuanqirenwu  
       2023-03-05 12:45:03 +08:00   1
    @Nazz ,虽然不怎么搞 go ,但感觉这个思路挺不错的。我看 README 的简介写的是 go websocket server ,是只支持 server 端吗? client 端没有实现?
    Nazz
        25
    Nazz  
    OP
       2023-03-05 13:16:43 +08:00 via Android
    @chuanqirenwu 刚实现的 client ,还在测试
    rockuw
        26
    rockuw  
       2023-03-13 10:21:57 +08:00
    mutex 是比 channel 轻量,但是每个 job 新建一个 goroutine 也是有代价的。一个简单的固定 goroutine 数量的实现,测试结果还稍微好一些,分配次数则明显更低

    ```
    N=10000
    goos: linux
    goarch: amd64
    pkg: muwu.com/example/workerqueue
    cpu: Intel(R) Xeon(R) CPU E5-2682 v4 @ 2.50GHz
    BenchmarkGwsWorkerQueue-8 903 1310335 ns/op 55471 B/op 2010 allocs/op
    BenchmarkGopool-8 897 1394589 ns/op 17926 B/op 1059 allocs/op
    BenchmarkAnts-8 1203 1020211 ns/op 16046 B/op 1001 allocs/op
    BenchmarkNbio-8 956 1278696 ns/op 48017 B/op 2999 allocs/op
    BenchmarkChan-8 1004 1181569 ns/op 16016 B/op 1001 allocs/op
    ```

    ```go
    type workerQueueV1 struct {
    maxConn int
    queue chan Job
    }

    func newWorkerQueueV1(n int) *workerQueueV1 {
    wq := &workerQueueV1{
    maxConn: n,
    queue: make(chan Job, 1024),
    }
    for i := 0; i < n; i++ {
    go func() {
    for job := range wq.queue {
    job()
    }
    }()
    }
    return wq
    }

    func (wq *workerQueueV1) Push(job Job) {
    wq.queue <- job
    }
    ```
    Nazz
        27
    Nazz  
    OP
       2023-03-13 11:19:56 +08:00
    @rockuw 从你的 Benchmark 结果来看, 差距不大. GwsWorkqueue 是专门为 IO 任务设计的, 每个 WebSocket 连接上有读写两个任务队列, 它们非常轻量, 而且并行读写不会新增常驻协程. 量变产生质变, 每个连接上都增加常驻协程会使 CPU 使用率提高不少. 实际业务中并发不会很高, 可以用优先队列替代普通队列减少 allocs, 收益不高我懒得去优化了, 复用 goroutine 对于 IO 任务收益也不大.
    ClarkAbe
        28
    ClarkAbe  
       2023-03-23 14:49:55 +08:00 via Android
    感觉每次都起一个协程有点浪费...
    Nazz
        29
    Nazz  
    OP
       2023-03-23 16:32:22 +08:00
    @ClarkAbe 为了防止栈溢出. 如果做了容量限制, 可以不开新协程.
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     890 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 32ms UTC 20:15 PVG 04:15 LAX 13:15 JFK 16:15
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86