各位大佬,请教一个 golang 多线程的阻塞问题 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
deavorwei
V2EX    Go 编程语言

各位大佬,请教一个 golang 多线程的阻塞问题

  •  
  •   deavorwei 2024-08-22 18:49:37 +08:00 2129 次点击
    这是一个创建于 421 天前的主题,其中的信息可能已经有所发展或是发生改变。

    编程小白,在写一个多线程目录文件遍历的时候,出现了阻塞问题,求教各位大佬~

    通过增大 var taskChan = make(chan string, 1000),chan 缓冲区为 100 万的时候程序不会阻塞

    但是我通过打印日志发现 taskChan 占用很小,只有十几,而且存在通道写入失败的情况

    taskChan 的缓冲区为 1000 时,阻塞的日志如下:

    [DEBUG]:增加目录,增加 wg, [1802], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1842], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1843], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1844], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1801], taskChan = [5] [DEBUG]:任务完成,减小 wg, [1798], taskChan = [1] [DEBUG]:任务完成,减小 wg, [1797], taskChan = [17] [DEBUG]:任务完成,减小 wg, [1816], taskChan = [4] [DEBUG]:增加目录,增加 wg, [1803], taskChan = [1] [DEBUG]:任务完成,减小 wg, [1843], taskChan = [1] [DEBUG]:任务完成,减小 wg, [1797], taskChan = [21] [DEBUG]:任务完成,减小 wg, [1841], taskChan = [24] [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0] [DEBUG]:任务完成,减小 wg, [1840], taskChan = [6] [DEBUG]:任务完成,减小 wg, [1798], taskChan = [0] [DEBUG]:任务完成,减小 wg, [1840], taskChan = [13] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1840], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1842], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1842], taskChan = [2] [DEBUG]:增加目录,增加 wg, [1842], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1843], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1844], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1845], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1846], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1847], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1848], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1849], taskChan = [0] 

    如果把 taskChan 的缓冲区为 100 万的时候,程序可以正常退出,日志如下:

    [DEBUG]:增加目录,增加 wg, [2], taskChan = [0] [DEBUG]:增加目录,增加 wg, [3], taskChan = [0] [DEBUG]:增加目录,增加 wg, [4], taskChan = [0] [DEBUG]:任务完成,减小 wg, [3], taskChan = [0] [DEBUG]:任务完成,减小 wg, [3], taskChan = [0] [DEBUG]:任务完成,减小 wg, [3], taskChan = [0] [DEBUG]:任务完成,减小 wg, [1], taskChan = [0] [DEBUG]:增加目录,增加 wg, [4], taskChan = [0] [DEBUG]:增加目录,增加 wg, [2], taskChan = [0] [DEBUG]:任务完成,减小 wg, [1], taskChan = [0] [DEBUG]:任务完成,减小 wg, [2], taskChan = [0] [DEBUG]:任务完成,减小 wg, [2], taskChan = [0] [DEBUG]:任务完成,减小 wg, [1], taskChan = [0] [DEBUG]:任务完成,减小 wg, [1], taskChan = [0] [DEBUG]:任务完成,减小 wg, [0], taskChan = [0] [INFO]:目录扫描完毕 [DEBUG]:func GetAllFilePath end [DEBUG]:func StartScan end [DEBUG]:func btnStartScanOnclick end 

    代码如下:

    package core import ( "DopliGo/logs" "github.com/panjf2000/ants/v2" "os" "path/filepath" "sync" "sync/atomic" ) func GetAllFilePath(rootPath string) { //logs.IsLogDebug = false logs.Debug("func GetAllFilePath start") // 创建任务通道和结果通道 var taskChan = make(chan string, 1000000) var resultChan = make(chan string, 1000000) var wg sync.WaitGroup var counter int64 = 0 // 创建生产者 goroutine 池 producerPool, _ := ants.NewPoolWithFunc(16, func(i interface{}) { produceTasks(i.(string), taskChan, resultChan, &counter, &wg) }) logs.Debug("cap:%d", producerPool.Cap()) defer producerPool.Release() taskChan <- rootPath wg.Add(1) // 这里增加计数器 atomic.AddInt64(&counter, 1) logs.Debug("任务开始,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(&counter), len(resultChan)) // 启动生产者 go func() { //defer logs.Debug("生产者退出") for task := range taskChan { err := producerPool.Invoke(task) if err != nil { logs.Error("failed to producerPool Invoke, err: %s", err) return } } }() // 启动结果处理 goroutine go func() { //defer logs.Debug("消费者退出") for result := range resultChan { _ = result } }() // 等待所有任务完成 wg.Wait() close(resultChan) close(taskChan) logs.Info("目录扫描完毕") logs.Debug("func GetAllFilePath end") } func produceTasks(rootPath string, taskChan chan string, resultChan chan string, counter *int64, wg *sync.WaitGroup) { defer wg.Done() // 确保每次 produceTasks 完成时,调用 Done // logs.Debug("func produceTasks start") entries, err := os.ReadDir(rootPath) if err != nil { logs.Error("failed to read dir: %s , err: %s", rootPath, err) return } for _, entry := range entries { path := filepath.Join(rootPath, entry.Name()) if entry.IsDir() { wg.Add(1) atomic.AddInt64(counter, 1) select { case taskChan <- path: // 发送成功 default: // 发送失败,通道已满 logs.Error("写入通道失败...") } logs.Debug("增加目录,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan)) } else { resultChan <- path } } atomic.AddInt64(counter, -1) logs.Debug("任务完成,减小 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan)) //logs.Debug("func produceTasks end") } 
    18 条回复    2024-08-23 15:40:21 +08:00
    justseemore
        1
    justseemore  
       2024-08-22 19:19:40 +08:00
    select 的时候 写入到 chan 不阻塞 chan 满的时候会直接执行 default
    josexy
        2
    josexy  
       2024-08-22 19:30:35 +08:00
    ```go
    if entry.IsDir() {
    atomic.AddInt64(counter, 1)
    select {
    case taskChan <- path:
    // 发送成功
    wg.Add(1)
    default:
    // 发送失败,通道已满
    logs.Error("写入通道失败..")
    }

    ```
    你把 wg.Add(1) 放到里面,然后 channel 容量设置大点就可以了,这样只有发送成功才处理
    matytan
        3
    matytan  
       2024-08-22 19:39:11 +08:00
    produceTasks 中 for 循环 wg.add(1)多次,但是只 done 了一次(函数结束)为什么?而且你这个 wg 用的好奇怪
    DefoliationM
        4
    DefoliationM  
       2024-08-22 19:40:46 +08:00 via Android
    default 删了,如果你想控制退出,把 default 换成 context 。
    matytan
        5
    matytan  
       2024-08-22 19:53:38 +08:00   1
    分析了一下你这个代码打印一定是阻塞在了读取 taskChan ,为什么堵塞,大概率是协程池 invoke 的时候堵塞了,我换成 go 携程跑没问题。具体为什么可能需要探索下 ants
    go func() {
    for task := range taskChan {
    fmt.Printf("task: %s\n", task)
    // err := producerPool.Invoke(task)
    // if err != nil {
    // fmt.Printf("failed to producerPool Invoke, err: %s\n", err)
    // return
    // }
    go produceTasks(task, taskChan, resultChan, &counter, &wg)
    }
    }()
    matytan
        6
    matytan  
       2024-08-22 19:57:18 +08:00
    @matytan #5 produceTasks 中另外 defalut 要删掉,通道满了应该也要等吧,不然可能会漏?
    deavorwei
        7
    deavorwei  
    OP
       2024-08-22 20:48:53 +08:00
    @matytan 大佬牛逼,taskChan 设置为 1000 我用协程跑也没问题了,ants 我得再去看看怎么用比较合适; default 是我为了诊断是不是 taskChan 写不进去才加的,正常应该是没有。
    另外请教下,为什么 taskChan <- path 写不进去,我实时打印 len ,占用都只有几十,我长度设置的 1000。
    deavorwei
        8
    deavorwei  
    OP
       2024-08-22 20:50:15 +08:00
    @zpfhbyx 是的,但是我打印的日志 taskChan 占用只有几十,容量设置是 1000 ,应该不会满才对
    yianing
        9
    yianing  
       2024-08-22 22:46:24 +08:00   1
    @deavorwei #7 #7 taskChan = [%d]", len(resultChan) 打印错变量了
    deavorwei
        10
    deavorwei  
    OP
       2024-08-23 10:45:35 +08:00
    @yianing #9 感谢大佬,我这低级错误过分了,但是我修改正确之后,结果还是一样的
    pxllong
        11
    pxllong  
       2024-08-23 11:39:03 +08:00
    用 runtime/pprof 。
    yann123
        12
    yann123  
       2024-08-23 14:15:07 +08:00
    default:
    wg.Done()
    logs.Error("Failed to write to channel...")


    写入通过失败了你没有减少锁,所以一直卡住了。
    yann123
        13
    yann123  
       2024-08-23 14:17:35 +08:00
    wg 计数的时候注意,一定要确保可以执行 wg.Done()操作,否则就卡住了。
    deavorwei
        14
    deavorwei  
    OP
       2024-08-23 15:03:42 +08:00
    @yann123 #13 大佬,还是不行 ,我现在是使用的协程直接跑,taskChan 是 1000 ,然后 wg.done 也加了。跑起来就挂 ,如果把 taskChan 改成 100 万就很正常
    matytan
        15
    matytan  
       2024-08-23 15:08:02 +08:00
    @deavorwei #7 并不是占用的问题,是用 ants 的时候你的 channel 大小小于了文件数量,导致死锁的,用 go 携程可以正常等待结束了继续往通道里面放。这种问题直接用极端的办法,把 channel 大小设置为 1 ,看会不会死锁。你用 go 原生协程跑,channel 为 1 都 ok 的,只是慢一点
    matytan
        16
    matytan  
       2024-08-23 15:09:31 +08:00
    @deavorwei #7 跑不满通道的原因可能是处理比较快,一直都没有满过,只能说明设置为 1000 没必要哈哈哈
    deavorwei
        17
    deavorwei  
    OP
       2024-08-23 15:39:06 +08:00
    @matytan #16
    明白了,大佬,感谢感谢!
    deavorwei
        18
    deavorwei  
    OP
       2024-08-23 15:40:21 +08:00
    @yann123 #13 大佬,不好意思,可以了,添加了 wg.done 是 ok 的,我运行错程序了 ,感谢指点
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2784 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 23ms UTC 07:06 PVG 15:06 LAX 00:06 JFK 03:06
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86