
我咋找不到呢
// PutMessage hands m to the topic and, on success, bumps the topic's
// message counter.
//
// It returns an "exiting" error without enqueuing anything when the topic's
// exitFlag is set, and otherwise returns whatever error put reports. The
// counter is only incremented when put succeeds.
func (t *Topic) PutMessage(m *Message) error {
	// Hold the read lock for the whole call.
	// NOTE(review): presumably this keeps shutdown paths that take the write
	// lock from running concurrently with a put — confirm against the rest of
	// the file.
	t.RLock()
	defer t.RUnlock()
	if atomic.LoadInt32(&t.exitFlag) == 1 {
		return errors.New("exiting")
	}
	err := t.put(m)
	if err != nil {
		return err
	}
	atomic.AddUint64(&t.messageCount, 1)
	return nil
}

// put tries a non-blocking send of m on memoryMsgChan; if that send cannot
// proceed immediately it writes m to t.backend instead.
//
// On the backend path the result of the write (nil or not) is always passed
// to nsqd.SetHealth; a non-nil error is additionally logged and returned.
// put returns nil when either path accepts the message.
func (t *Topic) put(m *Message) error {
	select {
	case t.memoryMsgChan <- m:
	default:
		// The channel send would block, so fall back to the backend using a
		// pooled buffer that is returned right after the write.
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, t.backend)
		bufferPoolPut(b)
		t.ctx.nsqd.SetHealth(err)
		if err != nil {
			t.ctx.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to write message to backend - %s", t.name, err)
			return err
		}
	}
	return nil
}

1 thomaswang OP
```go
func (t *Topic) messagePump() {
	for i, channel := range chans {
		chanMsg := msg
		// copy the message because each channel
		// needs a unique instance but...
		// fastpath to avoid copy if its the first channel
		// (the topic already created the first copy)
		if i > 0 {
			chanMsg = NewMessage(msg.ID, msg.Body)
			chanMsg.Timestamp = msg.Timestamp
			chanMsg.deferred = msg.deferred
		}
		if chanMsg.deferred != 0 {
			channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
			continue
		}
		err := channel.PutMessage(chanMsg)
		if err != nil {
			t.ctx.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.ID, channel.name, err)
		}
	}
|