
在工作中, 我们经常会碰到重试的需求, 很多人会用定时任务来实现. 如果有更高的要求, 可以尝试下"延迟时间指数递增"的策略, 相比定时任务, 更加节省 CPU 和数据库资源.
为简化逻辑, 代码中没有使用 Redis 或 MySQL 维护失败次数.
package main import ( "errors" "fmt" "github.com/lxzan/memorycache" "math" "sync/atomic" "time" ) const ( MaxRetryTimes = 17 // 最大重试次数 FirstDelay = 5 * time.Second // 初始延迟 ) func NewRetryer() *Retryer { return &Retryer{ mc: memorycache.New[string, string]( memorycache.WithBucketNum(16), memorycache.WithBucketSize(16, math.MaxInt64), memorycache.WithInterval(time.Second, 5*time.Second), ), } } type Retryer struct { mc *memorycache.MemoryCache[string, string] } func (c *Retryer) Add(jobId string, jobFunc func() error) { // 检查任务是否已存在 if _, exist := c.mc.GetOrCreate(jobId, jobId, -1); exist { return } var callback = func(element *memorycache.Element[string, string], reason memorycache.Reason) { // 只处理过期触发的回调 if reason != memorycache.ReasonExpired { return } // 回调函数必须是非阻塞的; 酌情使用任务队列, 控制最大并发. go func(id string) { if err := jobFunc(); err == nil { c.delete(id) } }(element.Value) } // 注册延迟及回调函数 var failedTimes = 1 var delay, delta = FirstDelay, FirstDelay for i := 1; i <= MaxRetryTimes; i++ { if i >= failedTimes { var key = fmt.Sprintf("%s-%d", jobId, i) c.mc.SetWithCallback(key, jobId, delay, callback) } delta *= 2 delay += delta } } // 任务执行成功, 删除剩余的重试任务 func (c *Retryer) delete(jobId string) { for i := 1; i <= MaxRetryTimes; i++ { var key = fmt.Sprintf("%s-%d", jobId, i) c.mc.Delete(key) } c.mc.Delete(jobId) } // 模拟一个任务 func newJob() (jobId string, jobFunc func() error) { var counter = new(atomic.Int64) return "1", func() error { defer counter.Add(1) serial := counter.Load() if serial < 5 { fmt.Printf("serial=%d, t=%d, success=false\n", serial, time.Now().Unix()) return errors.New("test") } fmt.Printf("seria=%d, t=%d, success=true\n", serial, time.Now().Unix()) return nil } } func main() { retryer := NewRetryer() jobId, jobFunc := newJob() if err := jobFunc(); err != nil { retryer.Add(jobId, jobFunc) } select {} } serial=0, t=1705196714, success=false serial=1, t=1705196719, success=false serial=2, t=1705196729, success=false serial=3, t=1705196749, success=false serial=4, t=1705196789, success=false serial=5, t=1705196869, success=true 1 0Z03ry75kWg9m0XS 2024 年 1 月 14 日 指数退避么 |
3 diagnostics 2024 年 1 月 14 日 这有啥好分享的,又不是大学生 |
4 Opportunity 2024 年 1 月 14 日 现在连重试都需要上 redis 了吗。。 |
5 zhangzEric 2024 年 1 月 14 日 via iPhone @Opportunity 本地重试不需要,分布式异步重试就得有地方存储了 |
6 Nazz OP @Opportunity 我就用 redis 保存下失败次数,不想用 mysql |
7 shellcodecow 2024 年 1 月 22 日 设计上 建议独立一个服务做调度功能 比如延迟调度、循环调度、递增调度 通过 GRPC 进行 timeout 或者 callback |
8 Nazz OP @shellcodecow 分布式是该这样 |