diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index f73476b..df7c187 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -254,9 +254,9 @@ func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*base.TaskMessage, e return r.listMessages(qkey, pgn) } -// ListInProgress returns all tasks that are currently being processed. -func (r *RDB) ListInProgress(pgn Pagination) ([]*base.TaskMessage, error) { - return r.listMessages(base.InProgressQueue, pgn) +// ListInProgress returns all tasks that are currently being processed for the given queue. +func (r *RDB) ListInProgress(qname string, pgn Pagination) ([]*base.TaskMessage, error) { + return r.listMessages(base.InProgressKey(qname), pgn) } // listMessages returns a list of TaskMessage in Redis list with the given key. @@ -282,21 +282,21 @@ func (r *RDB) listMessages(key string, pgn Pagination) ([]*base.TaskMessage, err } -// ListScheduled returns all tasks that are scheduled to be processed -// in the future. -func (r *RDB) ListScheduled(pgn Pagination) ([]base.Z, error) { - return r.listZSetEntries(base.ScheduledQueue, pgn) +// ListScheduled returns all tasks from the given queue that are scheduled +// to be processed in the future. +func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) { + return r.listZSetEntries(base.ScheduledKey(qname), pgn) } -// ListRetry returns all tasks that have failed before and willl be retried -// in the future. -func (r *RDB) ListRetry(pgn Pagination) ([]base.Z, error) { - return r.listZSetEntries(base.RetryQueue, pgn) +// ListRetry returns all tasks from the given queue that have failed before +// and willl be retried in the future. +func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) { + return r.listZSetEntries(base.RetryKey(qname), pgn) } -// ListDead returns all tasks that have exhausted its retry limit. -func (r *RDB) ListDead(pgn Pagination) ([]base.Z, error) { - return r.listZSetEntries(base.DeadQueue, pgn) +// ListDead returns all tasks from the given queue that have exhausted its retry limit. +func (r *RDB) ListDead(qname string, pgn Pagination) ([]base.Z, error) { + return r.listZSetEntries(base.DeadKey(qname), pgn) } // listZSetEntries returns a list of message and score pairs in Redis sorted-set diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 02f2a0d..b885c7c 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -369,7 +369,7 @@ func TestListEnqueuedPagination(t *testing.T) { msgs = append(msgs, msg) } // create 100 tasks in default queue - h.SeedEnqueuedQueue(t, r.client, msgs) + h.SeedEnqueuedQueue(t, r.client, msgs, "default") msgs = []*base.TaskMessage(nil) // empty list for i := 0; i < 100; i++ { @@ -432,26 +432,44 @@ func TestListInProgress(t *testing.T) { m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task2", nil, "critical") + m4 := h.NewTaskMessageWithQueue("task2", nil, "low") tests := []struct { - inProgress []*base.TaskMessage + inProgress map[string][]*base.TaskMessage + qname string + want []*base.TaskMessage }{ - {inProgress: []*base.TaskMessage{m1, m2}}, - {inProgress: []*base.TaskMessage(nil)}, + { + inProgress: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "critical": {m3}, + "low": {m4}, + }, + qname: "default", + want: []*base.TaskMessage{m1, m2}, + }, + { + inProgress: map[string][]*base.TaskMessage{ + "default": {}, + }, + qname: "default", + want: []*base.TaskMessage{}, + }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedInProgressQueue(t, r.client, tc.inProgress) + h.SeedAllInProgressQueues(t, r.client, tc.inProgress) - got, err := r.ListInProgress(Pagination{Size: 20, Page: 0}) - op := "r.ListInProgress(Pagination{Size: 20, Page: 0})" + got, err := r.ListInProgress(tc.qname, Pagination{Size: 20, Page: 0}) + op := fmt.Sprintf("r.ListInProgress(%q, Pagination{Size: 20, Page: 0})", tc.qname) if err != nil { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.inProgress) continue } - if diff := cmp.Diff(tc.inProgress, got); diff != "" { - t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.inProgress, diff) + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) continue } } @@ -464,26 +482,27 @@ func TestListInProgressPagination(t *testing.T) { msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) msgs = append(msgs, msg) } - h.SeedInProgressQueue(t, r.client, msgs) + h.SeedInProgressQueue(t, r.client, msgs, "default") tests := []struct { desc string + qname string page int size int wantSize int wantFirst string wantLast string }{ - {"first page", 0, 20, 20, "task 0", "task 19"}, - {"second page", 1, 20, 20, "task 20", "task 39"}, - {"different page size", 2, 30, 30, "task 60", "task 89"}, - {"last page", 3, 30, 10, "task 90", "task 99"}, - {"out of range", 4, 30, 0, "", ""}, + {"first page", "default", 0, 20, 20, "task 0", "task 19"}, + {"second page", "default", 1, 20, 20, "task 20", "task 39"}, + {"different page size", "default", 2, 30, 30, "task 60", "task 89"}, + {"last page", "default", 3, 30, 10, "task 90", "task 99"}, + {"out of range", "default", 4, 30, 0, "", ""}, } for _, tc := range tests { - got, err := r.ListInProgress(Pagination{Size: tc.size, Page: tc.page}) - op := fmt.Sprintf("r.ListInProgress(Pagination{Size: %d, Page: %d})", tc.size, tc.page) + got, err := r.ListInProgress(tc.qname, Pagination{Size: tc.size, Page: tc.page}) + op := fmt.Sprintf("r.ListInProgress(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page) if err != nil { t.Errorf("%s; %s returned error %v", tc.desc, op, err) continue @@ -517,20 +536,29 @@ func TestListScheduled(t *testing.T) { m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessageWithQueue("task3", nil, "custom") p1 := time.Now().Add(30 * time.Minute) p2 := time.Now().Add(24 * time.Hour) p3 := time.Now().Add(5 * time.Minute) + p4 := time.Now().Add(2 * time.Minute) tests := []struct { - scheduled []base.Z + scheduled map[string][]base.Z + qname string want []base.Z }{ { - scheduled: []base.Z{ - {Message: m1, Score: p1.Unix()}, - {Message: m2, Score: p2.Unix()}, - {Message: m3, Score: p3.Unix()}, + scheduled: map[string][]base.Z{ + "default": { + {Message: m1, Score: p1.Unix()}, + {Message: m2, Score: p2.Unix()}, + {Message: m3, Score: p3.Unix()}, + }, + "custom": { + {Message: m4, Score: p4.Unix()}, + }, }, + qname: "default", // should be sorted by score in ascending order want: []base.Z{ {Message: m3, Score: p3.Unix()}, @@ -539,17 +567,36 @@ func TestListScheduled(t *testing.T) { }, }, { - scheduled: []base.Z(nil), - want: []base.Z(nil), + scheduled: map[string][]base.Z{ + "default": { + {Message: m1, Score: p1.Unix()}, + {Message: m2, Score: p2.Unix()}, + {Message: m3, Score: p3.Unix()}, + }, + "custom": { + {Message: m4, Score: p4.Unix()}, + }, + }, + qname: "custom", + want: []base.Z{ + {Message: m4, Score: p4.Unix()}, + }, + }, + { + scheduled: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: []base.Z(nil), }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedScheduledQueue(t, r.client, tc.scheduled) + h.SeedAllScheduledQueues(t, r.client, tc.scheduled) - got, err := r.ListScheduled(Pagination{Size: 20, Page: 0}) - op := "r.ListScheduled(Pagination{Size: 20, Page: 0})" + got, err := r.ListScheduled(tc.qname, Pagination{Size: 20, Page: 0}) + op := fmt.Sprintf("r.ListScheduled(%q, Pagination{Size: 20, Page: 0})", tc.qname) if err != nil { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue @@ -573,22 +620,23 @@ func TestListScheduledPagination(t *testing.T) { tests := []struct { desc string + qname string page int size int wantSize int wantFirst string wantLast string }{ - {"first page", 0, 20, 20, "task 0", "task 19"}, - {"second page", 1, 20, 20, "task 20", "task 39"}, - {"different page size", 2, 30, 30, "task 60", "task 89"}, - {"last page", 3, 30, 10, "task 90", "task 99"}, - {"out of range", 4, 30, 0, "", ""}, + {"first page", "default", 0, 20, 20, "task 0", "task 19"}, + {"second page", "default", 1, 20, 20, "task 20", "task 39"}, + {"different page size", "default", 2, 30, 30, "task 60", "task 89"}, + {"last page", "default", 3, 30, 10, "task 90", "task 99"}, + {"out of range", "default", 4, 30, 0, "", ""}, } for _, tc := range tests { - got, err := r.ListScheduled(Pagination{Size: tc.size, Page: tc.page}) - op := fmt.Sprintf("r.ListScheduled(Pagination{Size: %d, Page: %d})", tc.size, tc.page) + got, err := r.ListScheduled(tc.qname, Pagination{Size: tc.size, Page: tc.page}) + op := fmt.Sprintf("r.ListScheduled(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page) if err != nil { t.Errorf("%s; %s returned error %v", tc.desc, op, err) continue @@ -621,51 +669,86 @@ func TestListRetry(t *testing.T) { r := setup(t) m1 := &base.TaskMessage{ ID: uuid.New(), - Type: "send_email", + Type: "task1", Queue: "default", - Payload: map[string]interface{}{"subject": "hello"}, - ErrorMsg: "email server not responding", + Payload: nil, + ErrorMsg: "some error occurred", Retry: 25, Retried: 10, } m2 := &base.TaskMessage{ ID: uuid.New(), - Type: "reindex", + Type: "task2", Queue: "default", Payload: nil, - ErrorMsg: "search engine not responding", + ErrorMsg: "some error occurred", Retry: 25, Retried: 2, } + m3 := &base.TaskMessage{ + ID: uuid.New(), + Type: "task3", + Queue: "custom", + Payload: nil, + ErrorMsg: "some error occurred", + Retry: 25, + Retried: 3, + } p1 := time.Now().Add(5 * time.Minute) p2 := time.Now().Add(24 * time.Hour) + p3 := time.Now().Add(24 * time.Hour) tests := []struct { - retry []base.Z + retry map[string][]base.Z + qname string want []base.Z }{ { - retry: []base.Z{ - {Message: m1, Score: p1.Unix()}, - {Message: m2, Score: p2.Unix()}, + retry: map[string][]base.Z{ + "default": { + {Message: m1, Score: p1.Unix()}, + {Message: m2, Score: p2.Unix()}, + }, + "custom": { + {Message: m3, Score: p3.Unix()}, + }, }, + qname: "default", want: []base.Z{ {Message: m1, Score: p1.Unix()}, {Message: m2, Score: p2.Unix()}, }, }, { - retry: []base.Z(nil), + retry: map[string][]base.Z{ + "default": { + {Message: m1, Score: p1.Unix()}, + {Message: m2, Score: p2.Unix()}, + }, + "custom": { + {Message: m3, Score: p3.Unix()}, + }, + }, + qname: "custom", + want: []base.Z{ + {Message: m3, Score: p3.Unix()}, + }, + }, + { + retry: map[string][]base.Z{ + "default": {}, + }, + qname: "default", want: []base.Z(nil), }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedRetryQueue(t, r.client, tc.retry) + h.SeedAllRetryQueues(t, r.client, tc.retry) got, err := r.ListRetry(Pagination{Size: 20, Page: 0}) - op := "r.ListRetry(Pagination{Size: 20, Page: 0})" + op := fmt.Sprintf("r.ListRetry(%q, Pagination{Size: 20, Page: 0})", tc.qname) if err != nil { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue @@ -688,27 +771,28 @@ func TestListRetryPagination(t *testing.T) { processAt := now.Add(time.Duration(i) * time.Second) seed = append(seed, base.Z{Message: msg, Score: processAt.Unix()}) } - h.SeedRetryQueue(t, r.client, seed) + h.SeedRetryQueue(t, r.client, seed, "default") tests := []struct { desc string + qname string page int size int wantSize int wantFirst string wantLast string }{ - {"first page", 0, 20, 20, "task 0", "task 19"}, - {"second page", 1, 20, 20, "task 20", "task 39"}, - {"different page size", 2, 30, 30, "task 60", "task 89"}, - {"last page", 3, 30, 10, "task 90", "task 99"}, - {"out of range", 4, 30, 0, "", ""}, + {"first page", "default", 0, 20, 20, "task 0", "task 19"}, + {"second page", "default", 1, 20, 20, "task 20", "task 39"}, + {"different page size", "default", 2, 30, 30, "task 60", "task 89"}, + {"last page", "default", 3, 30, 10, "task 90", "task 99"}, + {"out of range", "default", 4, 30, 0, "", ""}, } for _, tc := range tests { - got, err := r.ListRetry(Pagination{Size: tc.size, Page: tc.page}) - op := fmt.Sprintf("r.ListRetry(Pagination{Size: %d, Page: %d})", - tc.size, tc.page) + got, err := r.ListRetry(tc.qname, Pagination{Size: tc.size, Page: tc.page}) + op := fmt.Sprintf("r.ListRetry(%q, Pagination{Size: %d, Page: %d})", + tc.qname, tc.size, tc.page) if err != nil { t.Errorf("%s; %s returned error %v", tc.desc, op, err) continue @@ -742,47 +826,80 @@ func TestListDead(t *testing.T) { r := setup(t) m1 := &base.TaskMessage{ ID: uuid.New(), - Type: "send_email", + Type: "task1", Queue: "default", - Payload: map[string]interface{}{"subject": "hello"}, - ErrorMsg: "email server not responding", + Payload: nil, + ErrorMsg: "some error occurred", } m2 := &base.TaskMessage{ ID: uuid.New(), - Type: "reindex", + Type: "task2", Queue: "default", Payload: nil, - ErrorMsg: "search engine not responding", + ErrorMsg: "some error occurred", + } + m3 := &base.TaskMessage{ + ID: uuid.New(), + Type: "task3", + Queue: "custom", + Payload: nil, + ErrorMsg: "some error occurred", } f1 := time.Now().Add(-5 * time.Minute) f2 := time.Now().Add(-24 * time.Hour) + f3 := time.Now().Add(-4 * time.Hour) tests := []struct { - dead []base.Z - want []base.Z + dead map[string][]base.Z + qname string + want []base.Z }{ { - dead: []base.Z{ - {Message: m1, Score: f1.Unix()}, - {Message: m2, Score: f2.Unix()}, + dead: map[string][]base.Z{ + "default": { + {Message: m1, Score: f1.Unix()}, + {Message: m2, Score: f2.Unix()}, + }, + "custom": { + {Message: m3, Score: f3.Unix()}, + }, }, + qname: "default", want: []base.Z{ {Message: m2, Score: f2.Unix()}, // FIXME: shouldn't be sorted in the other order? {Message: m1, Score: f1.Unix()}, }, }, { - dead: []base.Z(nil), - want: []base.Z(nil), + dead: map[string][]base.Z{ + "default": { + {Message: m1, Score: f1.Unix()}, + {Message: m2, Score: f2.Unix()}, + }, + "custom": { + {Message: m3, Score: f3.Unix()}, + }, + }, + qname: "custom", + want: []base.Z{ + {Message: m3, Score: f3.Unix()}, + }, + }, + { + dead: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: []base.Z(nil), }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedDeadQueue(t, r.client, tc.dead) + h.SeedAllDeadQueues(t, r.client, tc.dead) got, err := r.ListDead(Pagination{Size: 20, Page: 0}) - op := "r.ListDead(Pagination{Size: 20, Page: 0})" + op := fmt.Sprintf("r.ListDead(%q, Pagination{Size: 20, Page: 0})", tc.qname) if err != nil { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue @@ -802,25 +919,26 @@ func TestListDeadPagination(t *testing.T) { msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) entries = append(entries, base.Z{Message: msg, Score: int64(i)}) } - h.SeedDeadQueue(t, r.client, entries) + h.SeedDeadQueue(t, r.client, entries, "default") tests := []struct { desc string + qname string page int size int wantSize int wantFirst string wantLast string }{ - {"first page", 0, 20, 20, "task 0", "task 19"}, - {"second page", 1, 20, 20, "task 20", "task 39"}, - {"different page size", 2, 30, 30, "task 60", "task 89"}, - {"last page", 3, 30, 10, "task 90", "task 99"}, - {"out of range", 4, 30, 0, "", ""}, + {"first page", "default", 0, 20, 20, "task 0", "task 19"}, + {"second page", "default", 1, 20, 20, "task 20", "task 39"}, + {"different page size", "default", 2, 30, 30, "task 60", "task 89"}, + {"last page", "default", 3, 30, 10, "task 90", "task 99"}, + {"out of range", "default", 4, 30, 0, "", ""}, } for _, tc := range tests { - got, err := r.ListDead(Pagination{Size: tc.size, Page: tc.page}) + got, err := r.ListDead(tc.qname, Pagination{Size: tc.size, Page: tc.page}) op := fmt.Sprintf("r.ListDead(Pagination{Size: %d, Page: %d})", tc.size, tc.page) if err != nil {