diff --git a/internal/rdb/helpers_test.go b/internal/rdb/helpers_test.go index 1d5be35..12ec855 100644 --- a/internal/rdb/helpers_test.go +++ b/internal/rdb/helpers_test.go @@ -49,7 +49,7 @@ var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*base.TaskMessage) []*base var sortZSetEntryOpt = cmp.Transformer("SortZSetEntry", func(in []sortedSetEntry) []sortedSetEntry { out := append([]sortedSetEntry(nil), in...) // Copy input to avoid mutating it sort.Slice(out, func(i, j int) bool { - return out[i].msg.ID.String() < out[j].msg.ID.String() + return out[i].Msg.ID.String() < out[j].Msg.ID.String() }) return out }) @@ -112,7 +112,7 @@ func seedRedisList(t *testing.T, c *redis.Client, key string, msgs []*base.TaskM func seedRedisZSet(t *testing.T, c *redis.Client, key string, items []sortedSetEntry) { for _, item := range items { - z := &redis.Z{Member: mustMarshal(t, item.msg), Score: float64(item.score)} + z := &redis.Z{Member: mustMarshal(t, item.Msg), Score: float64(item.Score)} if err := c.ZAdd(key, z).Err(); err != nil { t.Fatal(err) } @@ -121,8 +121,8 @@ func seedRedisZSet(t *testing.T, c *redis.Client, key string, items []sortedSetE // scheduledEntry represents an item in redis sorted set (aka ZSET). type sortedSetEntry struct { - msg *base.TaskMessage - score int64 + Msg *base.TaskMessage + Score int64 } // seedDefaultQueue initializes the default queue with the given messages. diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index afd2ceb..dbb78da 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -25,7 +25,6 @@ type Stats struct { } // EnqueuedTask is a task in a queue and is ready to be processed. -// Note: This is read only and used for monitoring purpose. type EnqueuedTask struct { ID xid.ID Type string @@ -33,7 +32,6 @@ type EnqueuedTask struct { } // InProgressTask is a task that's currently being processed. -// Note: This is read only and used for monitoring purpose. type InProgressTask struct { ID xid.ID Type string @@ -41,7 +39,6 @@ type InProgressTask struct { } // ScheduledTask is a task that's scheduled to be processed in the future. -// Note: This is read only and used for monitoring purpose. type ScheduledTask struct { ID xid.ID Type string @@ -51,7 +48,6 @@ type ScheduledTask struct { } // RetryTask is a task that's in retry queue because worker failed to process the task. -// Note: This is read only and used for monitoring purpose. type RetryTask struct { ID xid.ID Type string @@ -65,7 +61,6 @@ type RetryTask struct { } // DeadTask is a task in that has exhausted all retries. -// Note: This is read only and used for monitoring purpose. type DeadTask struct { ID xid.ID Type string @@ -391,6 +386,71 @@ func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) { return n, nil } +// KillRetryTask finds a task that matches the given id and score from retry queue +// and moves it to dead queue. If a task that maches the id and score does not exist, +// it returns ErrTaskNotFound. +func (r *RDB) KillRetryTask(id xid.ID, score int64) error { + n, err := r.removeAndKill(base.RetryQueue, id.String(), float64(score)) + if err != nil { + return err + } + if n == 0 { + return ErrTaskNotFound + } + return nil +} + +// KillScheduledTask finds a task that matches the given id and score from scheduled queue +// and moves it to dead queue. If a task that maches the id and score does not exist, +// it returns ErrTaskNotFound. +func (r *RDB) KillScheduledTask(id xid.ID, score int64) error { + n, err := r.removeAndKill(base.ScheduledQueue, id.String(), float64(score)) + if err != nil { + return err + } + if n == 0 { + return ErrTaskNotFound + } + return nil +} + +func (r *RDB) removeAndKill(zset, id string, score float64) (int64, error) { + // KEYS[1] -> ZSET to move task from (e.g., retry queue) + // KEYS[2] -> asynq:dead + // ARGV[1] -> score of the task to kill + // ARGV[2] -> id of the task to kill + // ARGV[3] -> current timestamp + // ARGV[4] -> cutoff timestamp (e.g., 90 days ago) + // ARGV[5] -> max number of tasks in dead queue (e.g., 100) + script := redis.NewScript(` + local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) + for _, msg in ipairs(msgs) do + local decoded = cjson.decode(msg) + if decoded["ID"] == ARGV[2] then + redis.call("ZREM", KEYS[1], msg) + redis.call("ZADD", KEYS[2], ARGV[3], msg) + redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4]) + redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5]) + return 1 + end + end + return 0 + `) + now := time.Now() + limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago + res, err := script.Run(r.client, + []string{zset, base.DeadQueue}, + score, id, now.Unix(), limit, maxDeadTasks).Result() + if err != nil { + return 0, err + } + n, ok := res.(int64) + if !ok { + return 0, fmt.Errorf("could not cast %v to int64", res) + } + return n, nil +} + // DeleteDeadTask finds a task that matches the given id and score from dead queue // and deletes it. If a task that matches the id and score does not exist, // it returns ErrTaskNotFound. diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 9b8b888..19bdbc4 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -832,6 +832,188 @@ func TestEnqueueAllDeadTasks(t *testing.T) { } } +func TestKillRetryTask(t *testing.T) { + r := setup(t) + m1 := newTaskMessage("send_email", nil) + m2 := newTaskMessage("reindex", nil) + t1 := time.Now().Add(time.Minute) + t2 := time.Now().Add(time.Hour) + + tests := []struct { + retry []sortedSetEntry + dead []sortedSetEntry + id xid.ID + score int64 + want error + wantRetry []sortedSetEntry + wantDead []sortedSetEntry + }{ + { + retry: []sortedSetEntry{ + {m1, t1.Unix()}, + {m2, t2.Unix()}, + }, + dead: []sortedSetEntry{}, + id: m1.ID, + score: t1.Unix(), + want: nil, + wantRetry: []sortedSetEntry{ + {m2, t2.Unix()}, + }, + wantDead: []sortedSetEntry{ + {m1, time.Now().Unix()}, + }, + }, + { + retry: []sortedSetEntry{ + {m1, t1.Unix()}, + }, + dead: []sortedSetEntry{ + {m2, t2.Unix()}, + }, + id: m2.ID, + score: t2.Unix(), + want: ErrTaskNotFound, + wantRetry: []sortedSetEntry{ + {m1, t1.Unix()}, + }, + wantDead: []sortedSetEntry{ + {m2, t2.Unix()}, + }, + }, + } + + for _, tc := range tests { + flushDB(t, r) + seedRetryQueue(t, r, tc.retry) + seedDeadQueue(t, r, tc.dead) + + got := r.KillRetryTask(tc.id, tc.score) + if got != tc.want { + t.Errorf("(*RDB).KillRetryTask(%v, %v) = %v, want %v", + tc.id, tc.score, got, tc.want) + continue + } + + gotRetryRaw := r.client.ZRangeWithScores(base.RetryQueue, 0, -1).Val() + var gotRetry []sortedSetEntry + for _, z := range gotRetryRaw { + gotRetry = append(gotRetry, sortedSetEntry{ + Msg: mustUnmarshal(t, z.Member.(string)), + Score: int64(z.Score), + }) + } + if diff := cmp.Diff(tc.wantRetry, gotRetry, sortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.RetryQueue, diff) + } + + gotDeadRaw := r.client.ZRangeWithScores(base.DeadQueue, 0, -1).Val() + var gotDead []sortedSetEntry + for _, z := range gotDeadRaw { + gotDead = append(gotDead, sortedSetEntry{ + Msg: mustUnmarshal(t, z.Member.(string)), + Score: int64(z.Score), + }) + } + if diff := cmp.Diff(tc.wantDead, gotDead, sortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.DeadQueue, diff) + } + } +} + +func TestKillScheduledTask(t *testing.T) { + r := setup(t) + m1 := newTaskMessage("send_email", nil) + m2 := newTaskMessage("reindex", nil) + t1 := time.Now().Add(time.Minute) + t2 := time.Now().Add(time.Hour) + + tests := []struct { + scheduled []sortedSetEntry + dead []sortedSetEntry + id xid.ID + score int64 + want error + wantScheduled []sortedSetEntry + wantDead []sortedSetEntry + }{ + { + scheduled: []sortedSetEntry{ + {m1, t1.Unix()}, + {m2, t2.Unix()}, + }, + dead: []sortedSetEntry{}, + id: m1.ID, + score: t1.Unix(), + want: nil, + wantScheduled: []sortedSetEntry{ + {m2, t2.Unix()}, + }, + wantDead: []sortedSetEntry{ + {m1, time.Now().Unix()}, + }, + }, + { + scheduled: []sortedSetEntry{ + {m1, t1.Unix()}, + }, + dead: []sortedSetEntry{ + {m2, t2.Unix()}, + }, + id: m2.ID, + score: t2.Unix(), + want: ErrTaskNotFound, + wantScheduled: []sortedSetEntry{ + {m1, t1.Unix()}, + }, + wantDead: []sortedSetEntry{ + {m2, t2.Unix()}, + }, + }, + } + + for _, tc := range tests { + flushDB(t, r) + seedScheduledQueue(t, r, tc.scheduled) + seedDeadQueue(t, r, tc.dead) + + got := r.KillScheduledTask(tc.id, tc.score) + if got != tc.want { + t.Errorf("(*RDB).KillScheduledTask(%v, %v) = %v, want %v", + tc.id, tc.score, got, tc.want) + continue + } + + gotScheduledRaw := r.client.ZRangeWithScores(base.ScheduledQueue, 0, -1).Val() + var gotScheduled []sortedSetEntry + for _, z := range gotScheduledRaw { + gotScheduled = append(gotScheduled, sortedSetEntry{ + Msg: mustUnmarshal(t, z.Member.(string)), + Score: int64(z.Score), + }) + } + if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.ScheduledQueue, diff) + } + + gotDeadRaw := r.client.ZRangeWithScores(base.DeadQueue, 0, -1).Val() + var gotDead []sortedSetEntry + for _, z := range gotDeadRaw { + gotDead = append(gotDead, sortedSetEntry{ + Msg: mustUnmarshal(t, z.Member.(string)), + Score: int64(z.Score), + }) + } + if diff := cmp.Diff(tc.wantDead, gotDead, sortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.DeadQueue, diff) + } + } +} + func TestDeleteDeadTask(t *testing.T) { r := setup(t) m1 := newTaskMessage("send_email", nil) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 5069721..b466cbe 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -179,12 +179,15 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e return err } +const ( + maxDeadTasks = 10000 + deadExpirationInDays = 90 +) + // Kill sends the task to "dead" queue from in-progress queue, assigning // the error message to the task. // It also trims the set by timestamp and set size. func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { - const maxDeadTask = 100 - const deadExpirationInDays = 90 bytesToRemove, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) @@ -227,7 +230,7 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { `) _, err = script.Run(r.client, []string{base.InProgressQueue, base.DeadQueue, processedKey, failureKey}, - string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTask, expireAt.Unix()).Result() + string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Result() return err } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 586942a..d072a0a 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -246,13 +246,12 @@ func TestKill(t *testing.T) { data := r.client.ZRangeWithScores(base.DeadQueue, 0, -1).Val() for _, z := range data { gotDead = append(gotDead, sortedSetEntry{ - msg: mustUnmarshal(t, z.Member.(string)), - score: int64(z.Score), + Msg: mustUnmarshal(t, z.Member.(string)), + Score: int64(z.Score), }) } - cmpOpt := cmp.AllowUnexported(sortedSetEntry{}) - if diff := cmp.Diff(tc.wantDead, gotDead, cmpOpt, sortZSetEntryOpt); diff != "" { + if diff := cmp.Diff(tc.wantDead, gotDead, sortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadQueue, diff) } @@ -512,8 +511,8 @@ func TestRetry(t *testing.T) { var gotRetry []sortedSetEntry for _, z := range gotRetryRaw { gotRetry = append(gotRetry, sortedSetEntry{ - msg: mustUnmarshal(t, z.Member.(string)), - score: int64(z.Score), + Msg: mustUnmarshal(t, z.Member.(string)), + Score: int64(z.Score), }) } cmpOpt := cmp.AllowUnexported(sortedSetEntry{})