Golang 实现工作池,处理每分钟百万请求数 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
qianguozheng

Golang 实现工作池,处理每分钟百万请求数

  •  1
     
  •   qianguozheng
    qianguozheng 2017 年 1 月 14 日 2929 次点击
    这是一个创建于 3388 天前的主题,其中的信息可能已经有所发展或是发生改变。

    使用 Golang 实现了一个简单的消费者模式, 主要解决每分钟百万请求问题的技术方案。 基本原理:建立固定的工作线程去缓冲池中取数据处理。以此来控制固定时间内处理的请求数

    https://github.com/qianguozheng/go-workerpool.git

    实际使用场景来自 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

    国内最近翻译的也有,可以自行搜索。

    10 条回复    2017-02-27 09:27:28 +08:00
    kslr
        1
    kslr  
       2017 年 1 月 14 日
    Nice
    pubby
        2
    pubby  
       2017 年 1 月 15 日
    实际上 MaxQueue 没起到作用,发生问题后任务压在这个相对轻量一些的 goroutine 上而已

    for{
    select{
    case job:= <-JobQueue:
    // a job request has been received
    fmt.Println("Store a job into jobChannel")
    go func(job Job){
    //try to obtain a worker job channel that is available.
    //this will block until a worker is idle
    jobChannel := <- d.WorkerPool
    //dispatch the job to the worker job channel
    jobChannel <- job
    }(job)
    }
    }
    qianguozheng
        3
    qianguozheng  
    OP
       2017 年 1 月 15 日
    @pubby 通过 jobqueue 的长度,可以控制工作者 routine 取数据的速率吧
    pubby
        4
    pubby  
       2017 年 1 月 15 日 via Android
    @qianguozheng 你从 JobQueue 取任务后直接起 goroutine 来等待空闲 worker 。相当于把 JobQueue 的长度又不可控的延长了
    qianguozheng
        5
    qianguozheng  
    OP
       2017 年 1 月 16 日
    @pubby JobQueue 的长度, workder 的数量都是可以调控的, 正是通过调控两者的长度来控制处理的速率。
    pubby
        6
    pubby  
       2017 年 1 月 16 日
    @qianguozheng 处理速度是控制了
    但是从 JobQueue 里取任务的速度没控制,前面不管有多少 job 进来,这边都在新开 go func()来等待 worker

    如果因为其他原因所有 worker 都需要长时间处理的话, goroutine 数量就会猛增,只是这个 goroutine 比较轻量,只是等待空闲 worker 而已,所以系统资源没有那么快炸
    qianguozheng
        7
    qianguozheng  
    OP
       2017 年 1 月 17 日
    @pubby 我觉得你理解有误。
    1. 本程序的目标就是 通过调控 JobQueue 的长度, worker 的数量来达到最大处理能力,在输入压力再大的情况,处理速率都是不变的。
    2. 从 JobQueue 里面取任务的是 worker ,而 worker 的数量是固定的。不会新增任何的 goroutine 来处理。你说的就永远不会发生。
    pubby
        8
    pubby  
       2017 年 1 月 17 日
    @qianguozheng 是的,你输入压力再大, worker 处理速度也不变,那么再大的处理压力去哪里了呢?

    你这些压力都去了这里:

    go func(job Job){
    //try to obtain a worker job channel that is available.
    //this will block until a worker is idle
    jobChannel := <- d.WorkerPool
    //dispatch the job to the worker job channel
    jobChannel <- job
    }(job)

    你 worker 数量有限,那么来不及处理的那些压力就会在这里变成 goroutine 来等待 <-d.WorkderPool
    qianguozheng
        9
    qianguozheng  
    OP
       2017 年 1 月 17 日
    @pubby

    对,没错。 是这样的
    ijustdo
        10
    ijustdo  
       2017 年 2 月 27 日
    没错 这个方法是简单粗暴有效的 我以前类似的 服务端程序 跑了 6 年多 还没嗝屁

    全局任务队列 加锁
    结果处理队列 加锁

    N(参数)个处理任务的线程 每个线程 while 1 就是干活的 干什么由 task_handle 引用传入
    每个线程有一个 event 控制暂停 启动 停止 有个参数控制 每个任务处理后的间隔

    定时取任务线程(这个有必要, 当任务数量是不可预期的, 或者是动态的) 当然现在类似 rabbitmq 消息队列之类 好搞多了
    为什么上面线程要加暂停呢 因为当取任务都没取到(干完了) 任务队列又为空 可以暂停所有干活的线程 等到 某时刻 来任务了 在通知兄弟们干起来 呵呵

    处理结果线程(可以多个)
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2858 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 36ms UTC 12:47 PVG 20:47 LAX 05:47 JFK 08:47
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86