From c06734408fe74113ec6d753f130aeae1dd6b1727 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 15 Aug 2020 06:38:33 -0700 Subject: [PATCH] Update all kill methods in RDB --- internal/rdb/inspect.go | 44 ++- internal/rdb/inspect_test.go | 562 ++++++++++++++++++++++++----------- 2 files changed, 414 insertions(+), 192 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 5b0df3d..86f1b2a 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -420,12 +420,10 @@ func (r *RDB) removeAndEnqueueAll(zset, qkey string) (int64, error) { return n, nil } -/* -// KillRetryTask finds a task that matches the given id and score from retry queue -// and moves it to dead queue. If a task that maches the id and score does not exist, -// it returns ErrTaskNotFound. -func (r *RDB) KillRetryTask(id uuid.UUID, score int64) error { - n, err := r.removeAndKill(base.RetryQueue, id.String(), float64(score)) +// KillRetryTask finds a retry task that matches the given id and score from the given queue +// and kills it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound. +func (r *RDB) KillRetryTask(qname string, id uuid.UUID, score int64) error { + n, err := r.removeAndKill(base.RetryKey(qname), base.DeadKey(qname), id.String(), float64(score)) if err != nil { return err } @@ -435,11 +433,10 @@ func (r *RDB) KillRetryTask(id uuid.UUID, score int64) error { return nil } -// KillScheduledTask finds a task that matches the given id and score from scheduled queue -// and moves it to dead queue. If a task that maches the id and score does not exist, -// it returns ErrTaskNotFound. -func (r *RDB) KillScheduledTask(id uuid.UUID, score int64) error { - n, err := r.removeAndKill(base.ScheduledQueue, id.String(), float64(score)) +// KillScheduledTask finds a scheduled task that matches the given id and score from the given queue +// and kills it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound. +func (r *RDB) KillScheduledTask(qname string, id uuid.UUID, score int64) error { + n, err := r.removeAndKill(base.ScheduledKey(qname), base.DeadKey(qname), id.String(), float64(score)) if err != nil { return err } @@ -449,20 +446,20 @@ func (r *RDB) KillScheduledTask(id uuid.UUID, score int64) error { return nil } -// KillAllRetryTasks moves all tasks from retry queue to dead queue and +// KillAllRetryTasks kills all retry tasks from the given queue and // returns the number of tasks that were moved. -func (r *RDB) KillAllRetryTasks() (int64, error) { - return r.removeAndKillAll(base.RetryQueue) +func (r *RDB) KillAllRetryTasks(qname string) (int64, error) { + return r.removeAndKillAll(base.RetryKey(qname), base.DeadKey(qname)) } -// KillAllScheduledTasks moves all tasks from scheduled queue to dead queue and +// KillAllScheduledTasks kills all scheduled tasks from the given queue and // returns the number of tasks that were moved. -func (r *RDB) KillAllScheduledTasks() (int64, error) { - return r.removeAndKillAll(base.ScheduledQueue) +func (r *RDB) KillAllScheduledTasks(qname string) (int64, error) { + return r.removeAndKillAll(base.ScheduledKey(qname), base.DeadKey(qname)) } // KEYS[1] -> ZSET to move task from (e.g., retry queue) -// KEYS[2] -> asynq:dead +// KEYS[2] -> asynq:{}:dead // ARGV[1] -> score of the task to kill // ARGV[2] -> id of the task to kill // ARGV[3] -> current timestamp @@ -482,11 +479,11 @@ for _, msg in ipairs(msgs) do end return 0`) -func (r *RDB) removeAndKill(zset, id string, score float64) (int64, error) { +func (r *RDB) removeAndKill(src, dst, id string, score float64) (int64, error) { now := time.Now() limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago res, err := removeAndKillCmd.Run(r.client, - []string{zset, base.DeadQueue}, + []string{src, dst}, score, id, now.Unix(), limit, maxDeadTasks).Result() if err != nil { return 0, err @@ -499,7 +496,7 @@ func (r *RDB) removeAndKill(zset, id string, score float64) (int64, error) { } // KEYS[1] -> ZSET to move task from (e.g., retry queue) -// KEYS[2] -> asynq:dead +// KEYS[2] -> asynq:{}:dead // ARGV[1] -> current timestamp // ARGV[2] -> cutoff timestamp (e.g., 90 days ago) // ARGV[3] -> max number of tasks in dead queue (e.g., 100) @@ -513,10 +510,10 @@ for _, msg in ipairs(msgs) do end return table.getn(msgs)`) -func (r *RDB) removeAndKillAll(zset string) (int64, error) { +func (r *RDB) removeAndKillAll(src, dst string) (int64, error) { now := time.Now() limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago - res, err := removeAndKillAllCmd.Run(r.client, []string{zset, base.DeadQueue}, + res, err := removeAndKillAllCmd.Run(r.client, []string{src, dst}, now.Unix(), limit, maxDeadTasks).Result() if err != nil { return 0, err @@ -528,6 +525,7 @@ func (r *RDB) removeAndKillAll(zset string) (int64, error) { return n, nil } +/* // DeleteDeadTask finds a task that matches the given id and score from dead queue // and deletes it. If a task that matches the id and score does not exist, // it returns ErrTaskNotFound. diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index d7cbd2c..a429656 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -1599,325 +1599,549 @@ func TestEnqueueAllDeadTasks(t *testing.T) { } } -/* func TestKillRetryTask(t *testing.T) { r := setup(t) - m1 := h.NewTaskMessage("send_email", nil) - m2 := h.NewTaskMessage("reindex", nil) - t1 := time.Now().Add(time.Minute) - t2 := time.Now().Add(time.Hour) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + t1 := time.Now().Add(1 * time.Minute) + t2 := time.Now().Add(1 * time.Hour) + t3 := time.Now().Add(2 * time.Hour) + t4 := time.Now().Add(3 * time.Hour) tests := []struct { - retry []base.Z - dead []base.Z + retry map[string][]base.Z + dead map[string][]base.Z + qname string id uuid.UUID score int64 want error - wantRetry []base.Z - wantDead []base.Z + wantRetry map[string][]base.Z + wantDead map[string][]base.Z }{ { - retry: []base.Z{ - {Message: m1, Score: t1.Unix()}, - {Message: m2, Score: t2.Unix()}, + retry: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, }, - dead: []base.Z{}, + dead: map[string][]base.Z{ + "default": {}, + }, + qname: "default", id: m1.ID, score: t1.Unix(), want: nil, - wantRetry: []base.Z{ - {Message: m2, Score: t2.Unix()}, + wantRetry: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, }, - wantDead: []base.Z{ - {Message: m1, Score: time.Now().Unix()}, + wantDead: map[string][]base.Z{ + "default": {{Message: m1, Score: time.Now().Unix()}}, }, }, { - retry: []base.Z{ - {Message: m1, Score: t1.Unix()}, + retry: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, }, - dead: []base.Z{ - {Message: m2, Score: t2.Unix()}, + dead: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, }, + qname: "default", id: m2.ID, score: t2.Unix(), want: ErrTaskNotFound, - wantRetry: []base.Z{ - {Message: m1, Score: t1.Unix()}, + wantRetry: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, }, - wantDead: []base.Z{ - {Message: m2, Score: t2.Unix()}, + wantDead: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, + }, + }, + { + retry: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + "custom": { + {Message: m3, Score: t3.Unix()}, + {Message: m4, Score: t4.Unix()}, + }, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", + id: m3.ID, + score: t3.Unix(), + want: nil, + wantRetry: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + "custom": { + {Message: m4, Score: t4.Unix()}, + }, + }, + wantDead: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m3, Score: time.Now().Unix()}}, }, }, } for _, tc := range tests { h.FlushDB(t, r.client) - h.SeedRetryQueue(t, r.client, tc.retry) - h.SeedDeadQueue(t, r.client, tc.dead) + h.SeedAllRetryQueues(t, r.client, tc.retry) + h.SeedAllDeadQueues(t, r.client, tc.dead) - got := r.KillRetryTask(tc.id, tc.score) + got := r.KillRetryTask(tc.qname, tc.id, tc.score) if got != tc.want { - t.Errorf("(*RDB).KillRetryTask(%v, %v) = %v, want %v", - tc.id, tc.score, got, tc.want) + t.Errorf("(*RDB).KillRetryTask(%q, %v, %v) = %v, want %v", + tc.qname, tc.id, tc.score, got, tc.want) continue } - gotRetry := h.GetRetryEntries(t, r.client) - if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.RetryQueue, diff) + for qname, want := range tc.wantRetry { + gotRetry := h.GetRetryEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.RetryKey(qname), diff) + } } - gotDead := h.GetDeadEntries(t, r.client) - if diff := cmp.Diff(tc.wantDead, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.DeadQueue, diff) + for qname, want := range tc.wantDead { + gotDead := h.GetDeadEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.DeadKey(qname), diff) + } } } } func TestKillScheduledTask(t *testing.T) { r := setup(t) - m1 := h.NewTaskMessage("send_email", nil) - m2 := h.NewTaskMessage("reindex", nil) - t1 := time.Now().Add(time.Minute) - t2 := time.Now().Add(time.Hour) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + t1 := time.Now().Add(1 * time.Minute) + t2 := time.Now().Add(1 * time.Hour) + t3 := time.Now().Add(2 * time.Hour) + t4 := time.Now().Add(3 * time.Hour) tests := []struct { - scheduled []base.Z - dead []base.Z + scheduled map[string][]base.Z + dead map[string][]base.Z + qname string id uuid.UUID score int64 want error - wantScheduled []base.Z - wantDead []base.Z + wantScheduled map[string][]base.Z + wantDead map[string][]base.Z }{ { - scheduled: []base.Z{ - {Message: m1, Score: t1.Unix()}, - {Message: m2, Score: t2.Unix()}, + scheduled: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, }, - dead: []base.Z{}, + dead: map[string][]base.Z{ + "default": {}, + }, + qname: "default", id: m1.ID, score: t1.Unix(), want: nil, - wantScheduled: []base.Z{ - {Message: m2, Score: t2.Unix()}, + wantScheduled: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, }, - wantDead: []base.Z{ - {Message: m1, Score: time.Now().Unix()}, + wantDead: map[string][]base.Z{ + "default": {{Message: m1, Score: time.Now().Unix()}}, }, }, { - scheduled: []base.Z{ - {Message: m1, Score: t1.Unix()}, + scheduled: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, }, - dead: []base.Z{ - {Message: m2, Score: t2.Unix()}, + dead: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, }, + qname: "default", id: m2.ID, score: t2.Unix(), want: ErrTaskNotFound, - wantScheduled: []base.Z{ - {Message: m1, Score: t1.Unix()}, + wantScheduled: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, }, - wantDead: []base.Z{ - {Message: m2, Score: t2.Unix()}, + wantDead: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, + }, + }, + { + scheduled: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + "custom": { + {Message: m3, Score: t3.Unix()}, + {Message: m4, Score: t4.Unix()}, + }, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", + id: m3.ID, + score: t3.Unix(), + want: nil, + wantScheduled: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + "custom": { + {Message: m4, Score: t4.Unix()}, + }, + }, + wantDead: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m3, Score: time.Now().Unix()}}, }, }, } for _, tc := range tests { h.FlushDB(t, r.client) - h.SeedScheduledQueue(t, r.client, tc.scheduled) - h.SeedDeadQueue(t, r.client, tc.dead) + h.SeedAllScheduledQueues(t, r.client, tc.scheduled) + h.SeedAllDeadQueues(t, r.client, tc.dead) - got := r.KillScheduledTask(tc.id, tc.score) + got := r.KillScheduledTask(tc.qname, tc.id, tc.score) if got != tc.want { - t.Errorf("(*RDB).KillScheduledTask(%v, %v) = %v, want %v", - tc.id, tc.score, got, tc.want) + t.Errorf("(*RDB).KillScheduledTask(%q, %v, %v) = %v, want %v", + tc.qname, tc.id, tc.score, got, tc.want) continue } - gotScheduled := h.GetScheduledEntries(t, r.client) - if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.ScheduledQueue, diff) + for qname, want := range tc.wantScheduled { + gotScheduled := h.GetScheduledEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.ScheduledKey(qname), diff) + } } - gotDead := h.GetDeadEntries(t, r.client) - if diff := cmp.Diff(tc.wantDead, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.DeadQueue, diff) + for qname, want := range tc.wantDead { + gotDead := h.GetDeadEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.DeadKey(qname), diff) + } } } } func TestKillAllRetryTasks(t *testing.T) { r := setup(t) - m1 := h.NewTaskMessage("send_email", nil) - m2 := h.NewTaskMessage("reindex", nil) - t1 := time.Now().Add(time.Minute) - t2 := time.Now().Add(time.Hour) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + t1 := time.Now().Add(1 * time.Minute) + t2 := time.Now().Add(1 * time.Hour) + t3 := time.Now().Add(2 * time.Hour) + t4 := time.Now().Add(3 * time.Hour) tests := []struct { - retry []base.Z - dead []base.Z + retry map[string][]base.Z + dead map[string][]base.Z + qname string want int64 - wantRetry []base.Z - wantDead []base.Z + wantRetry map[string][]base.Z + wantDead map[string][]base.Z }{ { - retry: []base.Z{ - {Message: m1, Score: t1.Unix()}, - {Message: m2, Score: t2.Unix()}, + retry: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, }, - dead: []base.Z{}, - want: 2, - wantRetry: []base.Z{}, - wantDead: []base.Z{ - {Message: m1, Score: time.Now().Unix()}, - {Message: m2, Score: time.Now().Unix()}, + dead: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: 2, + wantRetry: map[string][]base.Z{ + "default": {}, + }, + wantDead: map[string][]base.Z{ + "default": { + {Message: m1, Score: time.Now().Unix()}, + {Message: m2, Score: time.Now().Unix()}, + }, }, }, { - retry: []base.Z{ - {Message: m1, Score: t1.Unix()}, + retry: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, }, - dead: []base.Z{ - {Message: m2, Score: t2.Unix()}, + dead: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, }, - want: 1, - wantRetry: []base.Z{}, - wantDead: []base.Z{ - {Message: m1, Score: time.Now().Unix()}, - {Message: m2, Score: t2.Unix()}, + qname: "default", + want: 1, + wantRetry: map[string][]base.Z{ + "default": {}, + }, + wantDead: map[string][]base.Z{ + "default": { + {Message: m1, Score: time.Now().Unix()}, + {Message: m2, Score: t2.Unix()}, + }, }, }, { - retry: []base.Z{}, - dead: []base.Z{ - {Message: m1, Score: t1.Unix()}, - {Message: m2, Score: t2.Unix()}, + retry: map[string][]base.Z{ + "default": {}, }, - want: 0, - wantRetry: []base.Z{}, - wantDead: []base.Z{ - {Message: m1, Score: t1.Unix()}, - {Message: m2, Score: t2.Unix()}, + dead: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + }, + qname: "default", + want: 0, + wantRetry: map[string][]base.Z{ + "default": {}, + }, + wantDead: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + }, + }, + { + retry: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + "custom": { + {Message: m3, Score: t3.Unix()}, + {Message: m4, Score: t4.Unix()}, + }, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", + want: 2, + wantRetry: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + "custom": {}, + }, + wantDead: map[string][]base.Z{ + "default": {}, + "custom": { + {Message: m3, Score: time.Now().Unix()}, + {Message: m4, Score: time.Now().Unix()}, + }, }, }, } for _, tc := range tests { h.FlushDB(t, r.client) - h.SeedRetryQueue(t, r.client, tc.retry) - h.SeedDeadQueue(t, r.client, tc.dead) + h.SeedAllRetryQueues(t, r.client, tc.retry) + h.SeedAllDeadQueues(t, r.client, tc.dead) - got, err := r.KillAllRetryTasks() + got, err := r.KillAllRetryTasks(tc.qname) if got != tc.want || err != nil { - t.Errorf("(*RDB).KillAllRetryTasks() = %v, %v; want %v, nil", - got, err, tc.want) + t.Errorf("(*RDB).KillAllRetryTasks(%q) = %v, %v; want %v, nil", + tc.qname, got, err, tc.want) continue } - gotRetry := h.GetRetryEntries(t, r.client) - if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.RetryQueue, diff) + for qname, want := range tc.wantRetry { + gotRetry := h.GetRetryEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.RetryKey(qname), diff) + } } - gotDead := h.GetDeadEntries(t, r.client) - if diff := cmp.Diff(tc.wantDead, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.DeadQueue, diff) + for qname, want := range tc.wantDead { + gotDead := h.GetDeadEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.DeadKey(qname), diff) + } } } } func TestKillAllScheduledTasks(t *testing.T) { r := setup(t) - m1 := h.NewTaskMessage("send_email", nil) - m2 := h.NewTaskMessage("reindex", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") t1 := time.Now().Add(time.Minute) t2 := time.Now().Add(time.Hour) + t3 := time.Now().Add(time.Hour) + t4 := time.Now().Add(time.Hour) tests := []struct { - scheduled []base.Z - dead []base.Z + scheduled map[string][]base.Z + dead map[string][]base.Z + qname string want int64 - wantScheduled []base.Z - wantDead []base.Z + wantScheduled map[string][]base.Z + wantDead map[string][]base.Z }{ { - scheduled: []base.Z{ - {Message: m1, Score: t1.Unix()}, - {Message: m2, Score: t2.Unix()}, + scheduled: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, }, - dead: []base.Z{}, - want: 2, - wantScheduled: []base.Z{}, - wantDead: []base.Z{ - {Message: m1, Score: time.Now().Unix()}, - {Message: m2, Score: time.Now().Unix()}, + dead: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: 2, + wantScheduled: map[string][]base.Z{ + "default": {}, + }, + wantDead: map[string][]base.Z{ + "default": { + {Message: m1, Score: time.Now().Unix()}, + {Message: m2, Score: time.Now().Unix()}, + }, }, }, { - scheduled: []base.Z{ - {Message: m1, Score: t1.Unix()}, + scheduled: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, }, - dead: []base.Z{ - {Message: m2, Score: t2.Unix()}, + dead: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, }, - want: 1, - wantScheduled: []base.Z{}, - wantDead: []base.Z{ - {Message: m1, Score: time.Now().Unix()}, - {Message: m2, Score: t2.Unix()}, + qname: "default", + want: 1, + wantScheduled: map[string][]base.Z{ + "default": {}, + }, + wantDead: map[string][]base.Z{ + "default": { + {Message: m1, Score: time.Now().Unix()}, + {Message: m2, Score: t2.Unix()}, + }, }, }, { - scheduled: []base.Z{}, - dead: []base.Z{ - {Message: m1, Score: t1.Unix()}, - {Message: m2, Score: t2.Unix()}, + scheduled: map[string][]base.Z{ + "default": {}, }, - want: 0, - wantScheduled: []base.Z{}, - wantDead: []base.Z{ - {Message: m1, Score: t1.Unix()}, - {Message: m2, Score: t2.Unix()}, + dead: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + }, + qname: "default", + want: 0, + wantScheduled: map[string][]base.Z{ + "default": {}, + }, + wantDead: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + }, + }, + { + scheduled: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + "custom": { + {Message: m3, Score: t3.Unix()}, + {Message: m4, Score: t4.Unix()}, + }, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", + want: 2, + wantScheduled: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + "custom": {}, + }, + wantDead: map[string][]base.Z{ + "default": {}, + "custom": { + {Message: m3, Score: time.Now().Unix()}, + {Message: m4, Score: time.Now().Unix()}, + }, }, }, } for _, tc := range tests { h.FlushDB(t, r.client) - h.SeedScheduledQueue(t, r.client, tc.scheduled) - h.SeedDeadQueue(t, r.client, tc.dead) + h.SeedAllScheduledQueues(t, r.client, tc.scheduled) + h.SeedAllDeadQueues(t, r.client, tc.dead) - got, err := r.KillAllScheduledTasks() + got, err := r.KillAllScheduledTasks(tc.qname) if got != tc.want || err != nil { - t.Errorf("(*RDB).KillAllScheduledTasks() = %v, %v; want %v, nil", - got, err, tc.want) + t.Errorf("(*RDB).KillAllScheduledTasks(%q) = %v, %v; want %v, nil", + tc.qname, got, err, tc.want) continue } - gotScheduled := h.GetScheduledEntries(t, r.client) - if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.ScheduledQueue, diff) + for qname, want := range tc.wantScheduled { + gotScheduled := h.GetScheduledEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.ScheduledKey(qname), diff) + } } - gotDead := h.GetDeadEntries(t, r.client) - if diff := cmp.Diff(tc.wantDead, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.DeadQueue, diff) + for qname, want := range tc.wantDead { + gotDead := h.GetDeadEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want,+got)\n%s", + base.DeadKey(qname), diff) + } } } } +/* func TestDeleteDeadTask(t *testing.T) { r := setup(t) m1 := h.NewTaskMessage("send_email", nil)