channel 的关闭时机 - V2EX
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
ethsol

channel 的关闭时机

  •  
  •   ethsol Dec 14, 2022 2392 views
    This topic created in 1251 days ago, the information mentioned may be changed or developed.

    go 新人请教大佬一个关闭 channel 的问题,发送端逻辑是历遍一堆目录,把里面的文件发送到 chan ,递归方式实现。这里的 chan 关闭有什么方法。

    目前这个代码跑起来的问题是会一直阻塞,要手动关闭

    func main() { var wg sync.WaitGroup objchan := make(chan []string, 10) wg.Add(1) go func(och <-chan []string) { defer wg.Done() for objs := range och { do_something(objs) } }(objchan) for _, perfix := range []string{"test", "tc"} { go Getfile(perfix, objchan) } wg.Wait() } func Getfile(dir string, filechan chan<- []string) { // send files ... filechan <- files // 子目录递归 if dir { go Getfile(dir, filechan) } } 
    20 replies    2022-12-20 21:01:45 +08:00
    bebop
        1
    bebop  
       Dec 14, 2022
    wangyu17455
        2
    wangyu17455  
       Dec 14, 2022
    for objs := range och 改成 for objs, ok := range och
    ok 会在 channel 关闭后变成 false
    wangyu17455
        3
    wangyu17455  
       Dec 14, 2022
    记错了,for 不能用这个写法,正常读取可以
    ethsol
        4
    ethsol  
    OP
       Dec 14, 2022
    @bebop 问题是递归,Getfile 不知道会跑多少次
    sduoduo233
        5
    sduoduo233  
       Dec 14, 2022
    感觉可以参考一下这个: https://stackoverflow.com/questions/13217547/tour-of-go-exercise-10-crawler ,每递归一次就 wg.Add(1)
    ethsol
        6
    ethsol  
    OP
       Dec 14, 2022
    @bebop
    @sduoduo233
    改成了在历遍时候 add ,但是结果有点奇怪,只能随机处理"test", "tc"中的一个。
    在 wg.Wait()后面 time.Sleep ,才能显示完整
    ethsol
        7
    ethsol  
    OP
       Dec 14, 2022
    发送端效率 》 接收端效率,所以发送端先关闭可能造成结果不完整?
    所以还是在接收端处理 chan 关闭比较好?
    bebop
        8
    bebop  
       Dec 14, 2022
    使用协程池,而不是每次都创建一个 chan 。
    和是不是递归没有关系,只要能把数据全部写到 chan 就行。

    func main() {
    poolNum := 10

    var wg sync.WaitGroup
    pool := make(chan string, poolNum)

    // 处理文件
    for i := 0; i < poolNum; i++ {
    wg.Add(1)

    go func(wg *sync.WaitGroup, ch <-chan string) {
    defer wg.Done()

    for filename := range ch {
    fmt.Println(filename)
    }
    }(&wg, pool)
    }

    // 遍历文件
    err := filepath.Walk(".",
    func(path string, info os.FileInfo, err error) error {
    if err != nil {
    return err
    }

    pool <- path
    return nil
    })
    if err != nil {
    log.Println(err)
    }

    close(pool)
    wg.Wait()
    }
    zjj19950716
        9
    zjj19950716  
       Dec 14, 2022
    @zong400 关闭的时候有数据的话,接收端也会先收完的,接收端关闭有 panic 的风险,你不知道什么时候关,你就每个 Getfile 里再用个 wg 碰到 dir 就 wg add 1 , 最顶级目录完成了就是完成了
    sibowen
        10
    sibowen  
       Dec 14, 2022
    ```golang

    import (
    "fmt"
    "io/ioutil"
    "os"
    "sync"
    )

    var DirPrefix string

    func main() {
    DirPrefix, _ = os.Getwd()
    DirPrefix += "/dir/"
    var wg sync.WaitGroup
    objchan := make(chan string, 10)
    wg.Add(1)
    go func(och <-chan string) {
    defer wg.Done()
    for objs := range och {
    fmt.Println(objs)
    }
    }(objchan)

    wg.Add(1)
    go func(och chan string) {
    defer wg.Done()
    var wgDir sync.WaitGroup
    for _, perfix := range []string{"test", "tc"} {
    wgDir.Add(1)
    go GetFile(perfix, och, &wgDir)
    }
    wgDir.Wait()
    close(objchan)
    }(objchan)
    wg.Wait()
    }

    func GetFile(dir string, fileChan chan string, wg *sync.WaitGroup) {
    defer wg.Done()
    // send files
    dirNow := DirPrefix+dir
    files, _ := ioutil.ReadDir(dirNow)
    // 子目录递归
    for _, v := range files {
    filePath := DirPrefix+dir+"/"+v.Name()
    if IsDir(filePath) {
    wg.Add(1)
    go GetFile(filePath, fileChan, wg)
    } else {
    fileChan <- filePath
    }
    }
    }

    func IsDir(path string) bool{
    s, err := os.Stat(path)
    if err != nil {
    return false
    }
    return s.IsDir()
    }

    ```

    把读取文件的操作包装到单独的协程里;
    在读取操作完成后,close chan ;
    试试上面这段。
    ethsol
        11
    ethsol  
    OP
       Dec 14, 2022
    @sibowen
    @zjj19950716
    @bebop
    我试试,谢谢
    nxcdJaNnmyF9O90X
        12
    nxcdJaNnmyF9O90X  
       Dec 14, 2022
    您可以在 Getfile 函数中关闭 channel 。您可以在每次遍历子目录时,将 channel 传递给下一个 goroutine ,并在当前 goroutine 中关闭 channel 。例如:

    ```
    func Getfile(dir string, filechan chan<- []string) {
    // send files
    ...
    filechan <- files

    // 子目录递归
    if dir {
    // 关闭当前 goroutine 中的 channel
    close(filechan)
    // 在新 goroutine 中继续遍历子目录
    go Getfile(dir, filechan)
    }
    }
    ```
    这样,您就可以在遍历完一个子目录之后,关闭该目录中的 channel ,并在新 goroutine 中继续遍历子目录。这样,遍历完所有子目录后,您就可以在主函数中等待所有 goroutine 完成后退出程序。
    ethsol
        13
    ethsol  
    OP
       Dec 15, 2022
    @xingjue channel 能重新打开?
    ethsol
        14
    ethsol  
    OP
       Dec 15, 2022
    目前代码是这样,问题是为什么后面不加 sleep 就只能随机显示 test ,tc 其中一个的内容?

    ```
    func main() {
    var wg sync.WaitGroup

    objchan := make(chan []string, 10)

    go func(och <-chan []string) {
    for objs := range och {
    println(objs)
    }
    }(objchan)

    for _, perfix := range []string{"test", "tc"} {
    wg.Add(1)
    go Getfile(perfix, objchan, &wg)
    }

    wg.Wait()
    time.Sleep(1)
    }

    func Getfile(dir string, filechan chan<- []string, wg *sync.WaitGroup) {
    defer wg.Done()
    // send files
    ...
    filechan <- files
    // 子目录递归
    for _, dir := range dirs {
    wg.Add(1)
    go Getfile(dir, filechan)
    }
    }
    ```
    ethsol
        15
    ethsol  
    OP
       Dec 15, 2022
    @sibowen 按你改的写和上面的一样,需要加个 sleep ,不然就显示不全,我要处理的是对象存储,通过发 http 请求,是不是和 os 文件系统底层不一样导致你的代码不行

    ```
    wg.Add(1)
    go func(och chan<- []cos.Object) {
    defer wg.Done()
    var wgg sync.WaitGroup
    for _, perfix := range []string{"test", "tc"} {
    wgg.Add(1)
    go tools.GetObjs(cosClient, perfix, objchan, &wgg)
    }
    wgg.Wait()
    close(och)
    }(objchan)
    ```
    ethsol
        16
    ethsol  
    OP
       Dec 15, 2022
    用#1 介绍的协程池方法,目前可行
    ethsol
        17
    ethsol  
    OP
       Dec 15, 2022
    但是协程池 感觉复杂了一层,一定要这样?
    sibowen
        18
    sibowen  
       Dec 15, 2022
    @zong400 能具体描述下你的使用场景吗? 读取文件是 http 请求获取的?还是消费 chan 的地方要有 http 请求?还是什么
    ethsol
        19
    ethsol  
    OP
       Dec 16, 2022
    @sibowen 腾讯的对象存储,读写都是用 sdk 的
    yaott2020
        20
    yaott2020  
       Dec 20, 2022 via Android
    https://github.com/smallnest/chanx

    无限缓存 channel ,可以实现无限写,写完再读
    About     Help     Advertise     Blog     API     FAQ     Solana     4241 Online   Highest 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 44ms UTC 04:16 PVG 12:16 LAX 21:16 JFK 00:16
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86