diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index fc3fbd7..859e0b7 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -133,24 +133,29 @@ func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error { // Retry moves the task from in-progress to retry queue, incrementing retry count // and assigning error message to the task message. func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error { - bytes, err := json.Marshal(msg) + bytesToRemove, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } + modified := *msg + modified.Retried++ + modified.ErrorMsg = errMsg + bytesToAdd, err := json.Marshal(&modified) + if err != nil { + return fmt.Errorf("could not marshal %+v to json: %v", modified, err) + } // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:retry - // ARGV[1] -> TaskMessage value - // ARGV[2] -> error message + // ARGV[1] -> TaskMessage value to remove from InProgress queue + // ARGV[2] -> TaskMessage value to add to Retry queue // ARGV[3] -> retry_at UNIX timestamp script := redis.NewScript(` redis.call("LREM", KEYS[1], 0, ARGV[1]) - local msg = cjson.decode(ARGV[1]) - msg["Retried"] = msg["Retried"] + 1 - msg["ErrorMsg"] = ARGV[2] - redis.call("ZADD", KEYS[2], ARGV[3], cjson.encode(msg)) + redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) return redis.status_reply("OK") `) - _, err = script.Run(r.client, []string{inProgressQ, retryQ}, string(bytes), errMsg, processAt.Unix()).Result() + _, err = script.Run(r.client, []string{inProgressQ, retryQ}, + string(bytesToRemove), string(bytesToAdd), processAt.Unix()).Result() return err } @@ -160,30 +165,34 @@ func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error func (r *RDB) Kill(msg *TaskMessage, errMsg string) error { const maxDeadTask = 10 const deadExpirationInDays = 90 - bytes, err := json.Marshal(msg) + bytesToRemove, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } + modified := *msg + modified.ErrorMsg = errMsg + bytesToAdd, err := json.Marshal(&modified) + if err != nil { + return fmt.Errorf("could not marshal %+v to json: %v", modified, err) + } now := time.Now() limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:dead - // ARGV[1] -> TaskMessage value - // ARGV[2] -> error message + // ARGV[1] -> TaskMessage value to remove from InProgress queue + // ARGV[2] -> TaskMessage value to add to Dead queue // ARGV[3] -> died_at UNIX timestamp // ARGV[4] -> cutoff timestamp (e.g., 90 days ago) // ARGV[5] -> max number of tasks in dead queue (e.g., 100) script := redis.NewScript(` redis.call("LREM", KEYS[1], 0, ARGV[1]) - local msg = cjson.decode(ARGV[1]) - msg["ErrorMsg"] = ARGV[2] - redis.call("ZADD", KEYS[2], ARGV[3], cjson.encode(msg)) + redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4]) redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5]) return redis.status_reply("OK") `) _, err = script.Run(r.client, []string{inProgressQ, deadQ}, - string(bytes), errMsg, now.Unix(), limit, maxDeadTask).Result() + string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTask).Result() return err }