diff --git a/inspector.go b/inspector.go index adfaac5..888fa74 100644 --- a/inspector.go +++ b/inspector.go @@ -29,70 +29,62 @@ func NewInspector(r RedisConnOpt) *Inspector { // Stats represents a state of queues at a certain time. type Stats struct { - Enqueued int + // Name of the queue. + Queue string + // Number of enqueued tasks. + Enqueued int + // Number of in-progress tasks. InProgress int - Scheduled int - Retry int - Dead int - Processed int - Failed int - Queues []*QueueInfo - Timestamp time.Time -} - -// QueueInfo holds information about a queue. -type QueueInfo struct { - // Name of the queue (e.g. "default", "critical"). - // Note: It doesn't include the prefix "asynq:queues:". - Name string - + // Number of scheduled tasks. + Scheduled int + // Number of retry tasks. + Retry int + // Number of dead tasks. + Dead int + // Total number of tasks being processed during the given date. + // The number includes both succeeded and failed tasks. + Processed int + // Total number of tasks failed to be processed during the given date. + Failed int // Paused indicates whether the queue is paused. - // If true, tasks in the queue should not be processed. + // If true, tasks in the queue will not be processed. Paused bool - - // Size is the number of tasks in the queue. - Size int + // Time when this stats was taken. + Timestamp time.Time } -// CurrentStats returns a current stats of the queues. -func (i *Inspector) CurrentStats() (*Stats, error) { - stats, err := i.rdb.CurrentStats() +// CurrentStats returns a current stats of the given queue. +func (i *Inspector) CurrentStats(qname string) (*Stats, error) { + stats, err := i.rdb.CurrentStats(qname) if err != nil { return nil, err } - var qs []*QueueInfo - for _, q := range stats.Queues { - qs = append(qs, (*QueueInfo)(q)) - } - return &Stats{ - Enqueued: stats.Enqueued, - InProgress: stats.InProgress, - Scheduled: stats.Scheduled, - Retry: stats.Retry, - Dead: stats.Dead, - Processed: stats.Processed, - Failed: stats.Failed, - Queues: qs, - Timestamp: stats.Timestamp, - }, nil + return &Stats(stats), nil } -// DailyStats holds aggregate data for a given day. +// DailyStats holds aggregate data for a given day for a given queue. type DailyStats struct { + // Name of the queue. + Queue string + // Total number of tasks being processed during the given date. + // The number includes both succeeded and failed tasks. Processed int - Failed int - Date time.Time + // Total number of tasks failed to be processed during the given date. + Failed int + // Date this stats was taken. + Date time.Time } // History returns a list of stats from the last n days. -func (i *Inspector) History(n int) ([]*DailyStats, error) { - stats, err := i.rdb.HistoricalStats(n) +func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) { + stats, err := i.rdb.HistoricalStats(qname, n) if err != nil { return nil, err } var res []*DailyStats for _, s := range stats { res = append(res, &DailyStats{ + Queue: s.Queue, Processed: s.Processed, Failed: s.Failed, Date: s.Time, @@ -111,7 +103,8 @@ type EnqueuedTask struct { // InProgressTask is a task that's currently being processed. type InProgressTask struct { *Task - ID string + ID string + Queue string } // ScheduledTask is a task scheduled to be processed in the future. @@ -169,7 +162,7 @@ func (t *DeadTask) Key() string { // parseTaskKey parses a key string and returns each part of key with proper // type if valid, otherwise it reports an error. -func parseTaskKey(key string) (id uuid.UUID, score int64, qtype string, err error) { +func parseTaskKey(key string) (id uuid.UUID, score int64, state string, err error) { parts := strings.Split(key, ":") if len(parts) != 3 { return uuid.Nil, 0, "", fmt.Errorf("invalid id") @@ -182,11 +175,11 @@ func parseTaskKey(key string) (id uuid.UUID, score int64, qtype string, err erro if err != nil { return uuid.Nil, 0, "", fmt.Errorf("invalid id") } - qtype = parts[0] - if len(qtype) != 1 || !strings.Contains("srd", qtype) { + state = parts[0] + if len(state) != 1 || !strings.Contains("srd", state) { return uuid.Nil, 0, "", fmt.Errorf("invalid id") } - return id, score, qtype, nil + return id, score, state, nil } // ListOption specifies behavior of list operation. @@ -250,7 +243,7 @@ func Page(n int) ListOption { return pageNumOpt(n) } -// ListScheduledTasks retrieves tasks in the specified queue. +// ListEnqueuedTasks retrieves enqueued tasks from the specified queue. // // By default, it retrieves the first 30 tasks. func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*EnqueuedTask, error) { @@ -271,34 +264,35 @@ func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*Enqu return tasks, err } -// ListScheduledTasks retrieves tasks currently being processed. +// ListInProgressTasks retrieves in-progress tasks from the specified queue. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListInProgressTasks(opts ...ListOption) ([]*InProgressTask, error) { +func (i *Inspector) ListInProgressTasks(qname string, opts ...ListOption) ([]*InProgressTask, error) { opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - msgs, err := i.rdb.ListInProgress(pgn) + msgs, err := i.rdb.ListInProgress(qname, pgn) if err != nil { return nil, err } var tasks []*InProgressTask for _, m := range msgs { tasks = append(tasks, &InProgressTask{ - Task: NewTask(m.Type, m.Payload), - ID: m.ID.String(), + Task: NewTask(m.Type, m.Payload), + ID: m.ID.String(), + Queue: m.Queue, }) } return tasks, err } -// ListScheduledTasks retrieves tasks in scheduled state. +// ListScheduledTasks retrieves scheduled tasks from the specified queue. // Tasks are sorted by NextEnqueueAt field in ascending order. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListScheduledTasks(opts ...ListOption) ([]*ScheduledTask, error) { +func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error) { opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - zs, err := i.rdb.ListScheduled(pgn) + zs, err := i.rdb.ListScheduled(qname, pgn) if err != nil { return nil, err } @@ -317,14 +311,14 @@ func (i *Inspector) ListScheduledTasks(opts ...ListOption) ([]*ScheduledTask, er return tasks, nil } -// ListScheduledTasks retrieves tasks in retry state. +// ListRetryTasks retrieves retry tasks from the specified queue. // Tasks are sorted by NextEnqueueAt field in ascending order. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListRetryTasks(opts ...ListOption) ([]*RetryTask, error) { +func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error) { opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - zs, err := i.rdb.ListRetry(pgn) + zs, err := i.rdb.ListRetry(qname, pgn) if err != nil { return nil, err } @@ -347,14 +341,14 @@ func (i *Inspector) ListRetryTasks(opts ...ListOption) ([]*RetryTask, error) { return tasks, nil } -// ListScheduledTasks retrieves tasks in retry state. +// ListDeadTasks retrieves dead tasks from the specified queue. // Tasks are sorted by LastFailedAt field in descending order. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListDeadTasks(opts ...ListOption) ([]*DeadTask, error) { +func (i *Inspector) ListDeadTasks(qname string, opts ...ListOption) ([]*DeadTask, error) { opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - zs, err := i.rdb.ListDead(pgn) + zs, err := i.rdb.ListDead(qname, pgn) if err != nil { return nil, err } @@ -377,109 +371,110 @@ func (i *Inspector) ListDeadTasks(opts ...ListOption) ([]*DeadTask, error) { return nil, nil } -// DeleteAllScheduledTasks deletes all tasks in scheduled state, +// DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue, // and reports the number tasks deleted. -func (i *Inspector) DeleteAllScheduledTasks() (int, error) { - n, err := i.rdb.DeleteAllScheduledTasks() +func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) { + n, err := i.rdb.DeleteAllScheduledTasks(qname) return int(n), err } -// DeleteAllRetryTasks deletes all tasks in retry state, +// DeleteAllRetryTasks deletes all retry tasks from the specified queue, // and reports the number tasks deleted. -func (i *Inspector) DeleteAllRetryTasks() (int, error) { - n, err := i.rdb.DeleteAllRetryTasks() +func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) { + n, err := i.rdb.DeleteAllRetryTasks(qname) return int(n), err } -// DeleteAllDeadTasks deletes all tasks in dead state, +// DeleteAllDeadTasks deletes all dead tasks from the specified queue, // and reports the number tasks deleted. -func (i *Inspector) DeleteAllDeadTasks() (int, error) { - n, err := i.rdb.DeleteAllDeadTasks() +func (i *Inspector) DeleteAllDeadTasks(qname string) (int, error) { + n, err := i.rdb.DeleteAllDeadTasks(qname) return int(n), err } -// DeleteTaskByKey deletes a task with the given key. -func (i *Inspector) DeleteTaskByKey(key string) error { - id, score, qtype, err := parseTaskKey(key) +// DeleteTaskByKey deletes a task with the given key from the given queue. +func (i *Inspector) DeleteTaskByKey(qname, key string) error { + id, score, state, err := parseTaskKey(key) if err != nil { return err } - switch qtype { + switch state { case "s": - return i.rdb.DeleteScheduledTask(id, score) + return i.rdb.DeleteScheduledTask(qname, id, score) case "r": - return i.rdb.DeleteRetryTask(id, score) + return i.rdb.DeleteRetryTask(qname, id, score) case "d": - return i.rdb.DeleteDeadTask(id, score) + return i.rdb.DeleteDeadTask(qname, id, score) default: return fmt.Errorf("invalid key") } } -// EnqueueAllScheduledTasks enqueues all tasks in the scheduled state, +// TODO(hibiken): Use different verb here. Idea: Run or Stage +// EnqueueAllScheduledTasks enqueues all scheduled tasks for immediate processing within the given queue, // and reports the number of tasks enqueued. -func (i *Inspector) EnqueueAllScheduledTasks() (int, error) { - n, err := i.rdb.EnqueueAllScheduledTasks() +func (i *Inspector) EnqueueAllScheduledTasks(qname string) (int, error) { + n, err := i.rdb.EnqueueAllScheduledTasks(qname) return int(n), err } -// EnqueueAllRetryTasks enqueues all tasks in the retry state, +// EnqueueAllRetryTasks enqueues all retry tasks for immediate processing within the given queue, // and reports the number of tasks enqueued. -func (i *Inspector) EnqueueAllRetryTasks() (int, error) { - n, err := i.rdb.EnqueueAllRetryTasks() +func (i *Inspector) EnqueueAllRetryTasks(qname string) (int, error) { + n, err := i.rdb.EnqueueAllRetryTasks(qname) return int(n), err } -// EnqueueAllDeadTasks enqueues all tasks in the dead state, +// EnqueueAllDeadTasks enqueues all dead tasks for immediate processing within the given queue, // and reports the number of tasks enqueued. -func (i *Inspector) EnqueueAllDeadTasks() (int, error) { - n, err := i.rdb.EnqueueAllDeadTasks() +func (i *Inspector) EnqueueAllDeadTasks(qname string) (int, error) { + n, err := i.rdb.EnqueueAllDeadTasks(qname) return int(n), err } -// EnqueueTaskByKey enqueues a task with the given key. -func (i *Inspector) EnqueueTaskByKey(key string) error { - id, score, qtype, err := parseTaskKey(key) +// EnqueueTaskByKey enqueues a task with the given key in the given queue. +func (i *Inspector) EnqueueTaskByKey(qname, key string) error { + id, score, state, err := parseTaskKey(key) if err != nil { return err } - switch qtype { + switch state { case "s": - return i.rdb.EnqueueScheduledTask(id, score) + return i.rdb.EnqueueScheduledTask(qname, id, score) case "r": - return i.rdb.EnqueueRetryTask(id, score) + return i.rdb.EnqueueRetryTask(qname, id, score) case "d": - return i.rdb.EnqueueDeadTask(id, score) + return i.rdb.EnqueueDeadTask(qname, id, score) default: return fmt.Errorf("invalid key") } } -// KillAllScheduledTasks kills all tasks in scheduled state, +// KillAllScheduledTasks kills all scheduled tasks within the given queue, // and reports the number of tasks killed. -func (i *Inspector) KillAllScheduledTasks() (int, error) { - n, err := i.rdb.KillAllScheduledTasks() +func (i *Inspector) KillAllScheduledTasks(qname string) (int, error) { + n, err := i.rdb.KillAllScheduledTasks(qname) return int(n), err } -// KillAllRetryTasks kills all tasks in retry state, +// KillAllRetryTasks kills all retry tasks within the given queue, // and reports the number of tasks killed. -func (i *Inspector) KillAllRetryTasks() (int, error) { - n, err := i.rdb.KillAllRetryTasks() +func (i *Inspector) KillAllRetryTasks(qname string) (int, error) { + n, err := i.rdb.KillAllRetryTasks(qname) return int(n), err } -// KillTaskByKey kills a task with the given key. -func (i *Inspector) KillTaskByKey(key string) error { - id, score, qtype, err := parseTaskKey(key) +// KillTaskByKey kills a task with the given key in the given queue. +func (i *Inspector) KillTaskByKey(qname, key string) error { + id, score, state, err := parseTaskKey(key) if err != nil { return err } - switch qtype { + switch state { case "s": - return i.rdb.KillScheduledTask(id, score) + return i.rdb.KillScheduledTask(qname, id, score) case "r": - return i.rdb.KillRetryTask(id, score) + return i.rdb.KillRetryTask(qname, id, score) case "d": return fmt.Errorf("task already dead") default: diff --git a/inspector_test.go b/inspector_test.go index fa15d40..bdb504c 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -35,45 +35,66 @@ func TestInspectorCurrentStats(t *testing.T) { tests := []struct { enqueued map[string][]*base.TaskMessage - inProgress []*base.TaskMessage - scheduled []base.Z - retry []base.Z - dead []base.Z - processed int - failed int - allQueues []interface{} + inProgress map[string][]*base.TaskMessage + scheduled map[string][]base.Z + retry map[string][]base.Z + dead map[string][]base.Z + processed map[string]int + failed map[string]int + qname string want *Stats }{ { enqueued: map[string][]*base.TaskMessage{ - base.DefaultQueueName: {m1}, - "critical": {m5}, - "low": {m6}, + "default": {m1}, + "critical": {m5}, + "low": {m6}, }, - inProgress: []*base.TaskMessage{m2}, - scheduled: []base.Z{ - {Message: m3, Score: now.Add(time.Hour).Unix()}, - {Message: m4, Score: now.Unix()}}, - retry: []base.Z{}, - dead: []base.Z{}, - processed: 120, - failed: 2, - allQueues: []interface{}{base.DefaultQueue, base.QueueKey("critical"), base.QueueKey("low")}, + inProgress: map[string][]*base.TaskMessage{ + "default": {m2}, + "critical": {}, + "low": {}, + }, + scheduled: map[string][]base.Z{ + "default": { + {Message: m3, Score: now.Add(time.Hour).Unix()}, + {Message: m4, Score: now.Unix()}, + }, + "critical": {}, + "low": {}, + }, + retry: []base.Z{ + "default": {}, + "critical": {}, + "low": {}, + }, + dead: []base.Z{ + "default": {}, + "critical": {}, + "low": {}, + }, + processed: map[string]int{ + "default": 120, + "critical": 100, + "low": 42, + }, + failed: map[string]int{ + "default": 2, + "critical": 0, + "low": 5, + }, + qname: "default", want: &Stats{ - Enqueued: 3, + Queue: "default", + Enqueued: 1, InProgress: 1, Scheduled: 2, Retry: 0, Dead: 0, Processed: 120, Failed: 2, + Paused: false, Timestamp: now, - // Queues should be sorted by name. - Queues: []*QueueInfo{ - {Name: "critical", Paused: false, Size: 1}, - {Name: "default", Paused: false, Size: 1}, - {Name: "low", Paused: false, Size: 1}, - }, }, }, } @@ -81,25 +102,28 @@ func TestInspectorCurrentStats(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued) - asynqtest.SeedInProgressQueue(t, r, tc.inProgress) - asynqtest.SeedScheduledQueue(t, r, tc.scheduled) - asynqtest.SeedRetryQueue(t, r, tc.retry) - asynqtest.SeedDeadQueue(t, r, tc.dead) - processedKey := base.ProcessedKey(now) - failedKey := base.FailureKey(now) - r.Set(processedKey, tc.processed, 0) - r.Set(failedKey, tc.failed, 0) - r.SAdd(base.AllQueues, tc.allQueues...) + asynqtest.SeedAllInProgressQueues(t, r, tc.inProgress) + asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) + asynqtest.SeedAllRetryQueues(t, r, tc.retry) + asynqtest.SeedAllDeadQueues(t, r, tc.dead) + for qname, n := range tc.processed { + processedKey := base.ProcessedKey(qname, now) + r.client.Set(processedKey, n, 0) + } + for qname, n := range tc.failed { + failedKey := base.FailedKey(qname, now) + r.client.Set(failedKey, n, 0) + } - got, err := inspector.CurrentStats() + got, err := inspector.CurrentStats(tc.qname) if err != nil { - t.Errorf("r.CurrentStats() = %v, %v, want %v, nil", - got, err, tc.want) + t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil", + tc.qname, got, err, tc.want) continue } if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" { - t.Errorf("r.CurrentStats() = %v, %v, want %v, nil; (-want, +got)\n%s", - got, err, tc.want, diff) + t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil; (-want, +got)\n%s", + tc.qname, got, err, tc.want, diff) continue } } @@ -117,11 +141,12 @@ func TestInspectorHistory(t *testing.T) { }) tests := []struct { - n int // number of days + qname string // queue of interest + n int // number of days }{ - {90}, - {7}, - {0}, + {"default", 90}, + {"custom", 7}, + {"default", 0}, } for _, tc := range tests { @@ -130,24 +155,25 @@ func TestInspectorHistory(t *testing.T) { // populate last n days data for i := 0; i < tc.n; i++ { ts := now.Add(-time.Duration(i) * 24 * time.Hour) - processedKey := base.ProcessedKey(ts) - failedKey := base.FailureKey(ts) + processedKey := base.ProcessedKey(tc.qname, ts) + failedKey := base.FailedKey(tc.qname, ts) r.Set(processedKey, (i+1)*1000, 0) r.Set(failedKey, (i+1)*10, 0) } - got, err := inspector.History(tc.n) + got, err := inspector.History(tc.qname, tc.n) if err != nil { - t.Errorf("Inspector.History(%d) returned error: %v", tc.n, err) + t.Errorf("Inspector.History(%q, %d) returned error: %v", tc.qname, tc.n, err) continue } if len(got) != tc.n { - t.Errorf("Inspector.History(%d) returned %d daily stats, want %d", - tc.n, len(got), tc.n) + t.Errorf("Inspector.History(%q, %d) returned %d daily stats, want %d", + tc.qname, tc.n, len(got), tc.n) continue } for i := 0; i < tc.n; i++ { want := &DailyStats{ + Queue: tc.qname, Processed: (i + 1) * 1000, Failed: (i + 1) * 10, Date: now.Add(-time.Duration(i) * 24 * time.Hour), @@ -243,6 +269,8 @@ func TestInspectorListInProgressTasks(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) + m3 := asynqtest.NewTaskMessage("task3", nil) + m4 := asynqtest.NewTaskMessage("task4", nil) inspector := NewInspector(RedisClientOpt{ Addr: redisAddr, @@ -251,19 +279,25 @@ func TestInspectorListInProgressTasks(t *testing.T) { createInProgressTask := func(msg *base.TaskMessage) *InProgressTask { return &InProgressTask{ - Task: NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), + Task: NewTask(msg.Type, msg.Payload), + ID: msg.ID.String(), + Queue: msg.Queue, } } tests := []struct { desc string - inProgress []*base.TaskMessage + inProgress map[string][]*base.TaskMessage + qname string want []*InProgressTask }{ { - desc: "with a few in-progress tasks", - inProgress: []*base.TaskMessage{m1, m2}, + desc: "with a few in-progress tasks", + inProgress: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {m3, m4}, + }, + qname: "default", want: []*InProgressTask{ createInProgressTask(m1), createInProgressTask(m2), @@ -273,54 +307,61 @@ func TestInspectorListInProgressTasks(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedInProgressQueue(t, r, tc.inProgress) + asynqtest.SeedAllInProgressQueues(t, r, tc.inProgress) - got, err := inspector.ListInProgressTasks() + got, err := inspector.ListInProgressTasks(tc.qname) if err != nil { - t.Errorf("%s; ListInProgressTasks() returned error: %v", tc.desc, err) + t.Errorf("%s; ListInProgressTasks(%q) returned error: %v", tc.qname, tc.desc, err) continue } ignoreOpt := cmpopts.IgnoreUnexported(Payload{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { - t.Errorf("%s; ListInProgressTask() = %v, want %v; (-want,+got)\n%s", - tc.desc, got, tc.want, diff) + t.Errorf("%s; ListInProgressTask(%wq) = %v, want %v; (-want,+got)\n%s", + tc.desc, tc.qname, got, tc.want, diff) } } } +func createScheduledTask(z base.Z) *ScheduledTask { + msg := z.Message + return &ScheduledTask{ + Task: NewTask(msg.Type, msg.Payload), + ID: msg.ID.String(), + Queue: msg.Queue, + NextEnqueueAt: time.Unix(z.Score, 0), + } +} + func TestInspectorListScheduledTasks(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) + m3 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} + z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} inspector := NewInspector(RedisClientOpt{ Addr: redisAddr, DB: redisDB, }) - createScheduledTask := func(z base.Z) *ScheduledTask { - msg := z.Message - return &ScheduledTask{ - Task: NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), - Queue: msg.Queue, - NextEnqueueAt: time.Unix(z.Score, 0), - } - } - tests := []struct { desc string - scheduled []base.Z + scheduled map[string][]base.Z + qname string want []*ScheduledTask }{ { - desc: "with a few scheduled tasks", - scheduled: []base.Z{z1, z2, z3}, + desc: "with a few scheduled tasks", + scheduled: map[string][]base.Z{ + "default": {z1, z2, z3}, + "custom": {z4}, + }, + qname: "default", // Should be sorted by NextEnqueuedAt. want: []*ScheduledTask{ createScheduledTask(z3), @@ -329,65 +370,75 @@ func TestInspectorListScheduledTasks(t *testing.T) { }, }, { - desc: "with empty scheduled queue", - scheduled: []base.Z{}, - want: []*ScheduledTask(nil), + desc: "with empty scheduled queue", + scheduled: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: []*ScheduledTask(nil), }, } for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedScheduledQueue(t, r, tc.scheduled) + asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) got, err := inspector.ListScheduledTasks() if err != nil { - t.Errorf("%s; ListScheduledTasks() returned error: %v", tc.desc, err) + t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } ignoreOpt := cmpopts.IgnoreUnexported(Payload{}, ScheduledTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { - t.Errorf("%s; ListScheduledTask() = %v, want %v; (-want,+got)\n%s", - tc.desc, got, tc.want, diff) + t.Errorf("%s; ListScheduledTask(%q) = %v, want %v; (-want,+got)\n%s", + tc.desc, tc.qname, got, tc.want, diff) } } } +func createRetryTask(z base.Z) *RetryTask { + msg := z.Message + return &RetryTask{ + Task: NewTask(msg.Type, msg.Payload), + ID: msg.ID.String(), + Queue: msg.Queue, + NextEnqueueAt: time.Unix(z.Score, 0), + MaxRetry: msg.Retry, + Retried: msg.Retried, + ErrorMsg: msg.ErrorMsg, + } +} + func TestInspectorListRetryTasks(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) + m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} + z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} inspector := NewInspector(RedisClientOpt{ Addr: redisAddr, DB: redisDB, }) - createRetryTask := func(z base.Z) *RetryTask { - msg := z.Message - return &RetryTask{ - Task: NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), - Queue: msg.Queue, - NextEnqueueAt: time.Unix(z.Score, 0), - MaxRetry: msg.Retry, - Retried: msg.Retried, - ErrorMsg: msg.ErrorMsg, - } - } - tests := []struct { desc string - retry []base.Z + retry map[string][]base.Z + qname string want []*RetryTask }{ { - desc: "with a few retry tasks", - retry: []base.Z{z1, z2, z3}, + desc: "with a few retry tasks", + retry: map[string][]base.Z{ + "default": {z1, z2, z3}, + "custom": {z4}, + }, + qname: "default", // Should be sorted by NextEnqueuedAt. want: []*RetryTask{ createRetryTask(z3), @@ -396,65 +447,76 @@ func TestInspectorListRetryTasks(t *testing.T) { }, }, { - desc: "with empty retry queue", - retry: []base.Z{}, + desc: "with empty retry queue", + retry: map[string][]base.Z{ + "default": {}, + }, + qname: "default", want: []*RetryTask(nil), }, + // TODO(hibiken): ErrQueueNotFound when queue doesn't exist } for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedRetryQueue(t, r, tc.retry) + asynqtest.SeedAllRetryQueues(t, r, tc.retry) got, err := inspector.ListRetryTasks() if err != nil { - t.Errorf("%s; ListRetryTasks() returned error: %v", tc.desc, err) + t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } ignoreOpt := cmpopts.IgnoreUnexported(Payload{}, RetryTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { - t.Errorf("%s; ListRetryTask() = %v, want %v; (-want,+got)\n%s", - tc.desc, got, tc.want, diff) + t.Errorf("%s; ListRetryTask(%q) = %v, want %v; (-want,+got)\n%s", + tc.desc, tc.qname, got, tc.want, diff) } } } +func createDeadTask(z base.Z) *DeadTask { + msg := z.Message + return &DeadTask{ + Task: NewTask(msg.Type, msg.Payload), + ID: msg.ID.String(), + Queue: msg.Queue, + MaxRetry: msg.Retry, + Retried: msg.Retried, + LastFailedAt: time.Unix(z.Score, 0), + ErrorMsg: msg.ErrorMsg, + } +} + func TestInspectorListDeadTasks(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) + m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} + z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()} inspector := NewInspector(RedisClientOpt{ Addr: redisAddr, DB: redisDB, }) - createDeadTask := func(z base.Z) *DeadTask { - msg := z.Message - return &DeadTask{ - Task: NewTask(msg.Type, msg.Payload), - ID: msg.ID.String(), - Queue: msg.Queue, - MaxRetry: msg.Retry, - Retried: msg.Retried, - LastFailedAt: time.Unix(z.Score, 0), - ErrorMsg: msg.ErrorMsg, - } - } - tests := []struct { desc string - retry []base.Z + dead map[string][]base.Z + qname string want []*DeadTask }{ { - desc: "with a few dead tasks", - retry: []base.Z{z1, z2, z3}, + desc: "with a few dead tasks", + dead: map[string][]base.Z{ + "default": {z1, z2, z3}, + "custom": {z4}, + }, + qname: "default", // Should be sorted by LastFailedAt. want: []*DeadTask{ createDeadTask(z2), @@ -463,25 +525,28 @@ func TestInspectorListDeadTasks(t *testing.T) { }, }, { - desc: "with empty dead queue", - retry: []base.Z{}, + desc: "with empty dead queue", + dead: map[string][]base.Z{ + "default": {}, + }, + qname: "default", want: []*DeadTask(nil), }, } for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedDeadQueue(t, r, tc.retry) + asynqtest.SeedAllDeadQueues(t, r, tc.retry) - got, err := inspector.ListDeadTasks() + got, err := inspector.ListDeadTasks(tc.qname) if err != nil { - t.Errorf("%s; ListDeadTasks() returned error: %v", tc.desc, err) + t.Errorf("%s; ListDeadTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } ignoreOpt := cmpopts.IgnoreUnexported(Payload{}, DeadTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { - t.Errorf("%s; ListDeadTask() = %v, want %v; (-want,+got)\n%s", - tc.desc, got, tc.want, diff) + t.Errorf("%s; ListDeadTask(%q) = %v, want %v; (-want,+got)\n%s", + tc.desc, tc.qname, got, tc.want, diff) } } } @@ -554,10 +619,12 @@ func TestInspectorDeleteAllScheduledTasks(t *testing.T) { m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) + m4 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} + z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} inspector := NewInspector(RedisClientOpt{ Addr: redisAddr, @@ -565,35 +632,43 @@ func TestInspectorDeleteAllScheduledTasks(t *testing.T) { }) tests := []struct { - scheduled []base.Z + scheduled map[string][]base.Z + qname string want int }{ { - scheduled: []base.Z{z1, z2, z3}, - want: 3, + scheduled: map[string][]base.Z{ + "default": {z1, z2, z3}, + "custom": {z4}, + }, + qname: "default", + want: 3, }, { - scheduled: []base.Z{}, - want: 0, + scheduled: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: 0, }, } for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedScheduledQueue(t, r, tc.scheduled) + asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) - got, err := inspector.DeleteAllScheduledTasks() + got, err := inspector.DeleteAllScheduledTasks(tc.qname) if err != nil { - t.Errorf("DeleteAllScheduledTasks() returned error: %v", err) + t.Errorf("DeleteAllScheduledTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("DeleteAllScheduledTasks() = %d, want %d", got, tc.want) + t.Errorf("DeleteAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want) } - gotScheduled := asynqtest.GetScheduledEntries(t, r) + gotScheduled := asynqtest.GetScheduledEntries(t, r, tc.qname) if len(gotScheduled) != 0 { - t.Errorf("There are still %d entries in dead queue, want empty", - len(gotScheduled)) + t.Errorf("There are still %d scheduled tasks in queue %q, want empty", + tc.qname, len(gotScheduled)) } } } @@ -603,10 +678,12 @@ func TestInspectorDeleteAllRetryTasks(t *testing.T) { m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) + m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} + z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} inspector := NewInspector(RedisClientOpt{ Addr: redisAddr, @@ -614,35 +691,43 @@ func TestInspectorDeleteAllRetryTasks(t *testing.T) { }) tests := []struct { - retry []base.Z + retry map[string][]base.Z + qname string want int }{ { - retry: []base.Z{z1, z2, z3}, + retry: map[string][]base.Z{ + "default": {z1, z2, z3}, + "custom": {z4}, + }, + qname: "default", want: 3, }, { - retry: []base.Z{}, + retry: map[string][]base.Z{ + "default": {}, + }, + qname: "default", want: 0, }, } for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedRetryQueue(t, r, tc.retry) + asynqtest.SeedAllRetryQueues(t, r, tc.retry) - got, err := inspector.DeleteAllRetryTasks() + got, err := inspector.DeleteAllRetryTasks(tc.qname) if err != nil { - t.Errorf("DeleteAllRetryTasks() returned error: %v", err) + t.Errorf("DeleteAllRetryTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("DeleteAllRetryTasks() = %d, want %d", got, tc.want) + t.Errorf("DeleteAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want) } - gotRetry := asynqtest.GetRetryEntries(t, r) + gotRetry := asynqtest.GetRetryEntries(t, r, tc.qname) if len(gotRetry) != 0 { - t.Errorf("There are still %d entries in dead queue, want empty", - len(gotRetry)) + t.Errorf("There are still %d retry tasks in queue %q, want empty", + tc.qname, len(gotRetry)) } } } @@ -652,10 +737,12 @@ func TestInspectorDeleteAllDeadTasks(t *testing.T) { m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) + m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} + z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} inspector := NewInspector(RedisClientOpt{ Addr: redisAddr, @@ -663,35 +750,43 @@ func TestInspectorDeleteAllDeadTasks(t *testing.T) { }) tests := []struct { - dead []base.Z - want int + dead map[string][]base.Z + qname string + want int }{ { - dead: []base.Z{z1, z2, z3}, - want: 3, + dead: map[string][]base.Z{ + "default": {z1, z2, z3}, + "custom": {z4}, + }, + qname: "default", + want: 3, }, { - dead: []base.Z{}, - want: 0, + dead: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: 0, }, } for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedDeadQueue(t, r, tc.dead) + asynqtest.SeedAllDeadQueues(t, r, tc.dead) - got, err := inspector.DeleteAllDeadTasks() + got, err := inspector.DeleteAllDeadTasks(tc.qname) if err != nil { - t.Errorf("DeleteAllDeadTasks() returned error: %v", err) + t.Errorf("DeleteAllDeadTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("DeleteAllDeadTasks() = %d, want %d", got, tc.want) + t.Errorf("DeleteAllDeadTasks(%q) = %d, want %d", tc.qname, got, tc.want) } - gotDead := asynqtest.GetDeadEntries(t, r) + gotDead := asynqtest.GetDeadEntries(t, r, tc.qname) if len(gotDead) != 0 { - t.Errorf("There are still %d entries in dead queue, want empty", - len(gotDead)) + t.Errorf("There are still %d dead tasks in queue %q, want empty", + tc.qname, len(gotDead)) } } } @@ -701,10 +796,12 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) + m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} + z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} inspector := NewInspector(RedisClientOpt{ Addr: redisAddr, @@ -712,19 +809,35 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { }) tests := []struct { - scheduled []base.Z - dead []base.Z - want int - wantDead []base.Z + scheduled map[string][]base.Z + dead map[string][]base.Z + qname string + want int + wantScheduled map[string][]base.Z + wantDead map[string][]base.Z }{ { - scheduled: []base.Z{z1, z2, z3}, - dead: []base.Z{}, - want: 3, - wantDead: []base.Z{ - base.Z{Message: m1, Score: now.Unix()}, - base.Z{Message: m2, Score: now.Unix()}, - base.Z{Message: m3, Score: now.Unix()}, + scheduled: map[string][]base.Z{ + "default": {z1, z2, z3}, + "custom": {z4}, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "default", + want: 3, + wantScheduled: map[string][]base.Z{ + "default": {}, + "custom": {z4}, + }, + wantDead: map[string][]base.Z{ + "default": { + base.Z{Message: m1, Score: now.Unix()}, + base.Z{Message: m2, Score: now.Unix()}, + base.Z{Message: m3, Score: now.Unix()}, + }, + "custom": {}, }, }, { @@ -753,29 +866,32 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedScheduledQueue(t, r, tc.scheduled) - asynqtest.SeedDeadQueue(t, r, tc.dead) + asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) + asynqtest.SeedAllDeadQueues(t, r, tc.dead) - got, err := inspector.KillAllScheduledTasks() + got, err := inspector.KillAllScheduledTasks(tc.qname) if err != nil { - t.Errorf("KillAllScheduledTasks() returned error: %v", err) + t.Errorf("KillAllScheduledTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("KillAllScheduledTasks() = %d, want %d", got, tc.want) + t.Errorf("KillAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want) } - gotScheduled := asynqtest.GetScheduledEntries(t, r) - if len(gotScheduled) != 0 { - t.Errorf("There are still %d entries in scheduled queue, want empty", - len(gotScheduled)) + for qname, want := range tc.wantScheduled { + gotScheduled := asynqtest.GetScheduledEntries(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", tc.qname, diff) + } } - // Allow Z.Score to differ by up to 2. - approxOpt := cmp.Comparer(func(a, b int64) bool { - return math.Abs(float64(a-b)) < 2 - }) - gotDead := asynqtest.GetDeadEntries(t, r) - if diff := cmp.Diff(tc.wantDead, gotDead, asynqtest.SortZSetEntryOpt, approxOpt); diff != "" { - t.Errorf("mismatch in %q; (-want,+got)\n%s", base.DeadQueue, diff) + for qname, want := range tc.wantDead { + // Allow Z.Score to differ by up to 2. + approxOpt := cmp.Comparer(func(a, b int64) bool { + return math.Abs(float64(a-b)) < 2 + }) + gotDead := asynqtest.GetDeadEntries(t, r, qname) + if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt, approxOpt); diff != "" { + t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", tc.qname, diff) + } } } } @@ -785,10 +901,12 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) m3 := asynqtest.NewTaskMessage("task3", nil) + m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} + z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} inspector := NewInspector(RedisClientOpt{ Addr: redisAddr, @@ -796,66 +914,98 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { }) tests := []struct { - retry []base.Z - dead []base.Z - want int - wantDead []base.Z + retry map[string][]base.Z + dead map[string][]base.Z + qname string + want int + wantRetry map[string][]base.Z + wantDead map[string][]base.Z }{ { - retry: []base.Z{z1, z2, z3}, - dead: []base.Z{}, + retry: map[string][]base.Z{ + "default": {z1, z2, z3}, + "custom": {z4}, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "default", want: 3, - wantDead: []base.Z{ - base.Z{Message: m1, Score: now.Unix()}, - base.Z{Message: m2, Score: now.Unix()}, - base.Z{Message: m3, Score: now.Unix()}, + wantRetry: map[string][]base.Z{ + "default": {}, + "custom": {z4}, + }, + wantDead: map[string][]base.Z{ + "default": { + base.Z{Message: m1, Score: now.Unix()}, + base.Z{Message: m2, Score: now.Unix()}, + base.Z{Message: m3, Score: now.Unix()}, + }, + "custom": {}, }, }, { - retry: []base.Z{z1, z2}, - dead: []base.Z{z3}, + retry: map[string][]base.Z{ + "default": {z1, z2}, + }, + dead: map[string][]base.Z{ + "default": {z3}, + }, + qname: "default", want: 2, - wantDead: []base.Z{ - z3, - base.Z{Message: m1, Score: now.Unix()}, - base.Z{Message: m2, Score: now.Unix()}, + wantRetry: map[string][]base.Z{ + "default": {}, + }, + wantDead: map[string][]base.Z{ + "default": { + z3, + base.Z{Message: m1, Score: now.Unix()}, + base.Z{Message: m2, Score: now.Unix()}, + }, }, }, { - retry: []base.Z(nil), - dead: []base.Z(nil), - want: 0, - wantDead: []base.Z(nil), - }, - { - retry: []base.Z(nil), - dead: []base.Z{z1, z2}, - want: 0, - wantDead: []base.Z{z1, z2}, + retry: map[string][]base.Z{ + "default": {}, + }, + dead: map[string][]base.Z{ + "default": {z1, z2}, + }, + want: 0, + wantRetry: map[string][]base.Z{ + "default": {}, + }, + wantdead: map[string][]base.Z{ + "default": {z1, z2}, + }, }, } for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedRetryQueue(t, r, tc.retry) - asynqtest.SeedDeadQueue(t, r, tc.dead) + asynqtest.SeedAllRetryQueues(t, r, tc.retry) + asynqtest.SeedAllDeadQueues(t, r, tc.dead) - got, err := inspector.KillAllRetryTasks() + got, err := inspector.KillAllRetryTasks(tc.qname) if err != nil { - t.Errorf("KillAllRetryTasks() returned error: %v", err) + t.Errorf("KillAllRetryTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("KillAllRetryTasks() = %d, want %d", got, tc.want) + t.Errorf("KillAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want) } - gotRetry := asynqtest.GetRetryEntries(t, r) - if len(gotRetry) != 0 { - t.Errorf("There are still %d entries in retry queue, want empty", - len(gotRetry)) + for qname, want := range tc.wantRetry { + gotRetry := asynqtest.GetRetryEntries(t, r, qname) + if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", tc.qname, diff) + } } - gotDead := asynqtest.GetDeadEntries(t, r) - if diff := cmp.Diff(tc.wantDead, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch in %q; (-want,+got)\n%s", base.DeadQueue, diff) + for qname, want := range tc.wantDead { + gotDead := asynqtest.GetDeadEntries(t, r, qname) + if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", tc.qname, diff) + } } } } @@ -870,6 +1020,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} + z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} inspector := NewInspector(RedisClientOpt{ Addr: redisAddr, @@ -877,45 +1028,73 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { }) tests := []struct { - scheduled []base.Z - enqueued map[string][]*base.TaskMessage - want int - wantEnqueued map[string][]*base.TaskMessage + scheduled map[string][]base.Z + enqueued map[string][]*base.TaskMessage + qname string + want int + wantScheduled map[string][]base.Z + wantEnqueued map[string][]*base.TaskMessage }{ { - scheduled: []base.Z{z1, z2, z3}, + scheduled: map[string][]base.Z{ + "default": {z1, z4}, + "critical": {z2}, + "low": {z3}, + }, enqueued: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, "low": {}, }, - want: 3, + qname: "default", + want: 2, + wantScheduled: map[string][]base.Z{ + "default": {}, + "critical": {z2}, + "low": {z3}, + }, wantEnqueued: map[string][]*base.TaskMessage{ - "default": {m1}, - "critical": {m2}, - "low": {m3}, + "default": {m1, m4}, + "critical": {}, + "low": {}, }, }, { - scheduled: []base.Z{z1, z2, z3}, + scheduled: map[string][]base.Z{ + "default": {z1}, + "critical": {z2}, + "low": {z3}, + }, enqueued: map[string][]*base.TaskMessage{ "default": {m4}, "critical": {}, "low": {}, }, - want: 3, + qname: "default", + want: 1, + wantScheduled: map[string][]base.Z{ + "default": {}, + "critical": {z2}, + "low": {z3}, + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": {m4, m1}, - "critical": {m2}, - "low": {m3}, + "critical": {}, + "low": {}, }, }, { - scheduled: []base.Z{}, + scheduled: map[string][]base.Z{ + "default": {}, + }, enqueued: map[string][]*base.TaskMessage{ "default": {m1, m4}, }, - want: 0, + qname: "default", + want: 0, + wantScheduled: map[string][]base.Z{ + "default": {}, + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": {m1, m4}, }, @@ -924,28 +1103,27 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedScheduledQueue(t, r, tc.scheduled) - for q, msgs := range tc.enqueued { - asynqtest.SeedEnqueuedQueue(t, r, msgs, q) - } + asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) + asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued) - got, err := inspector.EnqueueAllScheduledTasks() + got, err := inspector.EnqueueAllScheduledTasks(tc.qname) if err != nil { - t.Errorf("EnqueueAllScheduledTasks() returned error: %v", err) + t.Errorf("EnqueueAllScheduledTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("EnqueueAllScheduledTasks() = %d, want %d", got, tc.want) + t.Errorf("EnqueueAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want) } - gotScheduled := asynqtest.GetScheduledEntries(t, r) - if len(gotScheduled) != 0 { - t.Errorf("There are still %d entries in scheduled queue, want empty", - len(gotScheduled)) + for qname, want := range tc.wantScheduled { + gotScheduled := asynqtest.GetScheduledEntries(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", tc.qname, diff) + } } for qname, want := range tc.wantEnqueued { gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname) if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) + t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", tc.qname, diff) } } } @@ -961,6 +1139,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} + z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} inspector := NewInspector(RedisClientOpt{ Addr: redisAddr, @@ -968,33 +1147,55 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { }) tests := []struct { - retry []base.Z + retry map[string][]base.Z enqueued map[string][]*base.TaskMessage + qname string want int + wantRetry map[string][]base.Z wantEnqueued map[string][]*base.TaskMessage }{ { - retry: []base.Z{z1, z2, z3}, + retry: map[string][]base.Z{ + "default": {z1, z4}, + "critical": {z2}, + "low": {z3}, + }, enqueued: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, "low": {}, }, - want: 3, + qname: "default", + want: 2, + wantRetry: map[string][]base.Z{ + "default": {}, + "critical": {z2}, + "low": {z3}, + }, wantEnqueued: map[string][]*base.TaskMessage{ - "default": {m1}, - "critical": {m2}, - "low": {m3}, + "default": {m1, m4}, + "critical": {}, + "low": {}, }, }, { - retry: []base.Z{z1, z2, z3}, + retry: map[string][]base.Z{ + "default": {z1}, + "critical": {z2}, + "low": {z3}, + }, enqueued: map[string][]*base.TaskMessage{ "default": {m4}, "critical": {}, "low": {}, }, - want: 3, + qname: "default", + want: 1, + wantRetry: map[string][]base.Z{ + "default": {}, + "critical": {z2}, + "low": {z3}, + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": {m4, m1}, "critical": {m2}, @@ -1002,11 +1203,17 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { }, }, { - retry: []base.Z{}, + retry: map[string][]base.Z{ + "default": {}, + }, enqueued: map[string][]*base.TaskMessage{ "default": {m1, m4}, }, - want: 0, + qname: "default", + want: 0, + wantRetry: map[string][]base.Z{ + "default": {}, + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": {m1, m4}, }, @@ -1015,32 +1222,32 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedRetryQueue(t, r, tc.retry) - for q, msgs := range tc.enqueued { - asynqtest.SeedEnqueuedQueue(t, r, msgs, q) - } + asynqtest.SeedAllRetryQueues(t, r, tc.retry) + asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued) - got, err := inspector.EnqueueAllRetryTasks() + got, err := inspector.EnqueueAllRetryTasks(tc.qname) if err != nil { - t.Errorf("EnqueueAllRetryTasks() returned error: %v", err) + t.Errorf("EnqueueAllRetryTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("EnqueueAllRetryTasks() = %d, want %d", got, tc.want) + t.Errorf("EnqueueAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want) } - gotRetry := asynqtest.GetRetryEntries(t, r) - if len(gotRetry) != 0 { - t.Errorf("There are still %d entries in retry queue, want empty", - len(gotRetry)) + for qname, want := range tc.wantRetry { + gotRetry := asynqtest.GetRetryEntries(t, r, qname) + if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", tc.qname, diff) + } } for qname, want := range tc.wantEnqueued { gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname) if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) + t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", tc.qname, diff) } } } } + func TestInspectorEnqueueAllDeadTasks(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) @@ -1051,6 +1258,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) { z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} + z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()} inspector := NewInspector(RedisClientOpt{ Addr: redisAddr, @@ -1058,45 +1266,69 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) { }) tests := []struct { - dead []base.Z + dead map[string][]base.Z enqueued map[string][]*base.TaskMessage + qname string want int + wantDead map[string][]base.Z wantEnqueued map[string][]*base.TaskMessage }{ { - dead: []base.Z{z1, z2, z3}, + dead: map[string][]base.Z{ + "default": {z1, z4}, + "critical": {z2}, + "low": {z3}, + }, enqueued: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, "low": {}, }, - want: 3, - wantEnqueued: map[string][]*base.TaskMessage{ - "default": {m1}, - "critical": {m2}, - "low": {m3}, + qname: "default", + want: 2, + wantDead: map[string][]base.Z{ + "default": {}, + "critical": {z2}, + "low": {z3}, }, - }, - { - dead: []base.Z{z1, z2, z3}, - enqueued: map[string][]*base.TaskMessage{ - "default": {m4}, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {m1, m4}, "critical": {}, "low": {}, }, - want: 3, + }, + { + dead: map[string][]base.Z{ + "default": {z1}, + "critical": {z2}, + }, + enqueued: map[string][]*base.TaskMessage{ + "default": {m4}, + "critical": {}, + }, + qname: "default", + want: 1, + wantDead: map[string][]base.Z{ + "default": {}, + "critical": {z2}, + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": {m4, m1}, - "critical": {m2}, - "low": {m3}, + "critical": {}, }, }, { - dead: []base.Z{}, + dead: map[string][]base.Z{ + "default": {}, + }, enqueued: map[string][]*base.TaskMessage{ "default": {m1, m4}, }, - want: 0, + qname: "default", + want: 0, + wantDead: map[string][]base.Z{ + "default": {}, + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": {m1, m4}, }, @@ -1105,28 +1337,28 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedDeadQueue(t, r, tc.dead) - for q, msgs := range tc.enqueued { - asynqtest.SeedEnqueuedQueue(t, r, msgs, q) - } + asynqtest.SeedAllDeadQueues(t, r, tc.dead) + asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued) - got, err := inspector.EnqueueAllDeadTasks() + got, err := inspector.EnqueueAllDeadTasks(tc.qname) if err != nil { - t.Errorf("EnqueueAllDeadTasks() returned error: %v", err) + t.Errorf("EnqueueAllDeadTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("EnqueueAllDeadTasks() = %d, want %d", got, tc.want) + t.Errorf("EnqueueAllDeadTasks(%q) = %d, want %d", tc.qname, got, tc.want) } - gotDead := asynqtest.GetDeadEntries(t, r) - if len(gotDead) != 0 { - t.Errorf("There are still %d entries in dead queue, want empty", - len(gotDead)) + for qname, want := range tc.wantDead { + gotDead := asynqtest.GetDeadEntries(t, r, qname) + if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", tc.qname, diff) + } + } for qname, want := range tc.wantEnqueued { gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname) if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) + t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", tc.qname, diff) } } } @@ -1136,7 +1368,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) + m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} @@ -1148,40 +1380,39 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { }) tests := []struct { - scheduled []base.Z - target *base.TaskMessage - wantScheduled []base.Z + scheduled map[string][]base.Z + qname string + key string + wantScheduled map[string][]base.Z }{ { - scheduled: []base.Z{z1, z2, z3}, - target: m2, - wantScheduled: []base.Z{z1, z3}, + scheduled: map[string][]base.Z{ + "default": {z1, z2}, + "custom": {z3}, + }, + qname: "default", + key: createScheduledTask(z2).Key(), + wantScheduled: map[string][]base.Z{ + "default": {z1}, + "custom": {z3}, + }, }, } -loop: for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedScheduledQueue(t, r, tc.scheduled) + asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) - tasks, err := inspector.ListScheduledTasks() - if err != nil { - t.Errorf("ListScheduledTasks() returned error: %v", err) + if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { + t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } - for _, task := range tasks { - if task.ID == tc.target.ID.String() { - if err := inspector.DeleteTaskByKey(task.Key()); err != nil { - t.Errorf("DeleteTaskByKey(%q) returned error: %v", - task.Key(), err) - continue loop - } + for qname, want := range tc.wantScheduled { + gotScheduled := asynqtest.GetScheduledEntries(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", tc.qname, diff) } - } - gotScheduled := asynqtest.GetScheduledEntries(t, r) - if diff := cmp.Diff(tc.wantScheduled, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.ScheduledQueue, diff) + } } } @@ -1190,7 +1421,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) + m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} @@ -1202,41 +1433,39 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { }) tests := []struct { - retry []base.Z - target *base.TaskMessage - wantRetry []base.Z + retry map[string][]base.Z + qname string + key string + wantRetry map[string][]base.Z }{ { - retry: []base.Z{z1, z2, z3}, - target: m2, - wantRetry: []base.Z{z1, z3}, + retry: map[string][]base.Z{ + "default": {z1, z2}, + "custom": {z3}, + }, + qname: "default", + key: createRetryTask(z2).Key(), + wantRetry: map[string][]base.Z{ + "default": {z1}, + "custom": {z3}, + }, }, } -loop: for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedRetryQueue(t, r, tc.retry) + asynqtest.SeedAllRetryQueues(t, r, tc.retry) - tasks, err := inspector.ListRetryTasks() - if err != nil { - t.Errorf("ListRetryTasks() returned error: %v", err) + if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { + t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } - for _, task := range tasks { - if task.ID == tc.target.ID.String() { - if err := inspector.DeleteTaskByKey(task.Key()); err != nil { - t.Errorf("DeleteTaskByKey(%q) returned error: %v", - task.Key(), err) - continue loop - } + for qname, want := range tc.wantRetry { + gotRetry := asynqtest.GetRetryEntries(t, r, tc.qname) + if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", tc.qname, diff) } } - gotRetry := asynqtest.GetRetryEntries(t, r) - if diff := cmp.Diff(tc.wantRetry, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.RetryQueue, diff) - } } } @@ -1244,7 +1473,7 @@ func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) + m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} @@ -1256,49 +1485,47 @@ func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) { }) tests := []struct { - dead []base.Z - target *base.TaskMessage - wantDead []base.Z + dead map[string][]base.Z + qname string + key string + wantDead map[string][]base.Z }{ { - dead: []base.Z{z1, z2, z3}, - target: m2, - wantDead: []base.Z{z1, z3}, + dead: map[string][]base.Z{ + "default": {z1, z2}, + "custom": {z3}, + }, + qname: "default", + key: createDeadTask(z2).Key(), + wantDead: map[string][]base.Z{ + "default": {z1}, + "custom": {z3}, + }, }, } -loop: for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedDeadQueue(t, r, tc.dead) + asynqtest.SeedDAlleadQueues(t, r, tc.dead) - tasks, err := inspector.ListDeadTasks() - if err != nil { - t.Errorf("ListDeadTasks() returned error: %v", err) + if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { + t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } - for _, task := range tasks { - if task.ID == tc.target.ID.String() { - if err := inspector.DeleteTaskByKey(task.Key()); err != nil { - t.Errorf("DeleteTaskByKey(%q) returned error: %v", - task.Key(), err) - continue loop - } + for qname, want := range tc.wantDead { + gotDead := asynqtest.GetDeadEntries(t, r, qname) + if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", tc.qname, diff) } } - gotDead := asynqtest.GetDeadEntries(t, r) - if diff := cmp.Diff(tc.wantDead, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.DeadQueue, diff) - } } } func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") + m2 := asynqtest.NewTaskMessage("task2", nil) + m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} @@ -1310,57 +1537,57 @@ func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) { }) tests := []struct { - scheduled []base.Z + scheduled map[string][]base.Z enqueued map[string][]*base.TaskMessage - target *base.TaskMessage - wantScheduled []base.Z + qname string + key string + wantScheduled map[string][]base.Z wantEnqueued map[string][]*base.TaskMessage }{ { - scheduled: []base.Z{z1, z2, z3}, - enqueued: map[string][]*base.TaskMessage{ - "default": {}, - "critical": {}, - "low": {}, + scheduled: map[string][]base.Z{ + "default": {z1, z2}, + "custom": {z3}, + }, + enqueued: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {}, + }, + qname: "default", + key: createScheduledTask(z2).Key(), + wantScheduled: map[string][]base.Z{ + "default": {z1}, + "custom": {z3}, }, - target: m2, - wantScheduled: []base.Z{z1, z3}, wantEnqueued: map[string][]*base.TaskMessage{ - "default": {}, - "critical": {m2}, - "low": {}, + "default": {m2}, + "custom": {}, }, }, } -loop: for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedScheduledQueue(t, r, tc.scheduled) + asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued) - tasks, err := inspector.ListScheduledTasks() - if err != nil { - t.Errorf("ListScheduledTasks() returned error: %v", err) + if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil { + t.Errorf("EnqueueTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } - for _, task := range tasks { - if task.ID == tc.target.ID.String() { - if err := inspector.EnqueueTaskByKey(task.Key()); err != nil { - t.Errorf("EnqueueTaskByKey(%q) returned error: %v", - task.Key(), err) - continue loop - } + for qname, want := range tc.wantScheduled { + gotScheduled := asynqtest.GetScheduledEntries(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", + tc.qname, diff) } - } - gotScheduled := asynqtest.GetScheduledEntries(t, r) - if diff := cmp.Diff(tc.wantScheduled, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ScheduledQueue, diff) + } for qname, want := range tc.wantEnqueued { gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname) if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) + t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", + tc.qname, diff) } } } @@ -1369,8 +1596,8 @@ loop: func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") + m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") + m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} @@ -1382,57 +1609,56 @@ func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) { }) tests := []struct { - retry []base.Z + retry map[string][]base.Z enqueued map[string][]*base.TaskMessage - target *base.TaskMessage - wantRetry []base.Z + qname string + key string + wantRetry map[string][]base.Z wantEnqueued map[string][]*base.TaskMessage }{ { - retry: []base.Z{z1, z2, z3}, - enqueued: map[string][]*base.TaskMessage{ - "default": {}, - "critical": {}, - "low": {}, + retry: map[string][]base.Z{ + "default": {z1}, + "custom": {z2, z3}, + }, + enqueued: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {}, + }, + qname: "custom", + key: createRetryTask(z2).Key(), + wantRetry: map[string][]base.Z{ + "default": {z1}, + "custom": {z3}, }, - target: m2, - wantRetry: []base.Z{z1, z3}, wantEnqueued: map[string][]*base.TaskMessage{ - "default": {}, - "critical": {m2}, - "low": {}, + "default": {}, + "custom": {m2}, }, }, } -loop: for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedRetryQueue(t, r, tc.retry) + asynqtest.SeedAllRetryQueues(t, r, tc.retry) asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued) - tasks, err := inspector.ListRetryTasks() - if err != nil { - t.Errorf("ListRetryTasks() returned error: %v", err) + if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil { + t.Errorf("EnqueueTaskByKey(%q) returned error: %v", tc.qname, tc.key, err) continue } - for _, task := range tasks { - if task.ID == tc.target.ID.String() { - if err := inspector.EnqueueTaskByKey(task.Key()); err != nil { - t.Errorf("EnqueueTaskByKey(%q) returned error: %v", - task.Key(), err) - continue loop - } + for qname, want := range tc.wantRetry { + gotRetry := asynqtest.GetRetryEntries(t, r, qname) + if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", + tc.qname, diff) } } - gotRetry := asynqtest.GetRetryEntries(t, r) - if diff := cmp.Diff(tc.wantRetry, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.RetryQueue, diff) - } for qname, want := range tc.wantEnqueued { gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname) if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) + t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", + tc.qname, diff) } } } @@ -1454,21 +1680,31 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) { }) tests := []struct { - dead []base.Z + dead map[string][]base.Z enqueued map[string][]*base.TaskMessage - target *base.TaskMessage - wantDead []base.Z + qname string + key string + wantDead map[string][]base.Z wantEnqueued map[string][]*base.TaskMessage }{ { - dead: []base.Z{z1, z2, z3}, + dead: map[string][]base.Z{ + "default": {z1}, + "critical": {z2}, + "low": {z3}, + }, enqueued: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, "low": {}, }, - target: m2, - wantDead: []base.Z{z1, z3}, + qname: "critical", + key: createDeadTask(z2).Key(), + wantDead: map[string][]base.Z{ + "default": {z1}, + "critical": {}, + "low": {z3}, + }, wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, "critical": {m2}, @@ -1477,34 +1713,27 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) { }, } -loop: for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedDeadQueue(t, r, tc.dead) + asynqtest.SeedAllDeadQueues(t, r, tc.dead) asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued) - tasks, err := inspector.ListDeadTasks() - if err != nil { - t.Errorf("ListDeadTasks() returned error: %v", err) + if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil { + t.Errorf("EnqueueTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } - for _, task := range tasks { - if task.ID == tc.target.ID.String() { - if err := inspector.EnqueueTaskByKey(task.Key()); err != nil { - t.Errorf("EnqueueTaskByKey(%q) returned error: %v", - task.Key(), err) - continue loop - } + for qname, want := range tc.wantDead { + gotDead := asynqtest.GetDeadEntries(t, r, qname) + if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", + tc.qname, diff) } } - gotDead := asynqtest.GetDeadEntries(t, r) - if diff := cmp.Diff(tc.wantDead, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.DeadQueue, diff) - } for qname, want := range tc.wantEnqueued { gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname) if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) + t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", + tc.qname, diff) } } } @@ -1513,8 +1742,8 @@ loop: func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") + m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") + m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} @@ -1526,60 +1755,67 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { }) tests := []struct { - scheduled []base.Z - dead []base.Z - target *base.TaskMessage - wantScheduled []base.Z - wantDead []base.Z + scheduled map[string][]base.Z + dead map[string][]base.Z + qname string + want string + wantScheduled map[string][]base.Z + wantDead map[string][]base.Z }{ { - scheduled: []base.Z{z1, z2, z3}, - dead: []base.Z{}, - target: m2, - wantScheduled: []base.Z{z1, z3}, - wantDead: []base.Z{ - {m2, now.Unix()}, + scheduled: map[string][]base.Z{ + "default": {z1}, + "custom": {z2, z3}, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", + key: createScheduledTask(z2).Key(), + scheduled: map[string][]base.Z{ + "default": {z1}, + "custom": {z3}, + }, + wantDead: map[string][]base.Z{ + "default": {}, + "custom": {{m2, now.Unix()}}, }, }, } -loop: for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedScheduledQueue(t, r, tc.scheduled) - asynqtest.SeedDeadQueue(t, r, tc.dead) + asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) + asynqtest.SeedAllDeadQueues(t, r, tc.dead) - tasks, err := inspector.ListScheduledTasks() - if err != nil { - t.Errorf("ListScheduledTasks() returned error: %v", err) + if err := inspector.KillTaskByKey(tc.qname, tc.key); err != nil { + t.Errorf("KillTaskByKey(%q) returned error: %v", tc.qname, tc.key, err) continue } - for _, task := range tasks { - if task.ID == tc.target.ID.String() { - if err := inspector.KillTaskByKey(task.Key()); err != nil { - t.Errorf("KillTaskByKey(%q) returned error: %v", - task.Key(), err) - continue loop - } + for qname, want := range tc.wantScheduled { + gotScheduled := asynqtest.GetScheduledEntries(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", + tc.qname, diff) + } + + } + for qname, want := range tc.wantDead { + gotDead := asynqtest.GetDeadEntries(t, r, qname) + if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", + tc.qname, diff) } } - gotScheduled := asynqtest.GetScheduledEntries(t, r) - if diff := cmp.Diff(tc.wantScheduled, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ScheduledQueue, diff) - } - gotDead := asynqtest.GetDeadEntries(t, r) - if diff := cmp.Diff(tc.wantDead, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.DeadQueue, diff) - } - } } func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") + m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") + m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} @@ -1591,51 +1827,57 @@ func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) { }) tests := []struct { - retry []base.Z - dead []base.Z - target *base.TaskMessage - wantRetry []base.Z - wantDead []base.Z + retry map[string][]base.Z + dead map[string][]base.Z + qname string + key string + wantRetry map[string][]base.Z + wantDead map[string][]base.Z }{ { - retry: []base.Z{z1, z2, z3}, - dead: []base.Z{}, - target: m2, - wantRetry: []base.Z{z1, z3}, + retry: map[string][]base.Z{ + "default": {z1}, + "custom": {z2, z3}, + }, + dead: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", + key: createRetryTask(z2).Key(), + wantRetry: map[string][]base.Z{ + "default": {z1}, + "custom": {z3}, + }, wantDead: []base.Z{ - {m2, now.Unix()}, + "default": {}, + "custom": {{m2, now.Unix()}}, }, }, } -loop: for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedRetryQueue(t, r, tc.retry) - asynqtest.SeedDeadQueue(t, r, tc.dead) + asynqtest.SeedAllRetryQueues(t, r, tc.retry) + asynqtest.SeedAllDeadQueues(t, r, tc.dead) - tasks, err := inspector.ListRetryTasks() - if err != nil { - t.Errorf("ListRetryTasks() returned error: %v", err) + if err := inspector.KillTaskByKey(tc.qname, tc.key); err != nil { + t.Errorf("KillTaskByKey(%q) returned error: %v", tc.qname, tc.key, err) continue } - for _, task := range tasks { - if task.ID == tc.target.ID.String() { - if err := inspector.KillTaskByKey(task.Key()); err != nil { - t.Errorf("KillTaskByKey(%q) returned error: %v", - task.Key(), err) - continue loop - } + for qname, want := range tc.wantRetry { + gotRetry := asynqtest.GetRetryEntries(t, r, qname) + if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", + tc.qname, diff) } } - gotRetry := asynqtest.GetRetryEntries(t, r) - if diff := cmp.Diff(tc.wantRetry, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.RetryQueue, diff) + for qname, want := range tc.wantDead { + gotDead := asynqtest.GetDeadEntries(t, r, qname) + if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", + tc.qname, diff) + } } - gotDead := asynqtest.GetDeadEntries(t, r) - if diff := cmp.Diff(tc.wantDead, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.DeadQueue, diff) - } - } }