
PUBLISHER 代码:
func main() { nc,err := stan.Connect("test-cluster","idc",stan.NatsURL("nats://127.0.0.1:4222")) if err != nil{ panic(err) } fmt.Println("connect succ") for i:=0;i<10;i++{ fmt.Println("publishing:",i) err := nc.Publish("tp1",[]byte(strconv.Itoa(i))) if err != nil{ panic(err) } } nc.Close() } QueueSubscriber 代码:
func main() { nc,err := stan.Connect("test-cluster","subscriber",stan.NatsURL("nats://localhost:4222")) if err != nil{ panic(err) } defer nc.Close() subs := make([]stan.Subscription,3) for i:=0;i<3;i++{ workername := "worker"+strconv.Itoa(i) fmt.Println(fmt.Sprintf("QueueSubscribe %s start",workername)) sub,err := nc.QueueSubscribe("tp1","ch1", func(msg *stan.Msg) { fmt.Println(workername,"get msg:",string(msg.Data),"start doing something") time.Sleep(1*time.Second) },stan.DurableName("subscriber"),stan.AckWait(time.Hour*24)) if err != nil{ panic(err) } subs[i] = sub } c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, os.Kill) select{ case <- c: fmt.Println("Subscriber CLOSE") for i,_ := range subs{ subs[i].Close() } nc.Close() fmt.Println("quit") } } Publisher 输出:
connect succ publishing: 0 publishing: 1 publishing: 2 publishing: 3 publishing: 4 publishing: 5 publishing: 6 publishing: 7 publishing: 8 publishing: 9 QueueSubscriber 输出:
QueueSubscribe worker0 start QueueSubscribe worker1 start worker0 get msg: 0 start doing something QueueSubscribe worker2 start worker0 get msg: 1 start doing something worker0 get msg: 2 start doing something worker0 get msg: 3 start doing something worker0 get msg: 4 start doing something worker0 get msg: 5 start doing something worker0 get msg: 6 start doing something worker0 get msg: 7 start doing something worker0 get msg: 8 start doing something worker0 get msg: 9 start doing something 请问朋友们是否有遇到过一样的问题呢?谢谢大家
1 freestyle 2019-08-30 12:44:52 +08:00 via iPhone 这是特性,queue 模式. 如果想每个订阅者都收到,设置不同的 queue 名字或普通方式订阅就行. 可以看下我的博客 https://imhanjm.com/2018/02/17/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3nats%20&%20nats%20streaming/ |
2 heavyrainn OP @freestyle 额…不是,我的意思是,为啥其他的 worker 不工作,只有 worker0 在工作 |
3 freestyle 2019-08-31 07:16:06 +08:00 via iPhone @heavyrainn 你这是同一个连接啊 一般 queueSub 是不同的进程即不同的连接 你试试给每个 worker 创建一个连接. |
4 heavyrainn OP @freestyle 我搞清楚了…是因为没有设置 MaxInflight 值的问题,派出的任务都最先动的 worker 给接收了。设置 MaxInflight 值为 1 实现了正常分 worker 执行。谢谢啦 |