From 62db9863fbe4caf9c0cce45cb9dbac8413bdebc2 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 17 Nov 2019 21:13:41 -0800 Subject: [PATCH] Send retry exhausted tasks to "dead" zset --- asynq.go | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/asynq.go b/asynq.go index 620373e..3a51e58 100644 --- a/asynq.go +++ b/asynq.go @@ -2,9 +2,7 @@ package asynq /* TODOs: -- [P0] Task error handling -- [P0] Retry -- [P0] Dead task (retry exausted) +- [P0] Write tests - [P0] Shutdown all workers gracefully when the process gets killed - [P1] Add Support for multiple queues - [P1] User defined max-retry count @@ -154,8 +152,10 @@ func (w *Workers) Run(handler TaskHandler) { err := handler(task) if err != nil { if msg.Retried >= msg.Retry { - // TODO(hibiken): Add the task to "dead" collection - fmt.Println("Retry exausted!!!") + fmt.Println("Retry exhausted!!!") + if err := kill(w.rdb, &msg); err != nil { + log.Printf("[SERVER ERROR] could not add task %+v to 'dead' set\n", err) + } return } fmt.Println("RETRY!!!") @@ -242,6 +242,26 @@ func zadd(rdb *redis.Client, zset string, zscore float64, msg *taskMessage) erro return rdb.ZAdd(zset, &redis.Z{Member: string(bytes), Score: zscore}).Err() } +const maxDeadTask = 100 +const deadExpirationInDays = 90 + +// kill sends the task to "dead" sorted set. It also trim the sorted set by +// timestamp and set size. +func kill(rdb *redis.Client, msg *taskMessage) error { + bytes, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("could not encode task into JSON: %v", err) + } + now := time.Now() + pipe := rdb.Pipeline() + pipe.ZAdd(dead, &redis.Z{Member: string(bytes), Score: float64(now.Unix())}) + limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago + pipe.ZRemRangeByScore(dead, "-inf", strconv.Itoa(int(limit))) + pipe.ZRemRangeByRank(dead, 0, -maxDeadTask) // trim the set to 100 + _, err = pipe.Exec() + return err +} + // listQueues returns the list of all queues. func listQueues(rdb *redis.Client) []string { return rdb.SMembers(allQueues).Val()