diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 86f1b2a..11501ac 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -525,26 +525,22 @@ func (r *RDB) removeAndKillAll(src, dst string) (int64, error) { return n, nil } -/* -// DeleteDeadTask finds a task that matches the given id and score from dead queue -// and deletes it. If a task that matches the id and score does not exist, -// it returns ErrTaskNotFound. -func (r *RDB) DeleteDeadTask(id uuid.UUID, score int64) error { - return r.deleteTask(base.DeadQueue, id.String(), float64(score)) +// DeleteDeadTask deletes a dead task that matches the given id and score from the given queue. +// If a task that matches the id and score does not exist, it returns ErrTaskNotFound. +func (r *RDB) DeleteDeadTask(qname string, id uuid.UUID, score int64) error { + return r.deleteTask(base.DeadKey(qname), id.String(), float64(score)) } -// DeleteRetryTask finds a task that matches the given id and score from retry queue -// and deletes it. If a task that matches the id and score does not exist, -// it returns ErrTaskNotFound. -func (r *RDB) DeleteRetryTask(id uuid.UUID, score int64) error { - return r.deleteTask(base.RetryQueue, id.String(), float64(score)) +// DeleteRetryTask deletes a retry task that matches the given id and score from the given queue. +// If a task that matches the id and score does not exist, it returns ErrTaskNotFound. +func (r *RDB) DeleteRetryTask(qname string, id uuid.UUID, score int64) error { + return r.deleteTask(base.RetryKey(qname), id.String(), float64(score)) } -// DeleteScheduledTask finds a task that matches the given id and score from -// scheduled queue and deletes it. If a task that matches the id and score -//does not exist, it returns ErrTaskNotFound. -func (r *RDB) DeleteScheduledTask(id uuid.UUID, score int64) error { - return r.deleteTask(base.ScheduledQueue, id.String(), float64(score)) +// DeleteScheduledTask deletes a scheduled task that matches the given id and score from the given queue. +// If a task that matches the id and score does not exist, it returns ErrTaskNotFound. +func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID, score int64) error { + return r.deleteTask(base.ScheduledKey(qname), id.String(), float64(score)) } var deleteTaskCmd = redis.NewScript(` @@ -558,8 +554,8 @@ for _, msg in ipairs(msgs) do end return 0`) -func (r *RDB) deleteTask(zset, id string, score float64) error { - res, err := deleteTaskCmd.Run(r.client, []string{zset}, score, id).Result() +func (r *RDB) deleteTask(key, id string, score float64) error { + res, err := deleteTaskCmd.Run(r.client, []string{key}, score, id).Result() if err != nil { return err } @@ -579,22 +575,22 @@ local n = redis.call("ZCARD", KEYS[1]) redis.call("DEL", KEYS[1]) return n`) -// DeleteAllDeadTasks deletes all tasks from the dead queue +// DeleteAllDeadTasks deletes all dead tasks from the given queue // and returns the number of tasks deleted. -func (r *RDB) DeleteAllDeadTasks() (int64, error) { - return r.deleteAll(base.DeadQueue) +func (r *RDB) DeleteAllDeadTasks(qname string) (int64, error) { + return r.deleteAll(base.DeadKey(qname)) } -// DeleteAllRetryTasks deletes all tasks from the dead queue +// DeleteAllRetryTasks deletes all retry tasks from the given queue // and returns the number of tasks deleted. -func (r *RDB) DeleteAllRetryTasks() (int64, error) { - return r.deleteAll(base.RetryQueue) +func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error) { + return r.deleteAll(base.RetryKey(qname)) } -// DeleteAllScheduledTasks deletes all tasks from the dead queue +// DeleteAllScheduledTasks deletes all scheduled tasks from the given queue // and returns the number of tasks deleted. -func (r *RDB) DeleteAllScheduledTasks() (int64, error) { - return r.deleteAll(base.ScheduledQueue) +func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error) { + return r.deleteAll(base.ScheduledKey(qname)) } func (r *RDB) deleteAll(key string) (int64, error) { @@ -608,7 +604,6 @@ func (r *RDB) deleteAll(key string) (int64, error) { } return n, nil } -*/ // ErrQueueNotFound indicates specified queue does not exist. type ErrQueueNotFound struct { diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index a429656..2be8138 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2141,306 +2141,457 @@ func TestKillAllScheduledTasks(t *testing.T) { } } -/* func TestDeleteDeadTask(t *testing.T) { r := setup(t) - m1 := h.NewTaskMessage("send_email", nil) - m2 := h.NewTaskMessage("reindex", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") t1 := time.Now().Add(-5 * time.Minute) t2 := time.Now().Add(-time.Hour) + t3 := time.Now().Add(-time.Hour) tests := []struct { - dead []base.Z + dead map[string][]base.Z + qname string id uuid.UUID score int64 want error - wantDead []*base.TaskMessage + wantDead map[string][]*base.TaskMessage }{ { - dead: []base.Z{ - {Message: m1, Score: t1.Unix()}, - {Message: m2, Score: t2.Unix()}, + dead: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + }, + qname: "default", + id: m1.ID, + score: t1.Unix(), + want: nil, + wantDead: map[string][]*base.TaskMessage{ + "default": {m2}, }, - id: m1.ID, - score: t1.Unix(), - want: nil, - wantDead: []*base.TaskMessage{m2}, }, { - dead: []base.Z{ - {Message: m1, Score: t1.Unix()}, - {Message: m2, Score: t2.Unix()}, + dead: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + "custom": { + {Message: m3, Score: t3.Unix()}, + }, + }, + qname: "custom", + id: m3.ID, + score: t3.Unix(), + want: nil, + wantDead: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {}, }, - id: m1.ID, - score: t2.Unix(), // id and score mismatch - want: ErrTaskNotFound, - wantDead: []*base.TaskMessage{m1, m2}, }, { - dead: []base.Z{}, - id: m1.ID, - score: t1.Unix(), - want: ErrTaskNotFound, - wantDead: []*base.TaskMessage{}, + dead: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + }, + qname: "default", + id: m1.ID, + score: t2.Unix(), // id and score mismatch + want: ErrTaskNotFound, + wantDead: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + }, + }, + { + dead: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + id: m1.ID, + score: t1.Unix(), + want: ErrTaskNotFound, + wantDead: map[string][]*base.TaskMessage{ + "default": {}, + }, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedDeadQueue(t, r.client, tc.dead) + h.SeedAllDeadQueues(t, r.client, tc.dead) - got := r.DeleteDeadTask(tc.id, tc.score) + got := r.DeleteDeadTask(tc.qname, tc.id, tc.score) if got != tc.want { - t.Errorf("r.DeleteDeadTask(%v, %v) = %v, want %v", tc.id, tc.score, got, tc.want) + t.Errorf("r.DeleteDeadTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) continue } - gotDead := h.GetDeadMessages(t, r.client) - if diff := cmp.Diff(tc.wantDead, gotDead, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadQueue, diff) + for qname, want := range tc.wantDead { + gotDead := h.GetDeadMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadKey(qname), diff) + } } } } func TestDeleteRetryTask(t *testing.T) { r := setup(t) - m1 := h.NewTaskMessage("send_email", nil) - m2 := h.NewTaskMessage("reindex", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") t1 := time.Now().Add(5 * time.Minute) t2 := time.Now().Add(time.Hour) + t3 := time.Now().Add(time.Hour) tests := []struct { - retry []base.Z + retry map[string][]base.Z + qname string id uuid.UUID score int64 want error - wantRetry []*base.TaskMessage + wantRetry map[string][]*base.TaskMessage }{ { - retry: []base.Z{ - {Message: m1, Score: t1.Unix()}, - {Message: m2, Score: t2.Unix()}, + retry: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + }, + qname: "default", + id: m1.ID, + score: t1.Unix(), + want: nil, + wantRetry: map[string][]*base.TaskMessage{ + "default": {m2}, }, - id: m1.ID, - score: t1.Unix(), - want: nil, - wantRetry: []*base.TaskMessage{m2}, }, { - retry: []base.Z{ - {Message: m1, Score: t1.Unix()}, + retry: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + "custom": { + {Message: m3, Score: t3.Unix()}, + }, + }, + qname: "custom", + id: m3.ID, + score: t3.Unix(), + want: nil, + wantRetry: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {}, + }, + }, + { + retry: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, + }, + qname: "default", + id: m2.ID, + score: t2.Unix(), + want: ErrTaskNotFound, + wantRetry: map[string][]*base.TaskMessage{ + "default": {m1}, }, - id: m2.ID, - score: t2.Unix(), - want: ErrTaskNotFound, - wantRetry: []*base.TaskMessage{m1}, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedRetryQueue(t, r.client, tc.retry) + h.SeedAllRetryQueues(t, r.client, tc.retry) - got := r.DeleteRetryTask(tc.id, tc.score) + got := r.DeleteRetryTask(tc.qname, tc.id, tc.score) if got != tc.want { - t.Errorf("r.DeleteRetryTask(%v, %v) = %v, want %v", tc.id, tc.score, got, tc.want) + t.Errorf("r.DeleteRetryTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) continue } - gotRetry := h.GetRetryMessages(t, r.client) - if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff) + for qname, want := range tc.wantRetry { + gotRetry := h.GetRetryMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(qname), diff) + } } } } func TestDeleteScheduledTask(t *testing.T) { r := setup(t) - m1 := h.NewTaskMessage("send_email", nil) - m2 := h.NewTaskMessage("reindex", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") t1 := time.Now().Add(5 * time.Minute) t2 := time.Now().Add(time.Hour) + t3 := time.Now().Add(time.Hour) tests := []struct { - scheduled []base.Z + scheduled map[string][]base.Z + qname string id uuid.UUID score int64 want error - wantScheduled []*base.TaskMessage + wantScheduled map[string][]*base.TaskMessage }{ { - scheduled: []base.Z{ - {Message: m1, Score: t1.Unix()}, - {Message: m2, Score: t2.Unix()}, + scheduled: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + }, + qname: "default", + id: m1.ID, + score: t1.Unix(), + want: nil, + wantScheduled: map[string][]*base.TaskMessage{ + "default": {m2}, }, - id: m1.ID, - score: t1.Unix(), - want: nil, - wantScheduled: []*base.TaskMessage{m2}, }, { - scheduled: []base.Z{ - {Message: m1, Score: t1.Unix()}, + scheduled: map[string][]base.Z{ + "default": { + {Message: m1, Score: t1.Unix()}, + {Message: m2, Score: t2.Unix()}, + }, + "custom": { + {Message: m3, Score: t3.Unix()}, + }, + }, + qname: "custom", + id: m3.ID, + score: t3.Unix(), + want: nil, + wantScheduled: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {}, + }, + }, + { + scheduled: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, + }, + qname: "default", + id: m2.ID, + score: t2.Unix(), + want: ErrTaskNotFound, + wantScheduled: map[string][]*base.TaskMessage{ + "default": {m1}, }, - id: m2.ID, - score: t2.Unix(), - want: ErrTaskNotFound, - wantScheduled: []*base.TaskMessage{m1}, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedScheduledQueue(t, r.client, tc.scheduled) + h.SeedAllScheduledQueues(t, r.client, tc.scheduled) - got := r.DeleteScheduledTask(tc.id, tc.score) + got := r.DeleteScheduledTask(tc.qname, tc.id, tc.score) if got != tc.want { - t.Errorf("r.DeleteScheduledTask(%v, %v) = %v, want %v", tc.id, tc.score, got, tc.want) + t.Errorf("r.DeleteScheduledTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) continue } - gotScheduled := h.GetScheduledMessages(t, r.client) - if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledQueue, diff) + for qname, want := range tc.wantScheduled { + gotScheduled := h.GetScheduledMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledKey(qname), diff) + } } } } func TestDeleteAllDeadTasks(t *testing.T) { r := setup(t) - m1 := h.NewTaskMessage("send_email", nil) - m2 := h.NewTaskMessage("reindex", nil) - m3 := h.NewTaskMessage("gen_thumbnail", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") tests := []struct { - dead []base.Z + dead map[string][]base.Z + qname string want int64 - wantDead []*base.TaskMessage + wantDead map[string][]*base.TaskMessage }{ { - dead: []base.Z{ - {Message: m1, Score: time.Now().Unix()}, - {Message: m2, Score: time.Now().Unix()}, - {Message: m3, Score: time.Now().Unix()}, + dead: map[string][]base.Z{ + "default": { + {Message: m1, Score: time.Now().Unix()}, + {Message: m2, Score: time.Now().Unix()}, + }, + "custom": { + {Message: m3, Score: time.Now().Unix()}, + }, + }, + qname: "default", + want: 2, + wantDead: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {m3}, }, - want: 3, - wantDead: []*base.TaskMessage{}, }, { - dead: []base.Z{}, - want: 0, - wantDead: []*base.TaskMessage{}, + dead: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: 0, + wantDead: map[string][]*base.TaskMessage{ + "default": {}, + }, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedDeadQueue(t, r.client, tc.dead) + h.SeedAllDeadQueues(t, r.client, tc.dead) - got, err := r.DeleteAllDeadTasks() + got, err := r.DeleteAllDeadTasks(tc.qname) if err != nil { - t.Errorf("r.DeleteAllDeadTasks returned error: %v", err) + t.Errorf("r.DeleteAllDeadTasks(%q) returned error: %v", tc.qname, err) } if got != tc.want { - t.Errorf("r.DeleteAllDeadTasks() = %d, nil, want %d, nil", got, tc.want) + t.Errorf("r.DeleteAllDeadTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want) } - gotDead := h.GetDeadMessages(t, r.client) - if diff := cmp.Diff(tc.wantDead, gotDead, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadQueue, diff) + for qname, want := range tc.wantDead { + gotDead := h.GetDeadMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadKey(qname), diff) + } } } } func TestDeleteAllRetryTasks(t *testing.T) { r := setup(t) - m1 := h.NewTaskMessage("send_email", nil) - m2 := h.NewTaskMessage("reindex", nil) - m3 := h.NewTaskMessage("gen_thumbnail", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") tests := []struct { - retry []base.Z + retry map[string][]base.Z + qname string want int64 - wantRetry []*base.TaskMessage + wantRetry map[string][]*base.TaskMessage }{ { - retry: []base.Z{ - {Message: m1, Score: time.Now().Unix()}, - {Message: m2, Score: time.Now().Unix()}, - {Message: m3, Score: time.Now().Unix()}, + retry: map[string][]base.Z{ + "default": { + {Message: m1, Score: time.Now().Unix()}, + {Message: m2, Score: time.Now().Unix()}, + }, + "custom": { + {Message: m3, Score: time.Now().Unix()}, + }, + }, + qname: "custom", + want: 1, + wantRetry: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {}, }, - want: 3, - wantRetry: []*base.TaskMessage{}, }, { - retry: []base.Z{}, - want: 0, - wantRetry: []*base.TaskMessage{}, + retry: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: 0, + wantRetry: map[string][]*base.TaskMessage{ + "default": {}, + }, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedRetryQueue(t, r.client, tc.retry) + h.SeedAllRetryQueues(t, r.client, tc.retry) - got, err := r.DeleteAllRetryTasks() + got, err := r.DeleteAllRetryTasks(tc.qname) if err != nil { - t.Errorf("r.DeleteAllRetryTasks returned error: %v", err) + t.Errorf("r.DeleteAllRetryTasks(%q) returned error: %v", tc.qname, err) } if got != tc.want { - t.Errorf("r.DeleteAllRetryTasks() = %d, nil, want %d, nil", got, tc.want) + t.Errorf("r.DeleteAllRetryTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want) } - gotRetry := h.GetRetryMessages(t, r.client) - if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff) + for qname, want := range tc.wantRetry { + gotRetry := h.GetRetryMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(qname), diff) + } } } } func TestDeleteAllScheduledTasks(t *testing.T) { r := setup(t) - m1 := h.NewTaskMessage("send_email", nil) - m2 := h.NewTaskMessage("reindex", nil) - m3 := h.NewTaskMessage("gen_thumbnail", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") tests := []struct { - scheduled []base.Z + scheduled map[string][]base.Z + qname string want int64 - wantScheduled []*base.TaskMessage + wantScheduled map[string][]*base.TaskMessage }{ { - scheduled: []base.Z{ - {Message: m1, Score: time.Now().Add(time.Minute).Unix()}, - {Message: m2, Score: time.Now().Add(time.Minute).Unix()}, - {Message: m3, Score: time.Now().Add(time.Minute).Unix()}, + scheduled: map[string][]base.Z{ + "default": { + {Message: m1, Score: time.Now().Add(time.Minute).Unix()}, + {Message: m2, Score: time.Now().Add(time.Minute).Unix()}, + }, + "custom": { + {Message: m3, Score: time.Now().Add(time.Minute).Unix()}, + }, + }, + qname: "default", + want: 2, + wantScheduled: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {m3}, }, - want: 3, - wantScheduled: []*base.TaskMessage{}, }, { - scheduled: []base.Z{}, - want: 0, - wantScheduled: []*base.TaskMessage{}, + scheduled: map[string][]base.Z{ + "custom": {}, + }, + qname: "custom", + want: 0, + wantScheduled: map[string][]*base.TaskMessage{ + "custom": {}, + }, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedScheduledQueue(t, r.client, tc.scheduled) + h.SeedAllScheduledQueues(t, r.client, tc.scheduled) - got, err := r.DeleteAllScheduledTasks() + got, err := r.DeleteAllScheduledTasks(tc.qname) if err != nil { - t.Errorf("r.DeleteAllScheduledTasks returned error: %v", err) + t.Errorf("r.DeleteAllScheduledTasks(%q) returned error: %v", tc.qname, err) } if got != tc.want { - t.Errorf("r.DeleteAllScheduledTasks() = %d, nil, want %d, nil", got, tc.want) + t.Errorf("r.DeleteAllScheduledTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want) } - gotScheduled := h.GetScheduledMessages(t, r.client) - if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledQueue, diff) + for qname, want := range tc.wantScheduled { + gotScheduled := h.GetScheduledMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledKey(qname), diff) + } } } } +/* func TestRemoveQueue(t *testing.T) { r := setup(t) m1 := h.NewTaskMessage("send_email", nil)