话不多说, 先上测试数据, 在各种负载下均有良好表现:
// small task const ( PoolSize = 16 BenchTimes = 1000 N = 1000 ) goos: darwin goarch: arm64 pkg: bench BenchmarkGwsWorkerQueue BenchmarkGwsWorkerQueue-8 3302 357841 ns/op 55977 B/op 2053 allocs/op BenchmarkGopool BenchmarkGopool-8 4426 319383 ns/op 20000 B/op 1173 allocs/op BenchmarkAnts BenchmarkAnts-8 3026 399899 ns/op 16047 B/op 1001 allocs/op BenchmarkNbio BenchmarkNbio-8 4314 259668 ns/op 48028 B/op 3000 allocs/op PASS
// medium task const ( PoolSize = 16 BenchTimes = 1000 N = 10000 ) goos: darwin goarch: arm64 pkg: bench BenchmarkGwsWorkerQueue BenchmarkGwsWorkerQueue-8 1491 808853 ns/op 57635 B/op 2008 allocs/op BenchmarkGopool BenchmarkGopool-8 1377 870051 ns/op 17266 B/op 1029 allocs/op BenchmarkAnts BenchmarkAnts-8 886 1324236 ns/op 16054 B/op 1001 allocs/op BenchmarkNbio BenchmarkNbio-8 1324 836092 ns/op 48000 B/op 3000 allocs/op PASS
// large task const ( PoolSize = 16 BenchTimes = 1000 N = 100000 ) goos: darwin goarch: arm64 pkg: bench BenchmarkGwsWorkerQueue BenchmarkGwsWorkerQueue-8 193 6026196 ns/op 58162 B/op 2004 allocs/op BenchmarkGopool BenchmarkGopool-8 178 6942255 ns/op 17108 B/op 1019 allocs/op BenchmarkAnts BenchmarkAnts-8 174 6300705 ns/op 16157 B/op 1002 allocs/op BenchmarkNbio BenchmarkNbio-8 176 7084957 ns/op 48071 B/op 2995 allocs/op PASS
package bench import ( "sync" ) type ( WorkerQueue struct { mu *sync.Mutex // 锁 q []Job // 任务队列 maxConcurrency int32 // 最大并发 curConcurrency int32 // 当前并发 } Job func() ) // NewWorkerQueue 创建一个任务队列 func NewWorkerQueue(maxConcurrency int32) *WorkerQueue { return &WorkerQueue{ mu: &sync.Mutex{}, maxConcurrency: maxConcurrency, curConcurrency: 0, } } // 获取一个任务 func (c *WorkerQueue) getJob(delta int32) Job { c.mu.Lock() defer c.mu.Unlock() c.curConcurrency += delta if c.curConcurrency >= c.maxConcurrency { return nil } if n := len(c.q); n == 0 { return nil } var result = c.q[0] c.q = c.q[1:] c.curConcurrency++ return result } // 递归地执行任务 func (c *WorkerQueue) do(job Job) { job() if nextJob := c.getJob(-1); nextJob != nil { go c.do(nextJob) } } // Push 追加任务, 有资源空闲的话会立即执行 func (c *WorkerQueue) Push(job Job) { c.mu.Lock() c.q = append(c.q, job) c.mu.Unlock() if item := c.getJob(0); item != nil { go c.do(item) } }
如果觉得对你有帮助, 麻烦给 gws 点个赞吧:)
![]() | 1 ihciah 2023-03-03 22:40:45 +08:00 说实话这代码我是真没看懂。。 |
2 Mitt 2023-03-03 22:45:51 +08:00 ![]() 不要 channel 反手加了个锁可还行 |
3 Glauben 2023-03-03 22:56:00 +08:00 感觉就是普通的做法,不太理解这样的写法有什么特别的,少的时间是从功能削减上得来的吧。 |
7 littlewing 2023-03-04 00:15:29 +08:00 巧妙? 妹想到啊 妹想到啊 |
![]() | 8 hsfzxjy 2023-03-04 00:32:04 +08:00 via Android ![]() 建议把 q 当成循环队列,复用前面空的位置,可以减少 alloc 次数 |
![]() | 10 voidmnwzp 2023-03-04 01:26:25 +08:00 via iPhone 这跟 go 有啥关系啊 没了 channel 任何语言都能更轻松实现啊 |
![]() | 11 Trim21 2023-03-04 01:27:30 +08:00 via Android 只看标题我以为是没 mutex 的… |
12 securityCoding 2023-03-04 02:30:21 +08:00 via Android 看看 ring buffer 无锁队列实现方式。。。 |
![]() | 15 Nazz OP |
![]() | 17 rrfeng 2023-03-04 07:42:04 +08:00 via Android benchmark 没有对比 channel 的吗? |
![]() | 19 chuanqirenwu 2023-03-04 21:31:18 +08:00 gws 的思路是自己实现一个极简的 eventloop ,而不用 go 自带的协程机制,从而没有什么额外的开销,提高性能? |
![]() | 20 Nazz OP @chuanqirenwu 同步模式没开额外协程,异步模式会开非常驻的协程,执行完任务就退出, 两种模式都没使用 channel. |
![]() | 21 MindMindMax 2023-03-05 04:16:09 +08:00 对于常规项目价值在哪? channel 的价值又在哪? |
![]() | 22 Nazz OP @MindMindMax 尽量使用 mutex 替代 chan. 很多时候保证线程安全就行了,不需要多线程通信. channel 我用得最多的地方是线程同步和超时控制. |
![]() | 23 Nazz OP @chuanqirenwu 确实有 EventLoop. 最开始我是模仿的 JS, 因为我认为 JS WebSocket API 比 gorilla/nhooyr 这些提供的都要清晰得多. 初版只有 Sync IO, Read=>Event Handler=>Write 循环往复. 后面在此基础上加了 Async IO, AIO 模式在每个连接上有读写两个任务队列(并发度分别是 N 和 1), 就是我分享的这个实现, 它需要足够的轻量. 两种模式压测表现都比 gorilla 好得多, 原因大概是 Parser 本身的简单高效和无额外常驻协程吧, 如果有, 协程数量会增加一倍. |
![]() | 24 chuanqirenwu 2023-03-05 12:45:03 +08:00 ![]() @Nazz ,虽然不怎么搞 go ,但感觉这个思路挺不错的。我看 README 的简介写的是 go websocket server ,是只支持 server 端吗? client 端没有实现? |
![]() | 25 Nazz OP @chuanqirenwu 刚实现的 client ,还在测试 |
26 rockuw 2023-03-13 10:21:57 +08:00 mutex 是比 channel 轻量,但是每个 job 新建一个 goroutine 也是有代价的。一个简单的固定 goroutine 数量的实现,测试结果还稍微好一些,分配次数则明显更低 ``` N=10000 goos: linux goarch: amd64 pkg: muwu.com/example/workerqueue cpu: Intel(R) Xeon(R) CPU E5-2682 v4 @ 2.50GHz BenchmarkGwsWorkerQueue-8 903 1310335 ns/op 55471 B/op 2010 allocs/op BenchmarkGopool-8 897 1394589 ns/op 17926 B/op 1059 allocs/op BenchmarkAnts-8 1203 1020211 ns/op 16046 B/op 1001 allocs/op BenchmarkNbio-8 956 1278696 ns/op 48017 B/op 2999 allocs/op BenchmarkChan-8 1004 1181569 ns/op 16016 B/op 1001 allocs/op ``` ```go type workerQueueV1 struct { maxConn int queue chan Job } func newWorkerQueueV1(n int) *workerQueueV1 { wq := &workerQueueV1{ maxConn: n, queue: make(chan Job, 1024), } for i := 0; i < n; i++ { go func() { for job := range wq.queue { job() } }() } return wq } func (wq *workerQueueV1) Push(job Job) { wq.queue <- job } ``` |
![]() | 27 Nazz OP @rockuw 从你的 Benchmark 结果来看, 差距不大. GwsWorkqueue 是专门为 IO 任务设计的, 每个 WebSocket 连接上有读写两个任务队列, 它们非常轻量, 而且并行读写不会新增常驻协程. 量变产生质变, 每个连接上都增加常驻协程会使 CPU 使用率提高不少. 实际业务中并发不会很高, 可以用优先队列替代普通队列减少 allocs, 收益不高我懒得去优化了, 复用 goroutine 对于 IO 任务收益也不大. |
![]() | 28 ClarkAbe 2023-03-23 14:49:55 +08:00 via Android 感觉每次都起一个协程有点浪费... |