diff --git a/launcher.go b/launcher.go index 00f6f5d..146982d 100644 --- a/launcher.go +++ b/launcher.go @@ -13,7 +13,7 @@ type Launcher struct { running bool mu sync.Mutex - poller *poller + poller *poller processor *processor } @@ -24,7 +24,7 @@ func NewLauncher(poolSize int, opt *RedisOpt) *Launcher { poller := newPoller(rdb, 5*time.Second, []string{scheduled, retry}) processor := newProcessor(rdb, poolSize, nil) return &Launcher{ - poller: poller, + poller: poller, processor: processor, } } diff --git a/processor.go b/processor.go index 05a9ffe..aba4968 100644 --- a/processor.go +++ b/processor.go @@ -3,8 +3,6 @@ package asynq import ( "fmt" "log" - "math" - "math/rand" "time" ) @@ -83,31 +81,7 @@ func (p *processor) exec() { defer func() { <-p.sema }() // release token err := p.handler(task) if err != nil { - if msg.Retried >= msg.Retry { - fmt.Println("Retry exhausted!!!") - if err := p.rdb.kill(msg); err != nil { - log.Printf("[SERVER ERROR] could not add task %+v to 'dead' set\n", err) - } - return - } - fmt.Println("RETRY!!!") - retryAt := time.Now().Add(delaySeconds((msg.Retried))) - fmt.Printf("[DEBUG] retying the task in %v\n", retryAt.Sub(time.Now())) - msg.Retried++ - msg.ErrorMsg = err.Error() - if err := p.rdb.zadd(retry, float64(retryAt.Unix()), msg); err != nil { - // TODO(hibiken): Not sure how to handle this error - log.Printf("[SEVERE ERROR] could not add msg %+v to 'retry' set: %v\n", msg, err) - return - } + retryTask(p.rdb, msg, err) } }(t) } - -// delaySeconds returns a number seconds to delay before retrying. -// Formula taken from https://github.com/mperham/sidekiq. -func delaySeconds(count int) time.Duration { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1)) - return time.Duration(s) * time.Second -} diff --git a/retry.go b/retry.go new file mode 100644 index 0000000..fb949ed --- /dev/null +++ b/retry.go @@ -0,0 +1,36 @@ +package asynq + +import ( + "fmt" + "log" + "math" + "math/rand" + "time" +) + +func retryTask(rdb *rdb, msg *taskMessage, err error) { + if msg.Retried >= msg.Retry { + fmt.Println("[DEBUG] Retry exhausted!!!") + if err := rdb.kill(msg); err != nil { + log.Printf("[SERVER ERROR] could not add task %+v to 'dead' set\n", err) + } + return + } + retryAt := time.Now().Add(delaySeconds((msg.Retried))) + fmt.Printf("[DEBUG] Retrying the task in %v\n", retryAt.Sub(time.Now())) + msg.Retried++ + msg.ErrorMsg = err.Error() + if err := rdb.zadd(retry, float64(retryAt.Unix()), msg); err != nil { + // TODO(hibiken): Not sure how to handle this error + log.Printf("[SEVERE ERROR] could not add msg %+v to 'retry' set: %v\n", msg, err) + return + } +} + +// delaySeconds returns a number seconds to delay before retrying. +// Formula taken from https://github.com/mperham/sidekiq. +func delaySeconds(count int) time.Duration { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1)) + return time.Duration(s) * time.Second +}