diff --git a/README.md b/README.md index 6bdf822..a17306d 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,7 @@ func NewImageProcessor() *ImageProcessor { ``` In your web application code, import the above package and use [`Client`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Client) to put tasks on the queue. +// TODO: This description needs to be updated. A task will be processed asynchronously by a background worker as soon as the task gets enqueued. Scheduled tasks will be stored in Redis and will be enqueued at the specified time. diff --git a/client_test.go b/client_test.go index 57556e5..1ad9466 100644 --- a/client_test.go +++ b/client_test.go @@ -32,7 +32,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { processAt time.Time // value for ProcessAt option opts []Option // other options wantRes *Result - wantEnqueued map[string][]*base.TaskMessage + wantPending map[string][]*base.TaskMessage wantScheduled map[string][]base.Z }{ { @@ -47,7 +47,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { Timeout: defaultTimeout, Deadline: noDeadline, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": { { Type: task.Type, @@ -75,7 +75,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { Timeout: defaultTimeout, Deadline: noDeadline, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, }, wantScheduled: map[string][]base.Z{ @@ -114,9 +114,9 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { tc.desc, gotRes, tc.wantRes, diff) } - for qname, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r, qname) - if diff := cmp.Diff(want, gotEnqueued, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" { + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" { t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff) } } @@ -137,11 +137,11 @@ func TestClientEnqueue(t *testing.T) { now := time.Now() tests := []struct { - desc string - task *Task - opts []Option - wantRes *Result - wantEnqueued map[string][]*base.TaskMessage + desc string + task *Task + opts []Option + wantRes *Result + wantPending map[string][]*base.TaskMessage }{ { desc: "Process task immediately with a custom retry count", @@ -156,7 +156,7 @@ func TestClientEnqueue(t *testing.T) { Timeout: defaultTimeout, Deadline: noDeadline, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": { { Type: task.Type, @@ -182,7 +182,7 @@ func TestClientEnqueue(t *testing.T) { Timeout: defaultTimeout, Deadline: noDeadline, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": { { Type: task.Type, @@ -209,7 +209,7 @@ func TestClientEnqueue(t *testing.T) { Timeout: defaultTimeout, Deadline: noDeadline, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": { { Type: task.Type, @@ -235,7 +235,7 @@ func TestClientEnqueue(t *testing.T) { Timeout: defaultTimeout, Deadline: noDeadline, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "custom": { { Type: task.Type, @@ -261,7 +261,7 @@ func TestClientEnqueue(t *testing.T) { Timeout: defaultTimeout, Deadline: noDeadline, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "high": { { Type: task.Type, @@ -287,7 +287,7 @@ func TestClientEnqueue(t *testing.T) { Timeout: 20 * time.Second, Deadline: noDeadline, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": { { Type: task.Type, @@ -313,7 +313,7 @@ func TestClientEnqueue(t *testing.T) { Timeout: noTimeout, Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC), }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": { { Type: task.Type, @@ -340,7 +340,7 @@ func TestClientEnqueue(t *testing.T) { Timeout: 20 * time.Second, Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC), }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": { { Type: task.Type, @@ -372,8 +372,8 @@ func TestClientEnqueue(t *testing.T) { tc.desc, gotRes, tc.wantRes, diff) } - for qname, want := range tc.wantEnqueued { - got := h.GetEnqueuedMessages(t, r, qname) + for qname, want := range tc.wantPending { + got := h.GetPendingMessages(t, r, qname) if diff := cmp.Diff(want, got, h.IgnoreIDOpt); diff != "" { t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff) } @@ -394,11 +394,11 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { delay time.Duration // value for ProcessIn option opts []Option // other options wantRes *Result - wantEnqueued map[string][]*base.TaskMessage + wantPending map[string][]*base.TaskMessage wantScheduled map[string][]base.Z }{ { - desc: "schedule a task to be enqueued in one hour", + desc: "schedule a task to be processed in one hour", task: task, delay: 1 * time.Hour, opts: []Option{}, @@ -409,7 +409,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { Timeout: defaultTimeout, Deadline: noDeadline, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, }, wantScheduled: map[string][]base.Z{ @@ -440,7 +440,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { Timeout: defaultTimeout, Deadline: noDeadline, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": { { Type: task.Type, @@ -476,9 +476,9 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { tc.desc, gotRes, tc.wantRes, diff) } - for qname, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r, qname) - if diff := cmp.Diff(want, gotEnqueued, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" { + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" { t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff) } } @@ -619,15 +619,15 @@ func TestClientDefaultOptions(t *testing.T) { t.Errorf("%s;\nEnqueue(task, opts...) returned %v, want %v; (-want,+got)\n%s", tc.desc, gotRes, tc.wantRes, diff) } - enqueued := h.GetEnqueuedMessages(t, r, tc.queue) - if len(enqueued) != 1 { + pending := h.GetPendingMessages(t, r, tc.queue) + if len(pending) != 1 { t.Errorf("%s;\nexpected queue %q to have one message; got %d messages in the queue.", - tc.desc, tc.queue, len(enqueued)) + tc.desc, tc.queue, len(pending)) continue } - got := enqueued[0] + got := pending[0] if diff := cmp.Diff(tc.want, got, h.IgnoreIDOpt); diff != "" { - t.Errorf("%s;\nmismatch found in enqueued task message; (-want,+got)\n%s", + t.Errorf("%s;\nmismatch found in pending task message; (-want,+got)\n%s", tc.desc, diff) } } diff --git a/inspector.go b/inspector.go index fa4abf7..c097c61 100644 --- a/inspector.go +++ b/inspector.go @@ -37,10 +37,10 @@ type QueueStats struct { // Name of the queue. Queue string // Size is the total number of tasks in the queue. - // The value is the sum of Enqueued, InProgress, Scheduled, Retry, and Dead. + // The value is the sum of Pending, InProgress, Scheduled, Retry, and Dead. Size int - // Number of enqueued tasks. - Enqueued int + // Number of pending tasks. + Pending int // Number of in-progress tasks. InProgress int // Number of scheduled tasks. @@ -73,7 +73,7 @@ func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) { return &QueueStats{ Queue: stats.Queue, Size: stats.Size, - Enqueued: stats.Enqueued, + Pending: stats.Pending, InProgress: stats.InProgress, Scheduled: stats.Scheduled, Retry: stats.Retry, @@ -119,8 +119,8 @@ func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) { return res, nil } -// EnqueuedTask is a task in a queue and is ready to be processed. -type EnqueuedTask struct { +// PendingTask is a task in a queue and is ready to be processed. +type PendingTask struct { *Task ID string Queue string @@ -269,22 +269,22 @@ func Page(n int) ListOption { return pageNumOpt(n) } -// ListEnqueuedTasks retrieves enqueued tasks from the specified queue. +// ListPendingTasks retrieves pending tasks from the specified queue. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*EnqueuedTask, error) { +func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error) { if err := validateQueueName(qname); err != nil { return nil, err } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - msgs, err := i.rdb.ListEnqueued(qname, pgn) + msgs, err := i.rdb.ListPending(qname, pgn) if err != nil { return nil, err } - var tasks []*EnqueuedTask + var tasks []*PendingTask for _, m := range msgs { - tasks = append(tasks, &EnqueuedTask{ + tasks = append(tasks, &PendingTask{ Task: NewTask(m.Type, m.Payload), ID: m.ID.String(), Queue: m.Queue, diff --git a/inspector_test.go b/inspector_test.go index 1a240a3..2bfb1f2 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -63,7 +63,7 @@ func TestInspectorCurrentStats(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { - enqueued map[string][]*base.TaskMessage + pending map[string][]*base.TaskMessage inProgress map[string][]*base.TaskMessage scheduled map[string][]base.Z retry map[string][]base.Z @@ -74,7 +74,7 @@ func TestInspectorCurrentStats(t *testing.T) { want *QueueStats }{ { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m1}, "critical": {m5}, "low": {m6}, @@ -116,7 +116,7 @@ func TestInspectorCurrentStats(t *testing.T) { want: &QueueStats{ Queue: "default", Size: 4, - Enqueued: 1, + Pending: 1, InProgress: 1, Scheduled: 2, Retry: 0, @@ -131,7 +131,7 @@ func TestInspectorCurrentStats(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued) + asynqtest.SeedAllPendingQueues(t, r, tc.pending) asynqtest.SeedAllInProgressQueues(t, r, tc.inProgress) asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) asynqtest.SeedAllRetryQueues(t, r, tc.retry) @@ -215,15 +215,15 @@ func TestInspectorHistory(t *testing.T) { } } -func createEnqueuedTask(msg *base.TaskMessage) *EnqueuedTask { - return &EnqueuedTask{ +func createPendingTask(msg *base.TaskMessage) *PendingTask { + return &PendingTask{ Task: NewTask(msg.Type, msg.Payload), ID: msg.ID.String(), Queue: msg.Queue, } } -func TestInspectorListEnqueuedTasks(t *testing.T) { +func TestInspectorListPendingTasks(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) @@ -233,59 +233,59 @@ func TestInspectorListEnqueuedTasks(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { - desc string - enqueued map[string][]*base.TaskMessage - qname string - want []*EnqueuedTask + desc string + pending map[string][]*base.TaskMessage + qname string + want []*PendingTask }{ { desc: "with default queue", - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m1, m2}, }, qname: "default", - want: []*EnqueuedTask{ - createEnqueuedTask(m1), - createEnqueuedTask(m2), + want: []*PendingTask{ + createPendingTask(m1), + createPendingTask(m2), }, }, { desc: "with named queue", - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m1, m2}, "critical": {m3}, "low": {m4}, }, qname: "critical", - want: []*EnqueuedTask{ - createEnqueuedTask(m3), + want: []*PendingTask{ + createPendingTask(m3), }, }, { desc: "with empty queue", - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {}, }, qname: "default", - want: []*EnqueuedTask(nil), + want: []*PendingTask(nil), }, } for _, tc := range tests { asynqtest.FlushDB(t, r) - for q, msgs := range tc.enqueued { - asynqtest.SeedEnqueuedQueue(t, r, msgs, q) + for q, msgs := range tc.pending { + asynqtest.SeedPendingQueue(t, r, msgs, q) } - got, err := inspector.ListEnqueuedTasks(tc.qname) + got, err := inspector.ListPendingTasks(tc.qname) if err != nil { - t.Errorf("%s; ListEnqueuedTasks(%q) returned error: %v", + t.Errorf("%s; ListPendingTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } ignoreOpt := cmpopts.IgnoreUnexported(Payload{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { - t.Errorf("%s; ListEnqueuedTasks(%q) = %v, want %v; (-want,+got)\n%s", + t.Errorf("%s; ListPendingTasks(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) } } @@ -576,53 +576,53 @@ func TestInspectorListPagination(t *testing.T) { asynqtest.NewTaskMessage(fmt.Sprintf("task%d", i), nil)) } r := setup(t) - asynqtest.SeedEnqueuedQueue(t, r, msgs, base.DefaultQueueName) + asynqtest.SeedPendingQueue(t, r, msgs, base.DefaultQueueName) inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { page int pageSize int - want []*EnqueuedTask + want []*PendingTask }{ { page: 1, pageSize: 5, - want: []*EnqueuedTask{ - createEnqueuedTask(msgs[0]), - createEnqueuedTask(msgs[1]), - createEnqueuedTask(msgs[2]), - createEnqueuedTask(msgs[3]), - createEnqueuedTask(msgs[4]), + want: []*PendingTask{ + createPendingTask(msgs[0]), + createPendingTask(msgs[1]), + createPendingTask(msgs[2]), + createPendingTask(msgs[3]), + createPendingTask(msgs[4]), }, }, { page: 3, pageSize: 10, - want: []*EnqueuedTask{ - createEnqueuedTask(msgs[20]), - createEnqueuedTask(msgs[21]), - createEnqueuedTask(msgs[22]), - createEnqueuedTask(msgs[23]), - createEnqueuedTask(msgs[24]), - createEnqueuedTask(msgs[25]), - createEnqueuedTask(msgs[26]), - createEnqueuedTask(msgs[27]), - createEnqueuedTask(msgs[28]), - createEnqueuedTask(msgs[29]), + want: []*PendingTask{ + createPendingTask(msgs[20]), + createPendingTask(msgs[21]), + createPendingTask(msgs[22]), + createPendingTask(msgs[23]), + createPendingTask(msgs[24]), + createPendingTask(msgs[25]), + createPendingTask(msgs[26]), + createPendingTask(msgs[27]), + createPendingTask(msgs[28]), + createPendingTask(msgs[29]), }, }, } for _, tc := range tests { - got, err := inspector.ListEnqueuedTasks("default", Page(tc.page), PageSize(tc.pageSize)) + got, err := inspector.ListPendingTasks("default", Page(tc.page), PageSize(tc.pageSize)) if err != nil { - t.Errorf("ListEnqueuedTask('default') returned error: %v", err) + t.Errorf("ListPendingTask('default') returned error: %v", err) continue } ignoreOpt := cmpopts.IgnoreUnexported(Payload{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { - t.Errorf("ListEnqueuedTask('default') = %v, want %v; (-want,+got)\n%s", + t.Errorf("ListPendingTask('default') = %v, want %v; (-want,+got)\n%s", got, tc.want, diff) } } @@ -1084,11 +1084,11 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { tests := []struct { scheduled map[string][]base.Z - enqueued map[string][]*base.TaskMessage + pending map[string][]*base.TaskMessage qname string want int wantScheduled map[string][]base.Z - wantEnqueued map[string][]*base.TaskMessage + wantPending map[string][]*base.TaskMessage }{ { scheduled: map[string][]base.Z{ @@ -1096,7 +1096,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { "critical": {z2}, "low": {z3}, }, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, "low": {}, @@ -1108,7 +1108,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { "critical": {z2}, "low": {z3}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {m1, m4}, "critical": {}, "low": {}, @@ -1120,7 +1120,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { "critical": {z2}, "low": {z3}, }, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m4}, "critical": {}, "low": {}, @@ -1132,7 +1132,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { "critical": {z2}, "low": {z3}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {m4, m1}, "critical": {}, "low": {}, @@ -1142,7 +1142,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { scheduled: map[string][]base.Z{ "default": {}, }, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m1, m4}, }, qname: "default", @@ -1150,7 +1150,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { wantScheduled: map[string][]base.Z{ "default": {}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {m1, m4}, }, }, @@ -1159,7 +1159,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) - asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued) + asynqtest.SeedAllPendingQueues(t, r, tc.pending) got, err := inspector.EnqueueAllScheduledTasks(tc.qname) if err != nil { @@ -1175,10 +1175,10 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff) } } - for qname, want := range tc.wantEnqueued { - gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname) - if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" { - t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", qname, diff) + for qname, want := range tc.wantPending { + gotPending := asynqtest.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) } } } @@ -1199,12 +1199,12 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { - retry map[string][]base.Z - enqueued map[string][]*base.TaskMessage - qname string - want int - wantRetry map[string][]base.Z - wantEnqueued map[string][]*base.TaskMessage + retry map[string][]base.Z + pending map[string][]*base.TaskMessage + qname string + want int + wantRetry map[string][]base.Z + wantPending map[string][]*base.TaskMessage }{ { retry: map[string][]base.Z{ @@ -1212,7 +1212,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { "critical": {z2}, "low": {z3}, }, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, "low": {}, @@ -1224,7 +1224,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { "critical": {z2}, "low": {z3}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {m1, m4}, "critical": {}, "low": {}, @@ -1236,7 +1236,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { "critical": {z2}, "low": {z3}, }, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m4}, "critical": {}, "low": {}, @@ -1248,7 +1248,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { "critical": {z2}, "low": {z3}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {m4, m1}, "critical": {}, "low": {}, @@ -1258,7 +1258,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { retry: map[string][]base.Z{ "default": {}, }, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m1, m4}, }, qname: "default", @@ -1266,7 +1266,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { wantRetry: map[string][]base.Z{ "default": {}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {m1, m4}, }, }, @@ -1275,7 +1275,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) asynqtest.SeedAllRetryQueues(t, r, tc.retry) - asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued) + asynqtest.SeedAllPendingQueues(t, r, tc.pending) got, err := inspector.EnqueueAllRetryTasks(tc.qname) if err != nil { @@ -1291,10 +1291,10 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff) } } - for qname, want := range tc.wantEnqueued { - gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname) - if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" { - t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", qname, diff) + for qname, want := range tc.wantPending { + gotPending := asynqtest.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) } } } @@ -1315,12 +1315,12 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { - dead map[string][]base.Z - enqueued map[string][]*base.TaskMessage - qname string - want int - wantDead map[string][]base.Z - wantEnqueued map[string][]*base.TaskMessage + dead map[string][]base.Z + pending map[string][]*base.TaskMessage + qname string + want int + wantDead map[string][]base.Z + wantPending map[string][]*base.TaskMessage }{ { dead: map[string][]base.Z{ @@ -1328,7 +1328,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) { "critical": {z2}, "low": {z3}, }, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, "low": {}, @@ -1340,7 +1340,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) { "critical": {z2}, "low": {z3}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {m1, m4}, "critical": {}, "low": {}, @@ -1351,7 +1351,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) { "default": {z1}, "critical": {z2}, }, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m4}, "critical": {}, }, @@ -1361,7 +1361,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) { "default": {}, "critical": {z2}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {m4, m1}, "critical": {}, }, @@ -1370,7 +1370,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) { dead: map[string][]base.Z{ "default": {}, }, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m1, m4}, }, qname: "default", @@ -1378,7 +1378,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) { wantDead: map[string][]base.Z{ "default": {}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {m1, m4}, }, }, @@ -1387,7 +1387,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) asynqtest.SeedAllDeadQueues(t, r, tc.dead) - asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued) + asynqtest.SeedAllPendingQueues(t, r, tc.pending) got, err := inspector.EnqueueAllDeadTasks(tc.qname) if err != nil { @@ -1404,10 +1404,10 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) { } } - for qname, want := range tc.wantEnqueued { - gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname) - if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" { - t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", qname, diff) + for qname, want := range tc.wantPending { + gotPending := asynqtest.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) } } } @@ -1574,18 +1574,18 @@ func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) { tests := []struct { scheduled map[string][]base.Z - enqueued map[string][]*base.TaskMessage + pending map[string][]*base.TaskMessage qname string key string wantScheduled map[string][]base.Z - wantEnqueued map[string][]*base.TaskMessage + wantPending map[string][]*base.TaskMessage }{ { scheduled: map[string][]base.Z{ "default": {z1, z2}, "custom": {z3}, }, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {}, "custom": {}, }, @@ -1595,7 +1595,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) { "default": {z1}, "custom": {z3}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {m2}, "custom": {}, }, @@ -1605,7 +1605,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) - asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued) + asynqtest.SeedAllPendingQueues(t, r, tc.pending) if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("EnqueueTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) @@ -1619,10 +1619,10 @@ func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) { } } - for qname, want := range tc.wantEnqueued { - gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname) - if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" { - t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", + for qname, want := range tc.wantPending { + gotPending := asynqtest.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1642,19 +1642,19 @@ func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { - retry map[string][]base.Z - enqueued map[string][]*base.TaskMessage - qname string - key string - wantRetry map[string][]base.Z - wantEnqueued map[string][]*base.TaskMessage + retry map[string][]base.Z + pending map[string][]*base.TaskMessage + qname string + key string + wantRetry map[string][]base.Z + wantPending map[string][]*base.TaskMessage }{ { retry: map[string][]base.Z{ "default": {z1}, "custom": {z2, z3}, }, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {}, "custom": {}, }, @@ -1664,7 +1664,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) { "default": {z1}, "custom": {z3}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, "custom": {m2}, }, @@ -1674,7 +1674,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) asynqtest.SeedAllRetryQueues(t, r, tc.retry) - asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued) + asynqtest.SeedAllPendingQueues(t, r, tc.pending) if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("EnqueueTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) @@ -1687,10 +1687,10 @@ func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) { qname, diff) } } - for qname, want := range tc.wantEnqueued { - gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname) - if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" { - t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", + for qname, want := range tc.wantPending { + gotPending := asynqtest.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1710,12 +1710,12 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { - dead map[string][]base.Z - enqueued map[string][]*base.TaskMessage - qname string - key string - wantDead map[string][]base.Z - wantEnqueued map[string][]*base.TaskMessage + dead map[string][]base.Z + pending map[string][]*base.TaskMessage + qname string + key string + wantDead map[string][]base.Z + wantPending map[string][]*base.TaskMessage }{ { dead: map[string][]base.Z{ @@ -1723,7 +1723,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) { "critical": {z2}, "low": {z3}, }, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, "low": {}, @@ -1735,7 +1735,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) { "critical": {}, "low": {z3}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, "critical": {m2}, "low": {}, @@ -1746,7 +1746,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) asynqtest.SeedAllDeadQueues(t, r, tc.dead) - asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued) + asynqtest.SeedAllPendingQueues(t, r, tc.pending) if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("EnqueueTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) @@ -1759,10 +1759,10 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) { qname, diff) } } - for qname, want := range tc.wantEnqueued { - gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname) - if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" { - t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", + for qname, want := range tc.wantPending { + gotPending := asynqtest.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) } } diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 6e831ac..f33210c 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -174,8 +174,8 @@ func FlushDB(tb testing.TB, r redis.UniversalClient) { } } -// SeedEnqueuedQueue initializes the specified queue with the given messages. -func SeedEnqueuedQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) { +// SeedPendingQueue initializes the specified queue with the given messages. +func SeedPendingQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) seedRedisList(tb, r, base.QueueKey(qname), msgs) @@ -216,12 +216,12 @@ func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qna seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries) } -// SeedAllEnqueuedQueues initializes all of the specified queues with the given messages. +// SeedAllPendingQueues initializes all of the specified queues with the given messages. // -// enqueued maps a queue name to a list of messages. -func SeedAllEnqueuedQueues(tb testing.TB, r redis.UniversalClient, enqueued map[string][]*base.TaskMessage) { - for q, msgs := range enqueued { - SeedEnqueuedQueue(tb, r, msgs, q) +// pending maps a queue name to a list of messages. +func SeedAllPendingQueues(tb testing.TB, r redis.UniversalClient, pending map[string][]*base.TaskMessage) { + for q, msgs := range pending { + SeedPendingQueue(tb, r, msgs, q) } } @@ -278,8 +278,8 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []b } } -// GetEnqueuedMessages returns all enqueued messages in the given queue. -func GetEnqueuedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { +// GetPendingMessages returns all pending messages in the given queue. +func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() return getListMessages(tb, r, base.QueueKey(qname)) } diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 7634679..c3e7f86 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -31,7 +31,7 @@ type Stats struct { // Size is the total number of tasks in the queue. Size int // Number of tasks in each state. - Enqueued int + Pending int InProgress int Scheduled int Retry int @@ -133,7 +133,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { val := cast.ToInt(data[i+1]) switch key { case base.QueueKey(qname): - stats.Enqueued = val + stats.Pending = val size += val case base.InProgressKey(qname): stats.InProgress = val @@ -258,8 +258,8 @@ func (p Pagination) stop() int64 { return int64(p.Size*p.Page + p.Size - 1) } -// ListEnqueued returns enqueued tasks that are ready to be processed. -func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*base.TaskMessage, error) { +// ListPending returns pending tasks that are ready to be processed. +func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error) { if !r.client.SIsMember(base.AllQueues, qname).Val() { return nil, fmt.Errorf("queue %q does not exist", qname) } @@ -680,12 +680,12 @@ return redis.status_reply("OK")`) // KEYS[5] -> asynq:{}:dead // KEYS[6] -> asynq:{}:deadlines var removeQueueCmd = redis.NewScript(` -local enqueued = redis.call("LLEN", KEYS[1]) +local pending = redis.call("LLEN", KEYS[1]) local inprogress = redis.call("LLEN", KEYS[2]) local scheduled = redis.call("SCARD", KEYS[3]) local retry = redis.call("SCARD", KEYS[4]) local dead = redis.call("SCARD", KEYS[5]) -local total = enqueued + inprogress + scheduled + retry + dead +local total = pending + inprogress + scheduled + retry + dead if total > 0 then return redis.error_reply("QUEUE NOT EMPTY") end diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 3855c62..7620e28 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -57,7 +57,7 @@ func TestCurrentStats(t *testing.T) { now := time.Now() tests := []struct { - enqueued map[string][]*base.TaskMessage + pending map[string][]*base.TaskMessage inProgress map[string][]*base.TaskMessage scheduled map[string][]base.Z retry map[string][]base.Z @@ -69,7 +69,7 @@ func TestCurrentStats(t *testing.T) { want *Stats }{ { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m1}, "critical": {m5}, "low": {m6}, @@ -113,7 +113,7 @@ func TestCurrentStats(t *testing.T) { Queue: "default", Paused: false, Size: 4, - Enqueued: 1, + Pending: 1, InProgress: 1, Scheduled: 2, Retry: 0, @@ -124,7 +124,7 @@ func TestCurrentStats(t *testing.T) { }, }, { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m1}, "critical": {m5}, "low": {m6}, @@ -168,7 +168,7 @@ func TestCurrentStats(t *testing.T) { Queue: "critical", Paused: true, Size: 1, - Enqueued: 1, + Pending: 1, InProgress: 0, Scheduled: 0, Retry: 0, @@ -187,7 +187,7 @@ func TestCurrentStats(t *testing.T) { t.Fatal(err) } } - h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued) + h.SeedAllPendingQueues(t, r.client, tc.pending) h.SeedAllInProgressQueues(t, r.client, tc.inProgress) h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllRetryQueues(t, r.client, tc.retry) @@ -303,7 +303,7 @@ func TestRedisInfo(t *testing.T) { } } -func TestListEnqueued(t *testing.T) { +func TestListPending(t *testing.T) { r := setup(t) m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) @@ -312,26 +312,26 @@ func TestListEnqueued(t *testing.T) { m4 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") tests := []struct { - enqueued map[string][]*base.TaskMessage - qname string - want []*base.TaskMessage + pending map[string][]*base.TaskMessage + qname string + want []*base.TaskMessage }{ { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ base.DefaultQueueName: {m1, m2}, }, qname: base.DefaultQueueName, want: []*base.TaskMessage{m1, m2}, }, { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ base.DefaultQueueName: nil, }, qname: base.DefaultQueueName, want: []*base.TaskMessage(nil), }, { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ base.DefaultQueueName: {m1, m2}, "critical": {m3}, "low": {m4}, @@ -340,7 +340,7 @@ func TestListEnqueued(t *testing.T) { want: []*base.TaskMessage{m1, m2}, }, { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ base.DefaultQueueName: {m1, m2}, "critical": {m3}, "low": {m4}, @@ -352,10 +352,10 @@ func TestListEnqueued(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued) + h.SeedAllPendingQueues(t, r.client, tc.pending) - got, err := r.ListEnqueued(tc.qname, Pagination{Size: 20, Page: 0}) - op := fmt.Sprintf("r.ListEnqueued(%q, Pagination{Size: 20, Page: 0})", tc.qname) + got, err := r.ListPending(tc.qname, Pagination{Size: 20, Page: 0}) + op := fmt.Sprintf("r.ListPending(%q, Pagination{Size: 20, Page: 0})", tc.qname) if err != nil { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue @@ -367,7 +367,7 @@ func TestListEnqueued(t *testing.T) { } } -func TestListEnqueuedPagination(t *testing.T) { +func TestListPendingPagination(t *testing.T) { r := setup(t) var msgs []*base.TaskMessage for i := 0; i < 100; i++ { @@ -375,7 +375,7 @@ func TestListEnqueuedPagination(t *testing.T) { msgs = append(msgs, msg) } // create 100 tasks in default queue - h.SeedEnqueuedQueue(t, r.client, msgs, "default") + h.SeedPendingQueue(t, r.client, msgs, "default") msgs = []*base.TaskMessage(nil) // empty list for i := 0; i < 100; i++ { @@ -383,7 +383,7 @@ func TestListEnqueuedPagination(t *testing.T) { msgs = append(msgs, msg) } // create 100 tasks in custom queue - h.SeedEnqueuedQueue(t, r.client, msgs, "custom") + h.SeedPendingQueue(t, r.client, msgs, "custom") tests := []struct { desc string @@ -403,8 +403,8 @@ func TestListEnqueuedPagination(t *testing.T) { } for _, tc := range tests { - got, err := r.ListEnqueued(tc.qname, Pagination{Size: tc.size, Page: tc.page}) - op := fmt.Sprintf("r.ListEnqueued(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page) + got, err := r.ListPending(tc.qname, Pagination{Size: tc.size, Page: tc.page}) + op := fmt.Sprintf("r.ListPending(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page) if err != nil { t.Errorf("%s; %s returned error %v", tc.desc, op, err) continue @@ -990,13 +990,13 @@ func TestEnqueueDeadTask(t *testing.T) { s2 := time.Now().Add(-time.Hour).Unix() tests := []struct { - dead map[string][]base.Z - qname string - score int64 - id uuid.UUID - want error // expected return value from calling EnqueueDeadTask - wantDead map[string][]*base.TaskMessage - wantEnqueued map[string][]*base.TaskMessage + dead map[string][]base.Z + qname string + score int64 + id uuid.UUID + want error // expected return value from calling EnqueueDeadTask + wantDead map[string][]*base.TaskMessage + wantPending map[string][]*base.TaskMessage }{ { dead: map[string][]base.Z{ @@ -1012,7 +1012,7 @@ func TestEnqueueDeadTask(t *testing.T) { wantDead: map[string][]*base.TaskMessage{ "default": {t1}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t2}, }, }, @@ -1030,7 +1030,7 @@ func TestEnqueueDeadTask(t *testing.T) { wantDead: map[string][]*base.TaskMessage{ "default": {t1, t2}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, }, }, @@ -1052,7 +1052,7 @@ func TestEnqueueDeadTask(t *testing.T) { "default": {t1, t2}, "critical": {}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, "critical": {t3}, }, @@ -1069,9 +1069,9 @@ func TestEnqueueDeadTask(t *testing.T) { continue } - for qname, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) } } @@ -1094,13 +1094,13 @@ func TestEnqueueRetryTask(t *testing.T) { s1 := time.Now().Add(-5 * time.Minute).Unix() s2 := time.Now().Add(-time.Hour).Unix() tests := []struct { - retry map[string][]base.Z - qname string - score int64 - id uuid.UUID - want error // expected return value from calling EnqueueRetryTask - wantRetry map[string][]*base.TaskMessage - wantEnqueued map[string][]*base.TaskMessage + retry map[string][]base.Z + qname string + score int64 + id uuid.UUID + want error // expected return value from calling EnqueueRetryTask + wantRetry map[string][]*base.TaskMessage + wantPending map[string][]*base.TaskMessage }{ { retry: map[string][]base.Z{ @@ -1116,7 +1116,7 @@ func TestEnqueueRetryTask(t *testing.T) { wantRetry: map[string][]*base.TaskMessage{ "default": {t1}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t2}, }, }, @@ -1134,7 +1134,7 @@ func TestEnqueueRetryTask(t *testing.T) { wantRetry: map[string][]*base.TaskMessage{ "default": {t1, t2}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, }, }, @@ -1156,7 +1156,7 @@ func TestEnqueueRetryTask(t *testing.T) { "default": {t1, t2}, "low": {}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, "low": {t3}, }, @@ -1173,9 +1173,9 @@ func TestEnqueueRetryTask(t *testing.T) { continue } - for qname, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) } } @@ -1204,7 +1204,7 @@ func TestEnqueueScheduledTask(t *testing.T) { id uuid.UUID want error // expected return value from calling EnqueueScheduledTask wantScheduled map[string][]*base.TaskMessage - wantEnqueued map[string][]*base.TaskMessage + wantPending map[string][]*base.TaskMessage }{ { scheduled: map[string][]base.Z{ @@ -1220,7 +1220,7 @@ func TestEnqueueScheduledTask(t *testing.T) { wantScheduled: map[string][]*base.TaskMessage{ "default": {t1}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t2}, }, }, @@ -1238,7 +1238,7 @@ func TestEnqueueScheduledTask(t *testing.T) { wantScheduled: map[string][]*base.TaskMessage{ "default": {t1, t2}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, }, }, @@ -1260,7 +1260,7 @@ func TestEnqueueScheduledTask(t *testing.T) { "default": {t1, t2}, "notifications": {}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, "notifications": {t3}, }, @@ -1277,9 +1277,9 @@ func TestEnqueueScheduledTask(t *testing.T) { continue } - for qname, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) } } @@ -1306,7 +1306,7 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { scheduled map[string][]base.Z qname string want int64 - wantEnqueued map[string][]*base.TaskMessage + wantPending map[string][]*base.TaskMessage wantScheduled map[string][]*base.TaskMessage }{ { @@ -1320,7 +1320,7 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { }, qname: "default", want: 3, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t1, t2, t3}, }, wantScheduled: map[string][]*base.TaskMessage{ @@ -1334,7 +1334,7 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { }, qname: "default", want: 0, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, }, wantScheduled: map[string][]*base.TaskMessage{ @@ -1356,7 +1356,7 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { }, qname: "custom", want: 2, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, "custom": {t4, t5}, }, @@ -1383,9 +1383,9 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { tc.desc, tc.qname, got, err, tc.want) } - for qname, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff) } } @@ -1407,12 +1407,12 @@ func TestEnqueueAllRetryTasks(t *testing.T) { t5 := h.NewTaskMessageWithQueue("minor_notification", nil, "custom") tests := []struct { - desc string - retry map[string][]base.Z - qname string - want int64 - wantEnqueued map[string][]*base.TaskMessage - wantRetry map[string][]*base.TaskMessage + desc string + retry map[string][]base.Z + qname string + want int64 + wantPending map[string][]*base.TaskMessage + wantRetry map[string][]*base.TaskMessage }{ { desc: "with tasks in retry queue", @@ -1425,7 +1425,7 @@ func TestEnqueueAllRetryTasks(t *testing.T) { }, qname: "default", want: 3, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t1, t2, t3}, }, wantRetry: map[string][]*base.TaskMessage{ @@ -1439,7 +1439,7 @@ func TestEnqueueAllRetryTasks(t *testing.T) { }, qname: "default", want: 0, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, }, wantRetry: map[string][]*base.TaskMessage{ @@ -1461,7 +1461,7 @@ func TestEnqueueAllRetryTasks(t *testing.T) { }, qname: "custom", want: 2, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, "custom": {t4, t5}, }, @@ -1488,9 +1488,9 @@ func TestEnqueueAllRetryTasks(t *testing.T) { tc.desc, tc.qname, got, err, tc.want) } - for qname, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff) } } @@ -1512,12 +1512,12 @@ func TestEnqueueAllDeadTasks(t *testing.T) { t5 := h.NewTaskMessageWithQueue("minor_notification", nil, "custom") tests := []struct { - desc string - dead map[string][]base.Z - qname string - want int64 - wantEnqueued map[string][]*base.TaskMessage - wantDead map[string][]*base.TaskMessage + desc string + dead map[string][]base.Z + qname string + want int64 + wantPending map[string][]*base.TaskMessage + wantDead map[string][]*base.TaskMessage }{ { desc: "with tasks in dead queue", @@ -1530,7 +1530,7 @@ func TestEnqueueAllDeadTasks(t *testing.T) { }, qname: "default", want: 3, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t1, t2, t3}, }, wantDead: map[string][]*base.TaskMessage{ @@ -1544,7 +1544,7 @@ func TestEnqueueAllDeadTasks(t *testing.T) { }, qname: "default", want: 0, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, }, wantDead: map[string][]*base.TaskMessage{ @@ -1566,7 +1566,7 @@ func TestEnqueueAllDeadTasks(t *testing.T) { }, qname: "custom", want: 2, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, "custom": {t4, t5}, }, @@ -1593,9 +1593,9 @@ func TestEnqueueAllDeadTasks(t *testing.T) { tc.desc, tc.qname, got, err, tc.want) } - for qname, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff) } } @@ -2608,7 +2608,7 @@ func TestRemoveQueue(t *testing.T) { m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") tests := []struct { - enqueued map[string][]*base.TaskMessage + pending map[string][]*base.TaskMessage inProgress map[string][]*base.TaskMessage scheduled map[string][]base.Z retry map[string][]base.Z @@ -2617,7 +2617,7 @@ func TestRemoveQueue(t *testing.T) { force bool }{ { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {}, }, @@ -2641,7 +2641,7 @@ func TestRemoveQueue(t *testing.T) { force: false, }, { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {m3}, }, @@ -2668,7 +2668,7 @@ func TestRemoveQueue(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) - h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued) + h.SeedAllPendingQueues(t, r.client, tc.pending) h.SeedAllInProgressQueues(t, r.client, tc.inProgress) h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllRetryQueues(t, r.client, tc.retry) @@ -2709,7 +2709,7 @@ func TestRemoveQueueError(t *testing.T) { tests := []struct { desc string - enqueued map[string][]*base.TaskMessage + pending map[string][]*base.TaskMessage inProgress map[string][]*base.TaskMessage scheduled map[string][]base.Z retry map[string][]base.Z @@ -2719,7 +2719,7 @@ func TestRemoveQueueError(t *testing.T) { }{ { desc: "removing non-existent queue", - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {m3}, }, @@ -2744,7 +2744,7 @@ func TestRemoveQueueError(t *testing.T) { }, { desc: "removing non-empty queue", - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {m3}, }, @@ -2769,7 +2769,7 @@ func TestRemoveQueueError(t *testing.T) { }, { desc: "force removing queue with tasks in-progress", - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {m3}, }, @@ -2797,7 +2797,7 @@ func TestRemoveQueueError(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) - h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued) + h.SeedAllPendingQueues(t, r.client, tc.pending) h.SeedAllInProgressQueues(t, r.client, tc.inProgress) h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllRetryQueues(t, r.client, tc.retry) @@ -2810,9 +2810,9 @@ func TestRemoveQueueError(t *testing.T) { } // Make sure that nothing changed - for qname, want := range tc.enqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + for qname, want := range tc.pending { + gotPending := h.GetPendingMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.QueueKey(qname), diff) } } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 9d85220..d3c2af6 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -80,12 +80,12 @@ func TestEnqueue(t *testing.T) { t.Errorf("(*RDB).Enqueue(msg) = %v, want nil", err) } - gotEnqueued := h.GetEnqueuedMessages(t, r.client, tc.msg.Queue) - if len(gotEnqueued) != 1 { - t.Errorf("%q has length %d, want 1", base.QueueKey(tc.msg.Queue), len(gotEnqueued)) + gotPending := h.GetPendingMessages(t, r.client, tc.msg.Queue) + if len(gotPending) != 1 { + t.Errorf("%q has length %d, want 1", base.QueueKey(tc.msg.Queue), len(gotPending)) continue } - if diff := cmp.Diff(tc.msg, gotEnqueued[0]); diff != "" { + if diff := cmp.Diff(tc.msg, gotPending[0]); diff != "" { t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff) } if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { @@ -167,24 +167,24 @@ func TestDequeue(t *testing.T) { t3Deadline := now.Unix() + t3.Timeout // use whichever is earliest tests := []struct { - enqueued map[string][]*base.TaskMessage + pending map[string][]*base.TaskMessage args []string // list of queues to query wantMsg *base.TaskMessage wantDeadline time.Time err error - wantEnqueued map[string][]*base.TaskMessage + wantPending map[string][]*base.TaskMessage wantInProgress map[string][]*base.TaskMessage wantDeadlines map[string][]base.Z }{ { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {t1}, }, args: []string{"default"}, wantMsg: t1, wantDeadline: time.Unix(t1Deadline, 0), err: nil, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, }, wantInProgress: map[string][]*base.TaskMessage{ @@ -195,14 +195,14 @@ func TestDequeue(t *testing.T) { }, }, { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {}, }, args: []string{"default"}, wantMsg: nil, wantDeadline: time.Time{}, err: ErrNoProcessableTask, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, }, wantInProgress: map[string][]*base.TaskMessage{ @@ -213,7 +213,7 @@ func TestDequeue(t *testing.T) { }, }, { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {t2}, "low": {t3}, @@ -222,7 +222,7 @@ func TestDequeue(t *testing.T) { wantMsg: t2, wantDeadline: time.Unix(t2Deadline, 0), err: nil, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {}, "low": {t3}, @@ -239,7 +239,7 @@ func TestDequeue(t *testing.T) { }, }, { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {t3}, "critical": {}, "low": {t2, t1}, @@ -248,7 +248,7 @@ func TestDequeue(t *testing.T) { wantMsg: t3, wantDeadline: time.Unix(t3Deadline, 0), err: nil, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, "low": {t2, t1}, @@ -265,7 +265,7 @@ func TestDequeue(t *testing.T) { }, }, { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, "low": {}, @@ -274,7 +274,7 @@ func TestDequeue(t *testing.T) { wantMsg: nil, wantDeadline: time.Time{}, err: ErrNoProcessableTask, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, "low": {}, @@ -294,7 +294,7 @@ func TestDequeue(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued) + h.SeedAllPendingQueues(t, r.client, tc.pending) gotMsg, gotDeadline, err := r.Dequeue(tc.args...) if err != tc.err { @@ -313,9 +313,9 @@ func TestDequeue(t *testing.T) { continue } - for queue, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r.client, queue) - if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + for queue, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff) } } @@ -355,23 +355,23 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { tests := []struct { paused []string // list of paused queues - enqueued map[string][]*base.TaskMessage + pending map[string][]*base.TaskMessage args []string // list of queues to query wantMsg *base.TaskMessage err error - wantEnqueued map[string][]*base.TaskMessage + wantPending map[string][]*base.TaskMessage wantInProgress map[string][]*base.TaskMessage }{ { paused: []string{"default"}, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {t2}, }, args: []string{"default", "critical"}, wantMsg: t2, err: nil, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {}, }, @@ -382,13 +382,13 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { }, { paused: []string{"default"}, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {t1}, }, args: []string{"default"}, wantMsg: nil, err: ErrNoProcessableTask, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t1}, }, wantInProgress: map[string][]*base.TaskMessage{ @@ -397,14 +397,14 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { }, { paused: []string{"critical", "default"}, - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {t2}, }, args: []string{"default", "critical"}, wantMsg: nil, err: ErrNoProcessableTask, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {t2}, }, @@ -422,7 +422,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { t.Fatal(err) } } - h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued) + h.SeedAllPendingQueues(t, r.client, tc.pending) got, _, err := r.Dequeue(tc.args...) if !cmp.Equal(got, tc.wantMsg) || err != tc.err { @@ -431,9 +431,9 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { continue } - for queue, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r.client, queue) - if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + for queue, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff) } } @@ -627,16 +627,16 @@ func TestRequeue(t *testing.T) { t3Deadline := now.Unix() + t3.Timeout tests := []struct { - enqueued map[string][]*base.TaskMessage // initial state of queues + pending map[string][]*base.TaskMessage // initial state of queues inProgress map[string][]*base.TaskMessage // initial state of the in-progress list deadlines map[string][]base.Z // initial state of the deadlines set target *base.TaskMessage // task to requeue - wantEnqueued map[string][]*base.TaskMessage // final state of queues + wantPending map[string][]*base.TaskMessage // final state of queues wantInProgress map[string][]*base.TaskMessage // final state of the in-progress list wantDeadlines map[string][]base.Z // final state of the deadlines set }{ { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {}, }, inProgress: map[string][]*base.TaskMessage{ @@ -649,7 +649,7 @@ func TestRequeue(t *testing.T) { }, }, target: t1, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t1}, }, wantInProgress: map[string][]*base.TaskMessage{ @@ -662,7 +662,7 @@ func TestRequeue(t *testing.T) { }, }, { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {t1}, }, inProgress: map[string][]*base.TaskMessage{ @@ -674,7 +674,7 @@ func TestRequeue(t *testing.T) { }, }, target: t2, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t1, t2}, }, wantInProgress: map[string][]*base.TaskMessage{ @@ -685,7 +685,7 @@ func TestRequeue(t *testing.T) { }, }, { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {}, }, @@ -698,7 +698,7 @@ func TestRequeue(t *testing.T) { "critical": {{Message: t3, Score: t3Deadline}}, }, target: t3, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {t3}, }, @@ -715,7 +715,7 @@ func TestRequeue(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued) + h.SeedAllPendingQueues(t, r.client, tc.pending) h.SeedAllInProgressQueues(t, r.client, tc.inProgress) h.SeedAllDeadlines(t, r.client, tc.deadlines) @@ -725,9 +725,9 @@ func TestRequeue(t *testing.T) { continue } - for qname, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) } } @@ -1215,7 +1215,7 @@ func TestCheckAndEnqueue(t *testing.T) { scheduled map[string][]base.Z retry map[string][]base.Z qnames []string - wantEnqueued map[string][]*base.TaskMessage + wantPending map[string][]*base.TaskMessage wantScheduled map[string][]*base.TaskMessage wantRetry map[string][]*base.TaskMessage }{ @@ -1230,7 +1230,7 @@ func TestCheckAndEnqueue(t *testing.T) { "default": {{Message: t3, Score: secondAgo.Unix()}}, }, qnames: []string{"default"}, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t1, t2, t3}, }, wantScheduled: map[string][]*base.TaskMessage{ @@ -1251,7 +1251,7 @@ func TestCheckAndEnqueue(t *testing.T) { "default": {{Message: t3, Score: secondAgo.Unix()}}, }, qnames: []string{"default"}, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t2, t3}, }, wantScheduled: map[string][]*base.TaskMessage{ @@ -1272,7 +1272,7 @@ func TestCheckAndEnqueue(t *testing.T) { "default": {{Message: t3, Score: hourFromNow.Unix()}}, }, qnames: []string{"default"}, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {}, }, wantScheduled: map[string][]*base.TaskMessage{ @@ -1294,7 +1294,7 @@ func TestCheckAndEnqueue(t *testing.T) { "low": {{Message: t5, Score: secondAgo.Unix()}}, }, qnames: []string{"default", "critical", "low"}, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {t4}, "low": {t5}, @@ -1323,9 +1323,9 @@ func TestCheckAndEnqueue(t *testing.T) { continue } - for qname, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) } } diff --git a/processor_test.go b/processor_test.go index 603a1bd..7ae498c 100644 --- a/processor_test.go +++ b/processor_test.go @@ -57,25 +57,25 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) { t4 := NewTask(m4.Type, m4.Payload) tests := []struct { - enqueued []*base.TaskMessage // initial default queue state + pending []*base.TaskMessage // initial default queue state incoming []*base.TaskMessage // tasks to be enqueued during run wantProcessed []*Task // tasks to be processed at the end }{ { - enqueued: []*base.TaskMessage{m1}, + pending: []*base.TaskMessage{m1}, incoming: []*base.TaskMessage{m2, m3, m4}, wantProcessed: []*Task{t1, t2, t3, t4}, }, { - enqueued: []*base.TaskMessage{}, + pending: []*base.TaskMessage{}, incoming: []*base.TaskMessage{m1}, wantProcessed: []*Task{t1}, }, } for _, tc := range tests { - h.FlushDB(t, r) // clean up db before each test case. - h.SeedEnqueuedQueue(t, r, tc.enqueued, base.DefaultQueueName) // initialize default queue. + h.FlushDB(t, r) // clean up db before each test case. + h.SeedPendingQueue(t, r, tc.pending, base.DefaultQueueName) // initialize default queue. // instantiate a new processor var mu sync.Mutex @@ -117,7 +117,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) { t.Fatal(err) } } - time.Sleep(2 * time.Second) // wait for two second to allow all enqueued tasks to be processed. + time.Sleep(2 * time.Second) // wait for two second to allow all pending tasks to be processed. if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 { t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l) } @@ -148,12 +148,12 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) { ) tests := []struct { - enqueued map[string][]*base.TaskMessage + pending map[string][]*base.TaskMessage queues []string // list of queues to consume the tasks from wantProcessed []*Task // tasks to be processed at the end }{ { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ "default": {m1, m2}, "high": {m3}, "low": {m4}, @@ -166,7 +166,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) { for _, tc := range tests { // Set up test case. h.FlushDB(t, r) - h.SeedAllEnqueuedQueues(t, r, tc.enqueued) + h.SeedAllPendingQueues(t, r, tc.pending) // Instantiate a new processor. var mu sync.Mutex @@ -205,7 +205,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) { p.handler = HandlerFunc(handler) p.start(&sync.WaitGroup{}) - // Wait for two second to allow all enqueued tasks to be processed. + // Wait for two second to allow all pending tasks to be processed. time.Sleep(2 * time.Second) // Make sure no messages are stuck in in-progress list. for _, qname := range tc.queues { @@ -232,18 +232,18 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { t1 := NewTask(m1.Type, m1.Payload) tests := []struct { - enqueued []*base.TaskMessage // initial default queue state + pending []*base.TaskMessage // initial default queue state wantProcessed []*Task // tasks to be processed at the end }{ { - enqueued: []*base.TaskMessage{m1}, + pending: []*base.TaskMessage{m1}, wantProcessed: []*Task{t1}, }, } for _, tc := range tests { - h.FlushDB(t, r) // clean up db before each test case. - h.SeedEnqueuedQueue(t, r, tc.enqueued, base.DefaultQueueName) // initialize default queue. + h.FlushDB(t, r) // clean up db before each test case. + h.SeedPendingQueue(t, r, tc.pending, base.DefaultQueueName) // initialize default queue. var mu sync.Mutex var processed []*Task @@ -282,7 +282,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { p.handler = HandlerFunc(handler) p.start(&sync.WaitGroup{}) - time.Sleep(2 * time.Second) // wait for two second to allow all enqueued tasks to be processed. + time.Sleep(2 * time.Second) // wait for two second to allow all pending tasks to be processed. if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 { t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l) } @@ -310,7 +310,7 @@ func TestProcessorRetry(t *testing.T) { now := time.Now() tests := []struct { - enqueued []*base.TaskMessage // initial default queue state + pending []*base.TaskMessage // initial default queue state incoming []*base.TaskMessage // tasks to be enqueued during run delay time.Duration // retry delay duration handler Handler // task handler @@ -320,7 +320,7 @@ func TestProcessorRetry(t *testing.T) { wantErrCount int // number of times error handler should be called }{ { - enqueued: []*base.TaskMessage{m1, m2}, + pending: []*base.TaskMessage{m1, m2}, incoming: []*base.TaskMessage{m3, m4}, delay: time.Minute, handler: HandlerFunc(func(ctx context.Context, task *Task) error { @@ -338,8 +338,8 @@ func TestProcessorRetry(t *testing.T) { } for _, tc := range tests { - h.FlushDB(t, r) // clean up db before each test case. - h.SeedEnqueuedQueue(t, r, tc.enqueued, base.DefaultQueueName) // initialize default queue. + h.FlushDB(t, r) // clean up db before each test case. + h.SeedPendingQueue(t, r, tc.pending, base.DefaultQueueName) // initialize default queue. // instantiate a new processor delayFunc := func(n int, e error, t *Task) time.Duration { @@ -486,13 +486,13 @@ func TestProcessorWithStrictPriority(t *testing.T) { ) tests := []struct { - enqueued map[string][]*base.TaskMessage // initial queues state + pending map[string][]*base.TaskMessage // initial queues state queues []string // list of queues to consume tasks from wait time.Duration // wait duration between starting and stopping processor for this test case wantProcessed []*Task // tasks to be processed at the end }{ { - enqueued: map[string][]*base.TaskMessage{ + pending: map[string][]*base.TaskMessage{ base.DefaultQueueName: {m4, m5}, "critical": {m1, m2, m3}, "low": {m6, m7}, @@ -505,8 +505,8 @@ func TestProcessorWithStrictPriority(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) // clean up db before each test case. - for qname, msgs := range tc.enqueued { - h.SeedEnqueuedQueue(t, r, msgs, qname) + for qname, msgs := range tc.pending { + h.SeedPendingQueue(t, r, msgs, qname) } // instantiate a new processor diff --git a/scheduler_test.go b/scheduler_test.go index cf3391c..f637da4 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -34,11 +34,11 @@ func TestScheduler(t *testing.T) { tests := []struct { initScheduled map[string][]base.Z // scheduled queue initial state initRetry map[string][]base.Z // retry queue initial state - initEnqueued map[string][]*base.TaskMessage // default queue initial state + initPending map[string][]*base.TaskMessage // default queue initial state wait time.Duration // wait duration before checking for final state wantScheduled map[string][]*base.TaskMessage // schedule queue final state wantRetry map[string][]*base.TaskMessage // retry queue final state - wantEnqueued map[string][]*base.TaskMessage // default queue final state + wantPending map[string][]*base.TaskMessage // default queue final state }{ { initScheduled: map[string][]base.Z{ @@ -49,7 +49,7 @@ func TestScheduler(t *testing.T) { "default": {{Message: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()}}, "critical": {}, }, - initEnqueued: map[string][]*base.TaskMessage{ + initPending: map[string][]*base.TaskMessage{ "default": {}, "critical": {t4}, }, @@ -62,7 +62,7 @@ func TestScheduler(t *testing.T) { "default": {}, "critical": {}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t3}, "critical": {t2, t4}, }, @@ -81,7 +81,7 @@ func TestScheduler(t *testing.T) { "default": {}, "critical": {}, }, - initEnqueued: map[string][]*base.TaskMessage{ + initPending: map[string][]*base.TaskMessage{ "default": {}, "critical": {t4}, }, @@ -94,7 +94,7 @@ func TestScheduler(t *testing.T) { "default": {}, "critical": {}, }, - wantEnqueued: map[string][]*base.TaskMessage{ + wantPending: map[string][]*base.TaskMessage{ "default": {t1, t3}, "critical": {t2, t4}, }, @@ -105,7 +105,7 @@ func TestScheduler(t *testing.T) { h.FlushDB(t, r) // clean up db before each test case. h.SeedAllScheduledQueues(t, r, tc.initScheduled) // initialize scheduled queue h.SeedAllRetryQueues(t, r, tc.initRetry) // initialize retry queue - h.SeedAllEnqueuedQueues(t, r, tc.initEnqueued) // initialize default queue + h.SeedAllPendingQueues(t, r, tc.initPending) // initialize default queue var wg sync.WaitGroup s.start(&wg) @@ -126,9 +126,9 @@ func TestScheduler(t *testing.T) { } } - for qname, want := range tc.wantEnqueued { - gotEnqueued := h.GetEnqueuedMessages(t, r, qname) - if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.QueueKey(qname), diff) } } diff --git a/tools/asynq/cmd/queue.go b/tools/asynq/cmd/queue.go index d04f86e..61a3b96 100644 --- a/tools/asynq/cmd/queue.go +++ b/tools/asynq/cmd/queue.go @@ -145,9 +145,9 @@ func printQueueStats(s *asynq.QueueStats) { fmt.Printf("Paused: %t\n\n", s.Paused) fmt.Println("Task Breakdown:") printTable( - []string{"InProgress", "Enqueued", "Scheduled", "Retry", "Dead"}, + []string{"InProgress", "Pending", "Scheduled", "Retry", "Dead"}, func(w io.Writer, tmpl string) { - fmt.Fprintf(w, tmpl, s.InProgress, s.Enqueued, s.Scheduled, s.Retry, s.Dead) + fmt.Fprintf(w, tmpl, s.InProgress, s.Pending, s.Scheduled, s.Retry, s.Dead) }, ) fmt.Println() diff --git a/tools/asynq/cmd/stats.go b/tools/asynq/cmd/stats.go index 55c0358..3103b35 100644 --- a/tools/asynq/cmd/stats.go +++ b/tools/asynq/cmd/stats.go @@ -52,7 +52,7 @@ func init() { type AggregateStats struct { InProgress int - Enqueued int + Pending int Scheduled int Retry int Dead int @@ -79,7 +79,7 @@ func stats(cmd *cobra.Command, args []string) { os.Exit(1) } aggStats.InProgress += s.InProgress - aggStats.Enqueued += s.Enqueued + aggStats.Pending += s.Pending aggStats.Scheduled += s.Scheduled aggStats.Retry += s.Retry aggStats.Dead += s.Dead @@ -113,9 +113,9 @@ func stats(cmd *cobra.Command, args []string) { func printStatsByState(s *AggregateStats) { format := strings.Repeat("%v\t", 5) + "\n" tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) - fmt.Fprintf(tw, format, "InProgress", "Enqueued", "Scheduled", "Retry", "Dead") + fmt.Fprintf(tw, format, "InProgress", "Pending", "Scheduled", "Retry", "Dead") fmt.Fprintf(tw, format, "----------", "--------", "---------", "-----", "----") - fmt.Fprintf(tw, format, s.InProgress, s.Enqueued, s.Scheduled, s.Retry, s.Dead) + fmt.Fprintf(tw, format, s.InProgress, s.Pending, s.Scheduled, s.Retry, s.Dead) tw.Flush() } diff --git a/tools/asynq/cmd/task.go b/tools/asynq/cmd/task.go index 6e85367..69d0b91 100644 --- a/tools/asynq/cmd/task.go +++ b/tools/asynq/cmd/task.go @@ -75,7 +75,7 @@ var taskListCmd = &cobra.Command{ The value for the state flag should be one of: - in-progress -- enqueued +- pending - scheduled - retry - dead @@ -85,11 +85,11 @@ By default, the command fetches the first 30 tasks. Use --page and --size flags to specify the page number and size. Example: -To list enqueued tasks from "default" queue, run - asynq task ls --queue=default --state=enqueued +To list pending tasks from "default" queue, run + asynq task ls --queue=default --state=pending To list the tasks from the second page, run - asynq task ls --queue=default --state=enqueued --page=1`, + asynq task ls --queue=default --state=pending --page=1`, Run: taskList, } @@ -167,8 +167,8 @@ func taskList(cmd *cobra.Command, args []string) { switch state { case "in-progress": listInProgressTasks(qname, pageNum, pageSize) - case "enqueued": - listEnqueuedTasks(qname, pageNum, pageSize) + case "pending": + listPendingTasks(qname, pageNum, pageSize) case "scheduled": listScheduledTasks(qname, pageNum, pageSize) case "retry": @@ -202,15 +202,15 @@ func listInProgressTasks(qname string, pageNum, pageSize int) { ) } -func listEnqueuedTasks(qname string, pageNum, pageSize int) { +func listPendingTasks(qname string, pageNum, pageSize int) { i := createInspector() - tasks, err := i.ListEnqueuedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) + tasks, err := i.ListPendingTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { fmt.Println(err) os.Exit(1) } if len(tasks) == 0 { - fmt.Printf("No enqueued tasks in %q queue\n", qname) + fmt.Printf("No pending tasks in %q queue\n", qname) return } printTable(