go 的 channel 的一个疑问 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
weiweiwitch
V2EX    Go 编程语言

go 的 channel 的一个疑问

  •  1
     
  •   weiweiwitch 2017-03-20 20:51:41 +08:00 2262 次点击
    这是一个创建于 3135 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我们现在项目中使用 channel 来实现多个协程间通讯时遇到一个问题,想请教下这里的大牛。

    我们有多个生产端协程(数千个)会不断的产生数据,塞入到 channel 中,然后一个消费端协程不断的从 channel 中获取数据。

    现在,我们的多个生产端协程会不定时的产生短暂的峰值,这个峰值的量很大,而 channel 是有容量的。当 channel 的容量满了后,生产端就阻塞住了。这影响到了生产端的正常逻辑的执行。

    我们考虑过使用 select 和 default 的方式来避免阻塞。但如何在 channel 有空余容量时,生产端协程及时将积压的消息再次推入 channel ?

    由于消费端逻辑的特殊性,我们无法创建多个消费端协程来提高消费的速度。

    13 条回复    2017-03-30 21:28:22 +08:00
    Muninn
        1
    Muninn  
       2017-03-20 21:10:17 +08:00 via Android   2
    这和 golang 没关系 传统的队列一样的 只能加大缓冲或者提高消费能力

    评估下内存够用不 不够了需要改架构持久化
    weiweiwitch
        2
    weiweiwitch  
    OP
       2017-03-20 21:28:28 +08:00
    @Muninn 内存是够的。我们现在暂时使用了 go-datastructures 的无边界 queue 来缓解这个问题。只是这个数据结构和其他 channel 没法太好的搭配使用。

    绝大部分时间,生产端的产生速度是很缓慢的,所以为了偶尔的波峰为 channel 分配巨量的缓冲,感觉比较浪费。
    znood
        3
    znood  
       2017-03-20 21:31:11 +08:00   1
    如 1 楼所说,加大缓冲,如果消费能力不足的情况下最好在 channel 和消费者中间加个持久化队列,如 kafka ,如果对延迟要求不是很高可以直接把 channel 换成 kafka
    pkking
        4
    pkking  
       2017-03-20 21:36:06 +08:00
    可以借鉴下拥塞算法
    PhilC
        5
    PhilC  
       2017-03-20 22:06:27 +08:00
    你可以看看 nsq 的代码,用 select ,当 channel 满了就写到文件里
    jiumingmao
        6
    jiumingmao  
       2017-03-20 22:25:11 +08:00
    使用 channel 的 channel a , a 不满的时候生产者定义一个长度为 1 的子 channel b ,往 b 中放一个元素,然后放到 a ; b 满的时候,生产者定义一个长度比较大(需要估计一下峰值大概多大)的子 channel c ,然后数据放入 c ,直接 a 不满,把 c 放入 a 。
    jiumingmao
        7
    jiumingmao  
       2017-03-20 22:27:21 +08:00
    不过 channel 都会有一个问题,进程挂了就啥都没有了,使用 kafka 可以防止数据丢失。
    iot
        8
    iot  
       2017-03-20 22:48:43 +08:00
    生产者写入 channel 时候能不能判断下, 如果快满了就再创建一个更大的 channel 替换旧的
    ghbai
        9
    ghbai  
       2017-03-21 08:38:42 +08:00
    gocrawl(开源爬虫类库)的一种方案
    https://github.com/PuerkitoBio/gocrawl/blob/master/popchannel.go

    ```
    type popChannel chan []*URLContext
    // The stack function ensures the specified URLs are added to the pop channel
    // with minimal blocking (since the channel is stacked, it is virtually equivalent
    // to an infinitely buffered channel).
    func (pc popChannel) stack(cmd ...*URLContext) {
    toStack := cmd
    for {
    select {
    case pc <- toStack:
    return
    case old := <-pc:
    // Content of the channel got emptied and is now in old, so append whatever
    // is in toStack to it, so that it can either be inserted in the channel,
    // or appended to some other content that got through in the meantime.
    toStack = append(old, toStack...)
    }
    }
    }
    ```
    weiweiwitch
        10
    weiweiwitch  
    OP
       2017-03-21 09:17:45 +08:00
    @ghbai 如果我理解的没问题的话,这个方案是无法保证同一个生产者产生的 cmd 被有序的消费。
    ghbai
        11
    ghbai  
       2017-03-21 12:44:15 +08:00
    @weiweiwitch 对,是不能保证有序的。
    khowarizmi
        12
    khowarizmi  
       2017-03-21 15:13:59 +08:00
    在你的 unbounded queue 前后接上两个 channel ,然后用两个 worker 搬数据,伪装成 unbounded channel 。
    gwind
        13
    gwind  
       2017-03-30 21:28:22 +08:00
    好比一条 TCP 连接达到最大吞吐,你再塞就没有意义。
    建议考虑下 ZeroMQ, nanomsg 等,重新定义模型。
    纯 golang 的 nanomsg : https://github.com/go-mangos/mangos
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     4348 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 39ms UTC 01:03 PVG 09:03 LAX 18:03 JFK 21:03
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86