mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Rename Enqueued to Pending
This commit is contained in:
parent
cca680a7fd
commit
c8c47fcbf0
@ -132,6 +132,7 @@ func NewImageProcessor() *ImageProcessor {
|
||||
```
|
||||
|
||||
In your web application code, import the above package and use [`Client`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Client) to put tasks on the queue.
|
||||
// TODO: This description needs to be updated.
|
||||
A task will be processed asynchronously by a background worker as soon as the task gets enqueued.
|
||||
Scheduled tasks will be stored in Redis and will be enqueued at the specified time.
|
||||
|
||||
|
@ -32,7 +32,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
|
||||
processAt time.Time // value for ProcessAt option
|
||||
opts []Option // other options
|
||||
wantRes *Result
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
wantScheduled map[string][]base.Z
|
||||
}{
|
||||
{
|
||||
@ -47,7 +47,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
@ -75,7 +75,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
wantScheduled: map[string][]base.Z{
|
||||
@ -114,9 +114,9 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
|
||||
tc.desc, gotRes, tc.wantRes, diff)
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := h.GetPendingMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotPending, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
|
||||
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
|
||||
}
|
||||
}
|
||||
@ -141,7 +141,7 @@ func TestClientEnqueue(t *testing.T) {
|
||||
task *Task
|
||||
opts []Option
|
||||
wantRes *Result
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
desc: "Process task immediately with a custom retry count",
|
||||
@ -156,7 +156,7 @@ func TestClientEnqueue(t *testing.T) {
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
@ -182,7 +182,7 @@ func TestClientEnqueue(t *testing.T) {
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
@ -209,7 +209,7 @@ func TestClientEnqueue(t *testing.T) {
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
@ -235,7 +235,7 @@ func TestClientEnqueue(t *testing.T) {
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"custom": {
|
||||
{
|
||||
Type: task.Type,
|
||||
@ -261,7 +261,7 @@ func TestClientEnqueue(t *testing.T) {
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"high": {
|
||||
{
|
||||
Type: task.Type,
|
||||
@ -287,7 +287,7 @@ func TestClientEnqueue(t *testing.T) {
|
||||
Timeout: 20 * time.Second,
|
||||
Deadline: noDeadline,
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
@ -313,7 +313,7 @@ func TestClientEnqueue(t *testing.T) {
|
||||
Timeout: noTimeout,
|
||||
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
@ -340,7 +340,7 @@ func TestClientEnqueue(t *testing.T) {
|
||||
Timeout: 20 * time.Second,
|
||||
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC),
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
@ -372,8 +372,8 @@ func TestClientEnqueue(t *testing.T) {
|
||||
tc.desc, gotRes, tc.wantRes, diff)
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
got := h.GetEnqueuedMessages(t, r, qname)
|
||||
for qname, want := range tc.wantPending {
|
||||
got := h.GetPendingMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, got, h.IgnoreIDOpt); diff != "" {
|
||||
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
|
||||
}
|
||||
@ -394,11 +394,11 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
|
||||
delay time.Duration // value for ProcessIn option
|
||||
opts []Option // other options
|
||||
wantRes *Result
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
wantScheduled map[string][]base.Z
|
||||
}{
|
||||
{
|
||||
desc: "schedule a task to be enqueued in one hour",
|
||||
desc: "schedule a task to be processed in one hour",
|
||||
task: task,
|
||||
delay: 1 * time.Hour,
|
||||
opts: []Option{},
|
||||
@ -409,7 +409,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
wantScheduled: map[string][]base.Z{
|
||||
@ -440,7 +440,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
{
|
||||
Type: task.Type,
|
||||
@ -476,9 +476,9 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
|
||||
tc.desc, gotRes, tc.wantRes, diff)
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := h.GetPendingMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotPending, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
|
||||
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
|
||||
}
|
||||
}
|
||||
@ -619,15 +619,15 @@ func TestClientDefaultOptions(t *testing.T) {
|
||||
t.Errorf("%s;\nEnqueue(task, opts...) returned %v, want %v; (-want,+got)\n%s",
|
||||
tc.desc, gotRes, tc.wantRes, diff)
|
||||
}
|
||||
enqueued := h.GetEnqueuedMessages(t, r, tc.queue)
|
||||
if len(enqueued) != 1 {
|
||||
pending := h.GetPendingMessages(t, r, tc.queue)
|
||||
if len(pending) != 1 {
|
||||
t.Errorf("%s;\nexpected queue %q to have one message; got %d messages in the queue.",
|
||||
tc.desc, tc.queue, len(enqueued))
|
||||
tc.desc, tc.queue, len(pending))
|
||||
continue
|
||||
}
|
||||
got := enqueued[0]
|
||||
got := pending[0]
|
||||
if diff := cmp.Diff(tc.want, got, h.IgnoreIDOpt); diff != "" {
|
||||
t.Errorf("%s;\nmismatch found in enqueued task message; (-want,+got)\n%s",
|
||||
t.Errorf("%s;\nmismatch found in pending task message; (-want,+got)\n%s",
|
||||
tc.desc, diff)
|
||||
}
|
||||
}
|
||||
|
22
inspector.go
22
inspector.go
@ -37,10 +37,10 @@ type QueueStats struct {
|
||||
// Name of the queue.
|
||||
Queue string
|
||||
// Size is the total number of tasks in the queue.
|
||||
// The value is the sum of Enqueued, InProgress, Scheduled, Retry, and Dead.
|
||||
// The value is the sum of Pending, InProgress, Scheduled, Retry, and Dead.
|
||||
Size int
|
||||
// Number of enqueued tasks.
|
||||
Enqueued int
|
||||
// Number of pending tasks.
|
||||
Pending int
|
||||
// Number of in-progress tasks.
|
||||
InProgress int
|
||||
// Number of scheduled tasks.
|
||||
@ -73,7 +73,7 @@ func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) {
|
||||
return &QueueStats{
|
||||
Queue: stats.Queue,
|
||||
Size: stats.Size,
|
||||
Enqueued: stats.Enqueued,
|
||||
Pending: stats.Pending,
|
||||
InProgress: stats.InProgress,
|
||||
Scheduled: stats.Scheduled,
|
||||
Retry: stats.Retry,
|
||||
@ -119,8 +119,8 @@ func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// EnqueuedTask is a task in a queue and is ready to be processed.
|
||||
type EnqueuedTask struct {
|
||||
// PendingTask is a task in a queue and is ready to be processed.
|
||||
type PendingTask struct {
|
||||
*Task
|
||||
ID string
|
||||
Queue string
|
||||
@ -269,22 +269,22 @@ func Page(n int) ListOption {
|
||||
return pageNumOpt(n)
|
||||
}
|
||||
|
||||
// ListEnqueuedTasks retrieves enqueued tasks from the specified queue.
|
||||
// ListPendingTasks retrieves pending tasks from the specified queue.
|
||||
//
|
||||
// By default, it retrieves the first 30 tasks.
|
||||
func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*EnqueuedTask, error) {
|
||||
func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error) {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
opt := composeListOptions(opts...)
|
||||
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
|
||||
msgs, err := i.rdb.ListEnqueued(qname, pgn)
|
||||
msgs, err := i.rdb.ListPending(qname, pgn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tasks []*EnqueuedTask
|
||||
var tasks []*PendingTask
|
||||
for _, m := range msgs {
|
||||
tasks = append(tasks, &EnqueuedTask{
|
||||
tasks = append(tasks, &PendingTask{
|
||||
Task: NewTask(m.Type, m.Payload),
|
||||
ID: m.ID.String(),
|
||||
Queue: m.Queue,
|
||||
|
@ -63,7 +63,7 @@ func TestInspectorCurrentStats(t *testing.T) {
|
||||
inspector := NewInspector(getRedisConnOpt(t))
|
||||
|
||||
tests := []struct {
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
inProgress map[string][]*base.TaskMessage
|
||||
scheduled map[string][]base.Z
|
||||
retry map[string][]base.Z
|
||||
@ -74,7 +74,7 @@ func TestInspectorCurrentStats(t *testing.T) {
|
||||
want *QueueStats
|
||||
}{
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m1},
|
||||
"critical": {m5},
|
||||
"low": {m6},
|
||||
@ -116,7 +116,7 @@ func TestInspectorCurrentStats(t *testing.T) {
|
||||
want: &QueueStats{
|
||||
Queue: "default",
|
||||
Size: 4,
|
||||
Enqueued: 1,
|
||||
Pending: 1,
|
||||
InProgress: 1,
|
||||
Scheduled: 2,
|
||||
Retry: 0,
|
||||
@ -131,7 +131,7 @@ func TestInspectorCurrentStats(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
asynqtest.FlushDB(t, r)
|
||||
asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued)
|
||||
asynqtest.SeedAllPendingQueues(t, r, tc.pending)
|
||||
asynqtest.SeedAllInProgressQueues(t, r, tc.inProgress)
|
||||
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
|
||||
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
|
||||
@ -215,15 +215,15 @@ func TestInspectorHistory(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func createEnqueuedTask(msg *base.TaskMessage) *EnqueuedTask {
|
||||
return &EnqueuedTask{
|
||||
func createPendingTask(msg *base.TaskMessage) *PendingTask {
|
||||
return &PendingTask{
|
||||
Task: NewTask(msg.Type, msg.Payload),
|
||||
ID: msg.ID.String(),
|
||||
Queue: msg.Queue,
|
||||
}
|
||||
}
|
||||
|
||||
func TestInspectorListEnqueuedTasks(t *testing.T) {
|
||||
func TestInspectorListPendingTasks(t *testing.T) {
|
||||
r := setup(t)
|
||||
m1 := asynqtest.NewTaskMessage("task1", nil)
|
||||
m2 := asynqtest.NewTaskMessage("task2", nil)
|
||||
@ -234,58 +234,58 @@ func TestInspectorListEnqueuedTasks(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
qname string
|
||||
want []*EnqueuedTask
|
||||
want []*PendingTask
|
||||
}{
|
||||
{
|
||||
desc: "with default queue",
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m2},
|
||||
},
|
||||
qname: "default",
|
||||
want: []*EnqueuedTask{
|
||||
createEnqueuedTask(m1),
|
||||
createEnqueuedTask(m2),
|
||||
want: []*PendingTask{
|
||||
createPendingTask(m1),
|
||||
createPendingTask(m2),
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "with named queue",
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m2},
|
||||
"critical": {m3},
|
||||
"low": {m4},
|
||||
},
|
||||
qname: "critical",
|
||||
want: []*EnqueuedTask{
|
||||
createEnqueuedTask(m3),
|
||||
want: []*PendingTask{
|
||||
createPendingTask(m3),
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "with empty queue",
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
qname: "default",
|
||||
want: []*EnqueuedTask(nil),
|
||||
want: []*PendingTask(nil),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
asynqtest.FlushDB(t, r)
|
||||
for q, msgs := range tc.enqueued {
|
||||
asynqtest.SeedEnqueuedQueue(t, r, msgs, q)
|
||||
for q, msgs := range tc.pending {
|
||||
asynqtest.SeedPendingQueue(t, r, msgs, q)
|
||||
}
|
||||
|
||||
got, err := inspector.ListEnqueuedTasks(tc.qname)
|
||||
got, err := inspector.ListPendingTasks(tc.qname)
|
||||
if err != nil {
|
||||
t.Errorf("%s; ListEnqueuedTasks(%q) returned error: %v",
|
||||
t.Errorf("%s; ListPendingTasks(%q) returned error: %v",
|
||||
tc.desc, tc.qname, err)
|
||||
continue
|
||||
}
|
||||
ignoreOpt := cmpopts.IgnoreUnexported(Payload{})
|
||||
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
|
||||
t.Errorf("%s; ListEnqueuedTasks(%q) = %v, want %v; (-want,+got)\n%s",
|
||||
t.Errorf("%s; ListPendingTasks(%q) = %v, want %v; (-want,+got)\n%s",
|
||||
tc.desc, tc.qname, got, tc.want, diff)
|
||||
}
|
||||
}
|
||||
@ -576,53 +576,53 @@ func TestInspectorListPagination(t *testing.T) {
|
||||
asynqtest.NewTaskMessage(fmt.Sprintf("task%d", i), nil))
|
||||
}
|
||||
r := setup(t)
|
||||
asynqtest.SeedEnqueuedQueue(t, r, msgs, base.DefaultQueueName)
|
||||
asynqtest.SeedPendingQueue(t, r, msgs, base.DefaultQueueName)
|
||||
|
||||
inspector := NewInspector(getRedisConnOpt(t))
|
||||
|
||||
tests := []struct {
|
||||
page int
|
||||
pageSize int
|
||||
want []*EnqueuedTask
|
||||
want []*PendingTask
|
||||
}{
|
||||
{
|
||||
page: 1,
|
||||
pageSize: 5,
|
||||
want: []*EnqueuedTask{
|
||||
createEnqueuedTask(msgs[0]),
|
||||
createEnqueuedTask(msgs[1]),
|
||||
createEnqueuedTask(msgs[2]),
|
||||
createEnqueuedTask(msgs[3]),
|
||||
createEnqueuedTask(msgs[4]),
|
||||
want: []*PendingTask{
|
||||
createPendingTask(msgs[0]),
|
||||
createPendingTask(msgs[1]),
|
||||
createPendingTask(msgs[2]),
|
||||
createPendingTask(msgs[3]),
|
||||
createPendingTask(msgs[4]),
|
||||
},
|
||||
},
|
||||
{
|
||||
page: 3,
|
||||
pageSize: 10,
|
||||
want: []*EnqueuedTask{
|
||||
createEnqueuedTask(msgs[20]),
|
||||
createEnqueuedTask(msgs[21]),
|
||||
createEnqueuedTask(msgs[22]),
|
||||
createEnqueuedTask(msgs[23]),
|
||||
createEnqueuedTask(msgs[24]),
|
||||
createEnqueuedTask(msgs[25]),
|
||||
createEnqueuedTask(msgs[26]),
|
||||
createEnqueuedTask(msgs[27]),
|
||||
createEnqueuedTask(msgs[28]),
|
||||
createEnqueuedTask(msgs[29]),
|
||||
want: []*PendingTask{
|
||||
createPendingTask(msgs[20]),
|
||||
createPendingTask(msgs[21]),
|
||||
createPendingTask(msgs[22]),
|
||||
createPendingTask(msgs[23]),
|
||||
createPendingTask(msgs[24]),
|
||||
createPendingTask(msgs[25]),
|
||||
createPendingTask(msgs[26]),
|
||||
createPendingTask(msgs[27]),
|
||||
createPendingTask(msgs[28]),
|
||||
createPendingTask(msgs[29]),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
got, err := inspector.ListEnqueuedTasks("default", Page(tc.page), PageSize(tc.pageSize))
|
||||
got, err := inspector.ListPendingTasks("default", Page(tc.page), PageSize(tc.pageSize))
|
||||
if err != nil {
|
||||
t.Errorf("ListEnqueuedTask('default') returned error: %v", err)
|
||||
t.Errorf("ListPendingTask('default') returned error: %v", err)
|
||||
continue
|
||||
}
|
||||
ignoreOpt := cmpopts.IgnoreUnexported(Payload{})
|
||||
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
|
||||
t.Errorf("ListEnqueuedTask('default') = %v, want %v; (-want,+got)\n%s",
|
||||
t.Errorf("ListPendingTask('default') = %v, want %v; (-want,+got)\n%s",
|
||||
got, tc.want, diff)
|
||||
}
|
||||
}
|
||||
@ -1084,11 +1084,11 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
scheduled map[string][]base.Z
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
qname string
|
||||
want int
|
||||
wantScheduled map[string][]base.Z
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
scheduled: map[string][]base.Z{
|
||||
@ -1096,7 +1096,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) {
|
||||
"critical": {z2},
|
||||
"low": {z3},
|
||||
},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@ -1108,7 +1108,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) {
|
||||
"critical": {z2},
|
||||
"low": {z3},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m4},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@ -1120,7 +1120,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) {
|
||||
"critical": {z2},
|
||||
"low": {z3},
|
||||
},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m4},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@ -1132,7 +1132,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) {
|
||||
"critical": {z2},
|
||||
"low": {z3},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {m4, m1},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@ -1142,7 +1142,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) {
|
||||
scheduled: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m4},
|
||||
},
|
||||
qname: "default",
|
||||
@ -1150,7 +1150,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) {
|
||||
wantScheduled: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m4},
|
||||
},
|
||||
},
|
||||
@ -1159,7 +1159,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
asynqtest.FlushDB(t, r)
|
||||
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
|
||||
asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued)
|
||||
asynqtest.SeedAllPendingQueues(t, r, tc.pending)
|
||||
|
||||
got, err := inspector.EnqueueAllScheduledTasks(tc.qname)
|
||||
if err != nil {
|
||||
@ -1175,10 +1175,10 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) {
|
||||
t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff)
|
||||
}
|
||||
}
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" {
|
||||
t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", qname, diff)
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := asynqtest.GetPendingMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" {
|
||||
t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1200,11 +1200,11 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
retry map[string][]base.Z
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
qname string
|
||||
want int
|
||||
wantRetry map[string][]base.Z
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
retry: map[string][]base.Z{
|
||||
@ -1212,7 +1212,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) {
|
||||
"critical": {z2},
|
||||
"low": {z3},
|
||||
},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@ -1224,7 +1224,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) {
|
||||
"critical": {z2},
|
||||
"low": {z3},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m4},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@ -1236,7 +1236,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) {
|
||||
"critical": {z2},
|
||||
"low": {z3},
|
||||
},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m4},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@ -1248,7 +1248,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) {
|
||||
"critical": {z2},
|
||||
"low": {z3},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {m4, m1},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@ -1258,7 +1258,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) {
|
||||
retry: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m4},
|
||||
},
|
||||
qname: "default",
|
||||
@ -1266,7 +1266,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) {
|
||||
wantRetry: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m4},
|
||||
},
|
||||
},
|
||||
@ -1275,7 +1275,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
asynqtest.FlushDB(t, r)
|
||||
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
|
||||
asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued)
|
||||
asynqtest.SeedAllPendingQueues(t, r, tc.pending)
|
||||
|
||||
got, err := inspector.EnqueueAllRetryTasks(tc.qname)
|
||||
if err != nil {
|
||||
@ -1291,10 +1291,10 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) {
|
||||
t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff)
|
||||
}
|
||||
}
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" {
|
||||
t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", qname, diff)
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := asynqtest.GetPendingMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" {
|
||||
t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1316,11 +1316,11 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
dead map[string][]base.Z
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
qname string
|
||||
want int
|
||||
wantDead map[string][]base.Z
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
@ -1328,7 +1328,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) {
|
||||
"critical": {z2},
|
||||
"low": {z3},
|
||||
},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@ -1340,7 +1340,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) {
|
||||
"critical": {z2},
|
||||
"low": {z3},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m4},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@ -1351,7 +1351,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) {
|
||||
"default": {z1},
|
||||
"critical": {z2},
|
||||
},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m4},
|
||||
"critical": {},
|
||||
},
|
||||
@ -1361,7 +1361,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) {
|
||||
"default": {},
|
||||
"critical": {z2},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {m4, m1},
|
||||
"critical": {},
|
||||
},
|
||||
@ -1370,7 +1370,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) {
|
||||
dead: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m4},
|
||||
},
|
||||
qname: "default",
|
||||
@ -1378,7 +1378,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) {
|
||||
wantDead: map[string][]base.Z{
|
||||
"default": {},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m4},
|
||||
},
|
||||
},
|
||||
@ -1387,7 +1387,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
asynqtest.FlushDB(t, r)
|
||||
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
|
||||
asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued)
|
||||
asynqtest.SeedAllPendingQueues(t, r, tc.pending)
|
||||
|
||||
got, err := inspector.EnqueueAllDeadTasks(tc.qname)
|
||||
if err != nil {
|
||||
@ -1404,10 +1404,10 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) {
|
||||
}
|
||||
|
||||
}
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" {
|
||||
t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", qname, diff)
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := asynqtest.GetPendingMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" {
|
||||
t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1574,18 +1574,18 @@ func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
scheduled map[string][]base.Z
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
qname string
|
||||
key string
|
||||
wantScheduled map[string][]base.Z
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
scheduled: map[string][]base.Z{
|
||||
"default": {z1, z2},
|
||||
"custom": {z3},
|
||||
},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
@ -1595,7 +1595,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) {
|
||||
"default": {z1},
|
||||
"custom": {z3},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {m2},
|
||||
"custom": {},
|
||||
},
|
||||
@ -1605,7 +1605,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
asynqtest.FlushDB(t, r)
|
||||
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
|
||||
asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued)
|
||||
asynqtest.SeedAllPendingQueues(t, r, tc.pending)
|
||||
|
||||
if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil {
|
||||
t.Errorf("EnqueueTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
|
||||
@ -1619,10 +1619,10 @@ func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) {
|
||||
}
|
||||
|
||||
}
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" {
|
||||
t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s",
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := asynqtest.GetPendingMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" {
|
||||
t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s",
|
||||
qname, diff)
|
||||
}
|
||||
}
|
||||
@ -1643,18 +1643,18 @@ func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
retry map[string][]base.Z
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
qname string
|
||||
key string
|
||||
wantRetry map[string][]base.Z
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
retry: map[string][]base.Z{
|
||||
"default": {z1},
|
||||
"custom": {z2, z3},
|
||||
},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
@ -1664,7 +1664,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) {
|
||||
"default": {z1},
|
||||
"custom": {z3},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"custom": {m2},
|
||||
},
|
||||
@ -1674,7 +1674,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
asynqtest.FlushDB(t, r)
|
||||
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
|
||||
asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued)
|
||||
asynqtest.SeedAllPendingQueues(t, r, tc.pending)
|
||||
|
||||
if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil {
|
||||
t.Errorf("EnqueueTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
|
||||
@ -1687,10 +1687,10 @@ func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) {
|
||||
qname, diff)
|
||||
}
|
||||
}
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" {
|
||||
t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s",
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := asynqtest.GetPendingMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" {
|
||||
t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s",
|
||||
qname, diff)
|
||||
}
|
||||
}
|
||||
@ -1711,11 +1711,11 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
dead map[string][]base.Z
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
qname string
|
||||
key string
|
||||
wantDead map[string][]base.Z
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
@ -1723,7 +1723,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) {
|
||||
"critical": {z2},
|
||||
"low": {z3},
|
||||
},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@ -1735,7 +1735,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) {
|
||||
"critical": {},
|
||||
"low": {z3},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"critical": {m2},
|
||||
"low": {},
|
||||
@ -1746,7 +1746,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
asynqtest.FlushDB(t, r)
|
||||
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
|
||||
asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued)
|
||||
asynqtest.SeedAllPendingQueues(t, r, tc.pending)
|
||||
|
||||
if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil {
|
||||
t.Errorf("EnqueueTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
|
||||
@ -1759,10 +1759,10 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) {
|
||||
qname, diff)
|
||||
}
|
||||
}
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" {
|
||||
t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s",
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := asynqtest.GetPendingMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" {
|
||||
t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s",
|
||||
qname, diff)
|
||||
}
|
||||
}
|
||||
|
@ -174,8 +174,8 @@ func FlushDB(tb testing.TB, r redis.UniversalClient) {
|
||||
}
|
||||
}
|
||||
|
||||
// SeedEnqueuedQueue initializes the specified queue with the given messages.
|
||||
func SeedEnqueuedQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
|
||||
// SeedPendingQueue initializes the specified queue with the given messages.
|
||||
func SeedPendingQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
|
||||
tb.Helper()
|
||||
r.SAdd(base.AllQueues, qname)
|
||||
seedRedisList(tb, r, base.QueueKey(qname), msgs)
|
||||
@ -216,12 +216,12 @@ func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qna
|
||||
seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries)
|
||||
}
|
||||
|
||||
// SeedAllEnqueuedQueues initializes all of the specified queues with the given messages.
|
||||
// SeedAllPendingQueues initializes all of the specified queues with the given messages.
|
||||
//
|
||||
// enqueued maps a queue name to a list of messages.
|
||||
func SeedAllEnqueuedQueues(tb testing.TB, r redis.UniversalClient, enqueued map[string][]*base.TaskMessage) {
|
||||
for q, msgs := range enqueued {
|
||||
SeedEnqueuedQueue(tb, r, msgs, q)
|
||||
// pending maps a queue name to a list of messages.
|
||||
func SeedAllPendingQueues(tb testing.TB, r redis.UniversalClient, pending map[string][]*base.TaskMessage) {
|
||||
for q, msgs := range pending {
|
||||
SeedPendingQueue(tb, r, msgs, q)
|
||||
}
|
||||
}
|
||||
|
||||
@ -278,8 +278,8 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []b
|
||||
}
|
||||
}
|
||||
|
||||
// GetEnqueuedMessages returns all enqueued messages in the given queue.
|
||||
func GetEnqueuedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
|
||||
// GetPendingMessages returns all pending messages in the given queue.
|
||||
func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
|
||||
tb.Helper()
|
||||
return getListMessages(tb, r, base.QueueKey(qname))
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ type Stats struct {
|
||||
// Size is the total number of tasks in the queue.
|
||||
Size int
|
||||
// Number of tasks in each state.
|
||||
Enqueued int
|
||||
Pending int
|
||||
InProgress int
|
||||
Scheduled int
|
||||
Retry int
|
||||
@ -133,7 +133,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
|
||||
val := cast.ToInt(data[i+1])
|
||||
switch key {
|
||||
case base.QueueKey(qname):
|
||||
stats.Enqueued = val
|
||||
stats.Pending = val
|
||||
size += val
|
||||
case base.InProgressKey(qname):
|
||||
stats.InProgress = val
|
||||
@ -258,8 +258,8 @@ func (p Pagination) stop() int64 {
|
||||
return int64(p.Size*p.Page + p.Size - 1)
|
||||
}
|
||||
|
||||
// ListEnqueued returns enqueued tasks that are ready to be processed.
|
||||
func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
|
||||
// ListPending returns pending tasks that are ready to be processed.
|
||||
func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
|
||||
if !r.client.SIsMember(base.AllQueues, qname).Val() {
|
||||
return nil, fmt.Errorf("queue %q does not exist", qname)
|
||||
}
|
||||
@ -680,12 +680,12 @@ return redis.status_reply("OK")`)
|
||||
// KEYS[5] -> asynq:{<qname>}:dead
|
||||
// KEYS[6] -> asynq:{<qname>}:deadlines
|
||||
var removeQueueCmd = redis.NewScript(`
|
||||
local enqueued = redis.call("LLEN", KEYS[1])
|
||||
local pending = redis.call("LLEN", KEYS[1])
|
||||
local inprogress = redis.call("LLEN", KEYS[2])
|
||||
local scheduled = redis.call("SCARD", KEYS[3])
|
||||
local retry = redis.call("SCARD", KEYS[4])
|
||||
local dead = redis.call("SCARD", KEYS[5])
|
||||
local total = enqueued + inprogress + scheduled + retry + dead
|
||||
local total = pending + inprogress + scheduled + retry + dead
|
||||
if total > 0 then
|
||||
return redis.error_reply("QUEUE NOT EMPTY")
|
||||
end
|
||||
|
@ -57,7 +57,7 @@ func TestCurrentStats(t *testing.T) {
|
||||
now := time.Now()
|
||||
|
||||
tests := []struct {
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
inProgress map[string][]*base.TaskMessage
|
||||
scheduled map[string][]base.Z
|
||||
retry map[string][]base.Z
|
||||
@ -69,7 +69,7 @@ func TestCurrentStats(t *testing.T) {
|
||||
want *Stats
|
||||
}{
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m1},
|
||||
"critical": {m5},
|
||||
"low": {m6},
|
||||
@ -113,7 +113,7 @@ func TestCurrentStats(t *testing.T) {
|
||||
Queue: "default",
|
||||
Paused: false,
|
||||
Size: 4,
|
||||
Enqueued: 1,
|
||||
Pending: 1,
|
||||
InProgress: 1,
|
||||
Scheduled: 2,
|
||||
Retry: 0,
|
||||
@ -124,7 +124,7 @@ func TestCurrentStats(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m1},
|
||||
"critical": {m5},
|
||||
"low": {m6},
|
||||
@ -168,7 +168,7 @@ func TestCurrentStats(t *testing.T) {
|
||||
Queue: "critical",
|
||||
Paused: true,
|
||||
Size: 1,
|
||||
Enqueued: 1,
|
||||
Pending: 1,
|
||||
InProgress: 0,
|
||||
Scheduled: 0,
|
||||
Retry: 0,
|
||||
@ -187,7 +187,7 @@ func TestCurrentStats(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued)
|
||||
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
||||
h.SeedAllInProgressQueues(t, r.client, tc.inProgress)
|
||||
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
||||
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
||||
@ -303,7 +303,7 @@ func TestRedisInfo(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestListEnqueued(t *testing.T) {
|
||||
func TestListPending(t *testing.T) {
|
||||
r := setup(t)
|
||||
|
||||
m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
|
||||
@ -312,26 +312,26 @@ func TestListEnqueued(t *testing.T) {
|
||||
m4 := h.NewTaskMessageWithQueue("minor_notification", nil, "low")
|
||||
|
||||
tests := []struct {
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
qname string
|
||||
want []*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
base.DefaultQueueName: {m1, m2},
|
||||
},
|
||||
qname: base.DefaultQueueName,
|
||||
want: []*base.TaskMessage{m1, m2},
|
||||
},
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
base.DefaultQueueName: nil,
|
||||
},
|
||||
qname: base.DefaultQueueName,
|
||||
want: []*base.TaskMessage(nil),
|
||||
},
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
base.DefaultQueueName: {m1, m2},
|
||||
"critical": {m3},
|
||||
"low": {m4},
|
||||
@ -340,7 +340,7 @@ func TestListEnqueued(t *testing.T) {
|
||||
want: []*base.TaskMessage{m1, m2},
|
||||
},
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
base.DefaultQueueName: {m1, m2},
|
||||
"critical": {m3},
|
||||
"low": {m4},
|
||||
@ -352,10 +352,10 @@ func TestListEnqueued(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued)
|
||||
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
||||
|
||||
got, err := r.ListEnqueued(tc.qname, Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListEnqueued(%q, Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
got, err := r.ListPending(tc.qname, Pagination{Size: 20, Page: 0})
|
||||
op := fmt.Sprintf("r.ListPending(%q, Pagination{Size: 20, Page: 0})", tc.qname)
|
||||
if err != nil {
|
||||
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
|
||||
continue
|
||||
@ -367,7 +367,7 @@ func TestListEnqueued(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestListEnqueuedPagination(t *testing.T) {
|
||||
func TestListPendingPagination(t *testing.T) {
|
||||
r := setup(t)
|
||||
var msgs []*base.TaskMessage
|
||||
for i := 0; i < 100; i++ {
|
||||
@ -375,7 +375,7 @@ func TestListEnqueuedPagination(t *testing.T) {
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
// create 100 tasks in default queue
|
||||
h.SeedEnqueuedQueue(t, r.client, msgs, "default")
|
||||
h.SeedPendingQueue(t, r.client, msgs, "default")
|
||||
|
||||
msgs = []*base.TaskMessage(nil) // empty list
|
||||
for i := 0; i < 100; i++ {
|
||||
@ -383,7 +383,7 @@ func TestListEnqueuedPagination(t *testing.T) {
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
// create 100 tasks in custom queue
|
||||
h.SeedEnqueuedQueue(t, r.client, msgs, "custom")
|
||||
h.SeedPendingQueue(t, r.client, msgs, "custom")
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
@ -403,8 +403,8 @@ func TestListEnqueuedPagination(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
got, err := r.ListEnqueued(tc.qname, Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListEnqueued(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
|
||||
got, err := r.ListPending(tc.qname, Pagination{Size: tc.size, Page: tc.page})
|
||||
op := fmt.Sprintf("r.ListPending(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
|
||||
if err != nil {
|
||||
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
|
||||
continue
|
||||
@ -996,7 +996,7 @@ func TestEnqueueDeadTask(t *testing.T) {
|
||||
id uuid.UUID
|
||||
want error // expected return value from calling EnqueueDeadTask
|
||||
wantDead map[string][]*base.TaskMessage
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
dead: map[string][]base.Z{
|
||||
@ -1012,7 +1012,7 @@ func TestEnqueueDeadTask(t *testing.T) {
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t2},
|
||||
},
|
||||
},
|
||||
@ -1030,7 +1030,7 @@ func TestEnqueueDeadTask(t *testing.T) {
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
"default": {t1, t2},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
},
|
||||
@ -1052,7 +1052,7 @@ func TestEnqueueDeadTask(t *testing.T) {
|
||||
"default": {t1, t2},
|
||||
"critical": {},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"critical": {t3},
|
||||
},
|
||||
@ -1069,9 +1069,9 @@ func TestEnqueueDeadTask(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := h.GetPendingMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
|
||||
}
|
||||
}
|
||||
@ -1100,7 +1100,7 @@ func TestEnqueueRetryTask(t *testing.T) {
|
||||
id uuid.UUID
|
||||
want error // expected return value from calling EnqueueRetryTask
|
||||
wantRetry map[string][]*base.TaskMessage
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
retry: map[string][]base.Z{
|
||||
@ -1116,7 +1116,7 @@ func TestEnqueueRetryTask(t *testing.T) {
|
||||
wantRetry: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t2},
|
||||
},
|
||||
},
|
||||
@ -1134,7 +1134,7 @@ func TestEnqueueRetryTask(t *testing.T) {
|
||||
wantRetry: map[string][]*base.TaskMessage{
|
||||
"default": {t1, t2},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
},
|
||||
@ -1156,7 +1156,7 @@ func TestEnqueueRetryTask(t *testing.T) {
|
||||
"default": {t1, t2},
|
||||
"low": {},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"low": {t3},
|
||||
},
|
||||
@ -1173,9 +1173,9 @@ func TestEnqueueRetryTask(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := h.GetPendingMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
|
||||
}
|
||||
}
|
||||
@ -1204,7 +1204,7 @@ func TestEnqueueScheduledTask(t *testing.T) {
|
||||
id uuid.UUID
|
||||
want error // expected return value from calling EnqueueScheduledTask
|
||||
wantScheduled map[string][]*base.TaskMessage
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
scheduled: map[string][]base.Z{
|
||||
@ -1220,7 +1220,7 @@ func TestEnqueueScheduledTask(t *testing.T) {
|
||||
wantScheduled: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t2},
|
||||
},
|
||||
},
|
||||
@ -1238,7 +1238,7 @@ func TestEnqueueScheduledTask(t *testing.T) {
|
||||
wantScheduled: map[string][]*base.TaskMessage{
|
||||
"default": {t1, t2},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
},
|
||||
@ -1260,7 +1260,7 @@ func TestEnqueueScheduledTask(t *testing.T) {
|
||||
"default": {t1, t2},
|
||||
"notifications": {},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"notifications": {t3},
|
||||
},
|
||||
@ -1277,9 +1277,9 @@ func TestEnqueueScheduledTask(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := h.GetPendingMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
|
||||
}
|
||||
}
|
||||
@ -1306,7 +1306,7 @@ func TestEnqueueAllScheduledTasks(t *testing.T) {
|
||||
scheduled map[string][]base.Z
|
||||
qname string
|
||||
want int64
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
wantScheduled map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
@ -1320,7 +1320,7 @@ func TestEnqueueAllScheduledTasks(t *testing.T) {
|
||||
},
|
||||
qname: "default",
|
||||
want: 3,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t1, t2, t3},
|
||||
},
|
||||
wantScheduled: map[string][]*base.TaskMessage{
|
||||
@ -1334,7 +1334,7 @@ func TestEnqueueAllScheduledTasks(t *testing.T) {
|
||||
},
|
||||
qname: "default",
|
||||
want: 0,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
wantScheduled: map[string][]*base.TaskMessage{
|
||||
@ -1356,7 +1356,7 @@ func TestEnqueueAllScheduledTasks(t *testing.T) {
|
||||
},
|
||||
qname: "custom",
|
||||
want: 2,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"custom": {t4, t5},
|
||||
},
|
||||
@ -1383,9 +1383,9 @@ func TestEnqueueAllScheduledTasks(t *testing.T) {
|
||||
tc.desc, tc.qname, got, err, tc.want)
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := h.GetPendingMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff)
|
||||
}
|
||||
}
|
||||
@ -1411,7 +1411,7 @@ func TestEnqueueAllRetryTasks(t *testing.T) {
|
||||
retry map[string][]base.Z
|
||||
qname string
|
||||
want int64
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
wantRetry map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
@ -1425,7 +1425,7 @@ func TestEnqueueAllRetryTasks(t *testing.T) {
|
||||
},
|
||||
qname: "default",
|
||||
want: 3,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t1, t2, t3},
|
||||
},
|
||||
wantRetry: map[string][]*base.TaskMessage{
|
||||
@ -1439,7 +1439,7 @@ func TestEnqueueAllRetryTasks(t *testing.T) {
|
||||
},
|
||||
qname: "default",
|
||||
want: 0,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
wantRetry: map[string][]*base.TaskMessage{
|
||||
@ -1461,7 +1461,7 @@ func TestEnqueueAllRetryTasks(t *testing.T) {
|
||||
},
|
||||
qname: "custom",
|
||||
want: 2,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"custom": {t4, t5},
|
||||
},
|
||||
@ -1488,9 +1488,9 @@ func TestEnqueueAllRetryTasks(t *testing.T) {
|
||||
tc.desc, tc.qname, got, err, tc.want)
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := h.GetPendingMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff)
|
||||
}
|
||||
}
|
||||
@ -1516,7 +1516,7 @@ func TestEnqueueAllDeadTasks(t *testing.T) {
|
||||
dead map[string][]base.Z
|
||||
qname string
|
||||
want int64
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
wantDead map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
@ -1530,7 +1530,7 @@ func TestEnqueueAllDeadTasks(t *testing.T) {
|
||||
},
|
||||
qname: "default",
|
||||
want: 3,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t1, t2, t3},
|
||||
},
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
@ -1544,7 +1544,7 @@ func TestEnqueueAllDeadTasks(t *testing.T) {
|
||||
},
|
||||
qname: "default",
|
||||
want: 0,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
wantDead: map[string][]*base.TaskMessage{
|
||||
@ -1566,7 +1566,7 @@ func TestEnqueueAllDeadTasks(t *testing.T) {
|
||||
},
|
||||
qname: "custom",
|
||||
want: 2,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"custom": {t4, t5},
|
||||
},
|
||||
@ -1593,9 +1593,9 @@ func TestEnqueueAllDeadTasks(t *testing.T) {
|
||||
tc.desc, tc.qname, got, err, tc.want)
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := h.GetPendingMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff)
|
||||
}
|
||||
}
|
||||
@ -2608,7 +2608,7 @@ func TestRemoveQueue(t *testing.T) {
|
||||
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
|
||||
|
||||
tests := []struct {
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
inProgress map[string][]*base.TaskMessage
|
||||
scheduled map[string][]base.Z
|
||||
retry map[string][]base.Z
|
||||
@ -2617,7 +2617,7 @@ func TestRemoveQueue(t *testing.T) {
|
||||
force bool
|
||||
}{
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m2},
|
||||
"custom": {},
|
||||
},
|
||||
@ -2641,7 +2641,7 @@ func TestRemoveQueue(t *testing.T) {
|
||||
force: false,
|
||||
},
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m2},
|
||||
"custom": {m3},
|
||||
},
|
||||
@ -2668,7 +2668,7 @@ func TestRemoveQueue(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client)
|
||||
h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued)
|
||||
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
||||
h.SeedAllInProgressQueues(t, r.client, tc.inProgress)
|
||||
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
||||
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
||||
@ -2709,7 +2709,7 @@ func TestRemoveQueueError(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
inProgress map[string][]*base.TaskMessage
|
||||
scheduled map[string][]base.Z
|
||||
retry map[string][]base.Z
|
||||
@ -2719,7 +2719,7 @@ func TestRemoveQueueError(t *testing.T) {
|
||||
}{
|
||||
{
|
||||
desc: "removing non-existent queue",
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m2},
|
||||
"custom": {m3},
|
||||
},
|
||||
@ -2744,7 +2744,7 @@ func TestRemoveQueueError(t *testing.T) {
|
||||
},
|
||||
{
|
||||
desc: "removing non-empty queue",
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m2},
|
||||
"custom": {m3},
|
||||
},
|
||||
@ -2769,7 +2769,7 @@ func TestRemoveQueueError(t *testing.T) {
|
||||
},
|
||||
{
|
||||
desc: "force removing queue with tasks in-progress",
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m2},
|
||||
"custom": {m3},
|
||||
},
|
||||
@ -2797,7 +2797,7 @@ func TestRemoveQueueError(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client)
|
||||
h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued)
|
||||
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
||||
h.SeedAllInProgressQueues(t, r.client, tc.inProgress)
|
||||
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
||||
h.SeedAllRetryQueues(t, r.client, tc.retry)
|
||||
@ -2810,9 +2810,9 @@ func TestRemoveQueueError(t *testing.T) {
|
||||
}
|
||||
|
||||
// Make sure that nothing changed
|
||||
for qname, want := range tc.enqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||
for qname, want := range tc.pending {
|
||||
gotPending := h.GetPendingMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.QueueKey(qname), diff)
|
||||
}
|
||||
}
|
||||
|
@ -80,12 +80,12 @@ func TestEnqueue(t *testing.T) {
|
||||
t.Errorf("(*RDB).Enqueue(msg) = %v, want nil", err)
|
||||
}
|
||||
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r.client, tc.msg.Queue)
|
||||
if len(gotEnqueued) != 1 {
|
||||
t.Errorf("%q has length %d, want 1", base.QueueKey(tc.msg.Queue), len(gotEnqueued))
|
||||
gotPending := h.GetPendingMessages(t, r.client, tc.msg.Queue)
|
||||
if len(gotPending) != 1 {
|
||||
t.Errorf("%q has length %d, want 1", base.QueueKey(tc.msg.Queue), len(gotPending))
|
||||
continue
|
||||
}
|
||||
if diff := cmp.Diff(tc.msg, gotEnqueued[0]); diff != "" {
|
||||
if diff := cmp.Diff(tc.msg, gotPending[0]); diff != "" {
|
||||
t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff)
|
||||
}
|
||||
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
|
||||
@ -167,24 +167,24 @@ func TestDequeue(t *testing.T) {
|
||||
t3Deadline := now.Unix() + t3.Timeout // use whichever is earliest
|
||||
|
||||
tests := []struct {
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
args []string // list of queues to query
|
||||
wantMsg *base.TaskMessage
|
||||
wantDeadline time.Time
|
||||
err error
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
wantInProgress map[string][]*base.TaskMessage
|
||||
wantDeadlines map[string][]base.Z
|
||||
}{
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
},
|
||||
args: []string{"default"},
|
||||
wantMsg: t1,
|
||||
wantDeadline: time.Unix(t1Deadline, 0),
|
||||
err: nil,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
wantInProgress: map[string][]*base.TaskMessage{
|
||||
@ -195,14 +195,14 @@ func TestDequeue(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
args: []string{"default"},
|
||||
wantMsg: nil,
|
||||
wantDeadline: time.Time{},
|
||||
err: ErrNoProcessableTask,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
wantInProgress: map[string][]*base.TaskMessage{
|
||||
@ -213,7 +213,7 @@ func TestDequeue(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
"critical": {t2},
|
||||
"low": {t3},
|
||||
@ -222,7 +222,7 @@ func TestDequeue(t *testing.T) {
|
||||
wantMsg: t2,
|
||||
wantDeadline: time.Unix(t2Deadline, 0),
|
||||
err: nil,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
"critical": {},
|
||||
"low": {t3},
|
||||
@ -239,7 +239,7 @@ func TestDequeue(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {t3},
|
||||
"critical": {},
|
||||
"low": {t2, t1},
|
||||
@ -248,7 +248,7 @@ func TestDequeue(t *testing.T) {
|
||||
wantMsg: t3,
|
||||
wantDeadline: time.Unix(t3Deadline, 0),
|
||||
err: nil,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
"low": {t2, t1},
|
||||
@ -265,7 +265,7 @@ func TestDequeue(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@ -274,7 +274,7 @@ func TestDequeue(t *testing.T) {
|
||||
wantMsg: nil,
|
||||
wantDeadline: time.Time{},
|
||||
err: ErrNoProcessableTask,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
@ -294,7 +294,7 @@ func TestDequeue(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued)
|
||||
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
||||
|
||||
gotMsg, gotDeadline, err := r.Dequeue(tc.args...)
|
||||
if err != tc.err {
|
||||
@ -313,9 +313,9 @@ func TestDequeue(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
|
||||
for queue, want := range tc.wantEnqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r.client, queue)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||
for queue, want := range tc.wantPending {
|
||||
gotPending := h.GetPendingMessages(t, r.client, queue)
|
||||
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff)
|
||||
}
|
||||
}
|
||||
@ -355,23 +355,23 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
paused []string // list of paused queues
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
args []string // list of queues to query
|
||||
wantMsg *base.TaskMessage
|
||||
err error
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
wantInProgress map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
paused: []string{"default"},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
"critical": {t2},
|
||||
},
|
||||
args: []string{"default", "critical"},
|
||||
wantMsg: t2,
|
||||
err: nil,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
"critical": {},
|
||||
},
|
||||
@ -382,13 +382,13 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
||||
},
|
||||
{
|
||||
paused: []string{"default"},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
},
|
||||
args: []string{"default"},
|
||||
wantMsg: nil,
|
||||
err: ErrNoProcessableTask,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
},
|
||||
wantInProgress: map[string][]*base.TaskMessage{
|
||||
@ -397,14 +397,14 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
||||
},
|
||||
{
|
||||
paused: []string{"critical", "default"},
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
"critical": {t2},
|
||||
},
|
||||
args: []string{"default", "critical"},
|
||||
wantMsg: nil,
|
||||
err: ErrNoProcessableTask,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
"critical": {t2},
|
||||
},
|
||||
@ -422,7 +422,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued)
|
||||
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
||||
|
||||
got, _, err := r.Dequeue(tc.args...)
|
||||
if !cmp.Equal(got, tc.wantMsg) || err != tc.err {
|
||||
@ -431,9 +431,9 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
|
||||
for queue, want := range tc.wantEnqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r.client, queue)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||
for queue, want := range tc.wantPending {
|
||||
gotPending := h.GetPendingMessages(t, r.client, queue)
|
||||
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff)
|
||||
}
|
||||
}
|
||||
@ -627,16 +627,16 @@ func TestRequeue(t *testing.T) {
|
||||
t3Deadline := now.Unix() + t3.Timeout
|
||||
|
||||
tests := []struct {
|
||||
enqueued map[string][]*base.TaskMessage // initial state of queues
|
||||
pending map[string][]*base.TaskMessage // initial state of queues
|
||||
inProgress map[string][]*base.TaskMessage // initial state of the in-progress list
|
||||
deadlines map[string][]base.Z // initial state of the deadlines set
|
||||
target *base.TaskMessage // task to requeue
|
||||
wantEnqueued map[string][]*base.TaskMessage // final state of queues
|
||||
wantPending map[string][]*base.TaskMessage // final state of queues
|
||||
wantInProgress map[string][]*base.TaskMessage // final state of the in-progress list
|
||||
wantDeadlines map[string][]base.Z // final state of the deadlines set
|
||||
}{
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
inProgress: map[string][]*base.TaskMessage{
|
||||
@ -649,7 +649,7 @@ func TestRequeue(t *testing.T) {
|
||||
},
|
||||
},
|
||||
target: t1,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
},
|
||||
wantInProgress: map[string][]*base.TaskMessage{
|
||||
@ -662,7 +662,7 @@ func TestRequeue(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
},
|
||||
inProgress: map[string][]*base.TaskMessage{
|
||||
@ -674,7 +674,7 @@ func TestRequeue(t *testing.T) {
|
||||
},
|
||||
},
|
||||
target: t2,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t1, t2},
|
||||
},
|
||||
wantInProgress: map[string][]*base.TaskMessage{
|
||||
@ -685,7 +685,7 @@ func TestRequeue(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
"critical": {},
|
||||
},
|
||||
@ -698,7 +698,7 @@ func TestRequeue(t *testing.T) {
|
||||
"critical": {{Message: t3, Score: t3Deadline}},
|
||||
},
|
||||
target: t3,
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
"critical": {t3},
|
||||
},
|
||||
@ -715,7 +715,7 @@ func TestRequeue(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client) // clean up db before each test case
|
||||
h.SeedAllEnqueuedQueues(t, r.client, tc.enqueued)
|
||||
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
||||
h.SeedAllInProgressQueues(t, r.client, tc.inProgress)
|
||||
h.SeedAllDeadlines(t, r.client, tc.deadlines)
|
||||
|
||||
@ -725,9 +725,9 @@ func TestRequeue(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := h.GetPendingMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
|
||||
}
|
||||
}
|
||||
@ -1215,7 +1215,7 @@ func TestCheckAndEnqueue(t *testing.T) {
|
||||
scheduled map[string][]base.Z
|
||||
retry map[string][]base.Z
|
||||
qnames []string
|
||||
wantEnqueued map[string][]*base.TaskMessage
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
wantScheduled map[string][]*base.TaskMessage
|
||||
wantRetry map[string][]*base.TaskMessage
|
||||
}{
|
||||
@ -1230,7 +1230,7 @@ func TestCheckAndEnqueue(t *testing.T) {
|
||||
"default": {{Message: t3, Score: secondAgo.Unix()}},
|
||||
},
|
||||
qnames: []string{"default"},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t1, t2, t3},
|
||||
},
|
||||
wantScheduled: map[string][]*base.TaskMessage{
|
||||
@ -1251,7 +1251,7 @@ func TestCheckAndEnqueue(t *testing.T) {
|
||||
"default": {{Message: t3, Score: secondAgo.Unix()}},
|
||||
},
|
||||
qnames: []string{"default"},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t2, t3},
|
||||
},
|
||||
wantScheduled: map[string][]*base.TaskMessage{
|
||||
@ -1272,7 +1272,7 @@ func TestCheckAndEnqueue(t *testing.T) {
|
||||
"default": {{Message: t3, Score: hourFromNow.Unix()}},
|
||||
},
|
||||
qnames: []string{"default"},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
},
|
||||
wantScheduled: map[string][]*base.TaskMessage{
|
||||
@ -1294,7 +1294,7 @@ func TestCheckAndEnqueue(t *testing.T) {
|
||||
"low": {{Message: t5, Score: secondAgo.Unix()}},
|
||||
},
|
||||
qnames: []string{"default", "critical", "low"},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t1},
|
||||
"critical": {t4},
|
||||
"low": {t5},
|
||||
@ -1323,9 +1323,9 @@ func TestCheckAndEnqueue(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := h.GetPendingMessages(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
|
||||
}
|
||||
}
|
||||
|
@ -57,17 +57,17 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
|
||||
t4 := NewTask(m4.Type, m4.Payload)
|
||||
|
||||
tests := []struct {
|
||||
enqueued []*base.TaskMessage // initial default queue state
|
||||
pending []*base.TaskMessage // initial default queue state
|
||||
incoming []*base.TaskMessage // tasks to be enqueued during run
|
||||
wantProcessed []*Task // tasks to be processed at the end
|
||||
}{
|
||||
{
|
||||
enqueued: []*base.TaskMessage{m1},
|
||||
pending: []*base.TaskMessage{m1},
|
||||
incoming: []*base.TaskMessage{m2, m3, m4},
|
||||
wantProcessed: []*Task{t1, t2, t3, t4},
|
||||
},
|
||||
{
|
||||
enqueued: []*base.TaskMessage{},
|
||||
pending: []*base.TaskMessage{},
|
||||
incoming: []*base.TaskMessage{m1},
|
||||
wantProcessed: []*Task{t1},
|
||||
},
|
||||
@ -75,7 +75,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r) // clean up db before each test case.
|
||||
h.SeedEnqueuedQueue(t, r, tc.enqueued, base.DefaultQueueName) // initialize default queue.
|
||||
h.SeedPendingQueue(t, r, tc.pending, base.DefaultQueueName) // initialize default queue.
|
||||
|
||||
// instantiate a new processor
|
||||
var mu sync.Mutex
|
||||
@ -117,7 +117,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
time.Sleep(2 * time.Second) // wait for two second to allow all enqueued tasks to be processed.
|
||||
time.Sleep(2 * time.Second) // wait for two second to allow all pending tasks to be processed.
|
||||
if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 {
|
||||
t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l)
|
||||
}
|
||||
@ -148,12 +148,12 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
|
||||
)
|
||||
|
||||
tests := []struct {
|
||||
enqueued map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
queues []string // list of queues to consume the tasks from
|
||||
wantProcessed []*Task // tasks to be processed at the end
|
||||
}{
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m2},
|
||||
"high": {m3},
|
||||
"low": {m4},
|
||||
@ -166,7 +166,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
// Set up test case.
|
||||
h.FlushDB(t, r)
|
||||
h.SeedAllEnqueuedQueues(t, r, tc.enqueued)
|
||||
h.SeedAllPendingQueues(t, r, tc.pending)
|
||||
|
||||
// Instantiate a new processor.
|
||||
var mu sync.Mutex
|
||||
@ -205,7 +205,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
|
||||
p.handler = HandlerFunc(handler)
|
||||
|
||||
p.start(&sync.WaitGroup{})
|
||||
// Wait for two second to allow all enqueued tasks to be processed.
|
||||
// Wait for two second to allow all pending tasks to be processed.
|
||||
time.Sleep(2 * time.Second)
|
||||
// Make sure no messages are stuck in in-progress list.
|
||||
for _, qname := range tc.queues {
|
||||
@ -232,18 +232,18 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
|
||||
t1 := NewTask(m1.Type, m1.Payload)
|
||||
|
||||
tests := []struct {
|
||||
enqueued []*base.TaskMessage // initial default queue state
|
||||
pending []*base.TaskMessage // initial default queue state
|
||||
wantProcessed []*Task // tasks to be processed at the end
|
||||
}{
|
||||
{
|
||||
enqueued: []*base.TaskMessage{m1},
|
||||
pending: []*base.TaskMessage{m1},
|
||||
wantProcessed: []*Task{t1},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r) // clean up db before each test case.
|
||||
h.SeedEnqueuedQueue(t, r, tc.enqueued, base.DefaultQueueName) // initialize default queue.
|
||||
h.SeedPendingQueue(t, r, tc.pending, base.DefaultQueueName) // initialize default queue.
|
||||
|
||||
var mu sync.Mutex
|
||||
var processed []*Task
|
||||
@ -282,7 +282,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
|
||||
p.handler = HandlerFunc(handler)
|
||||
|
||||
p.start(&sync.WaitGroup{})
|
||||
time.Sleep(2 * time.Second) // wait for two second to allow all enqueued tasks to be processed.
|
||||
time.Sleep(2 * time.Second) // wait for two second to allow all pending tasks to be processed.
|
||||
if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 {
|
||||
t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l)
|
||||
}
|
||||
@ -310,7 +310,7 @@ func TestProcessorRetry(t *testing.T) {
|
||||
now := time.Now()
|
||||
|
||||
tests := []struct {
|
||||
enqueued []*base.TaskMessage // initial default queue state
|
||||
pending []*base.TaskMessage // initial default queue state
|
||||
incoming []*base.TaskMessage // tasks to be enqueued during run
|
||||
delay time.Duration // retry delay duration
|
||||
handler Handler // task handler
|
||||
@ -320,7 +320,7 @@ func TestProcessorRetry(t *testing.T) {
|
||||
wantErrCount int // number of times error handler should be called
|
||||
}{
|
||||
{
|
||||
enqueued: []*base.TaskMessage{m1, m2},
|
||||
pending: []*base.TaskMessage{m1, m2},
|
||||
incoming: []*base.TaskMessage{m3, m4},
|
||||
delay: time.Minute,
|
||||
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
|
||||
@ -339,7 +339,7 @@ func TestProcessorRetry(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r) // clean up db before each test case.
|
||||
h.SeedEnqueuedQueue(t, r, tc.enqueued, base.DefaultQueueName) // initialize default queue.
|
||||
h.SeedPendingQueue(t, r, tc.pending, base.DefaultQueueName) // initialize default queue.
|
||||
|
||||
// instantiate a new processor
|
||||
delayFunc := func(n int, e error, t *Task) time.Duration {
|
||||
@ -486,13 +486,13 @@ func TestProcessorWithStrictPriority(t *testing.T) {
|
||||
)
|
||||
|
||||
tests := []struct {
|
||||
enqueued map[string][]*base.TaskMessage // initial queues state
|
||||
pending map[string][]*base.TaskMessage // initial queues state
|
||||
queues []string // list of queues to consume tasks from
|
||||
wait time.Duration // wait duration between starting and stopping processor for this test case
|
||||
wantProcessed []*Task // tasks to be processed at the end
|
||||
}{
|
||||
{
|
||||
enqueued: map[string][]*base.TaskMessage{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
base.DefaultQueueName: {m4, m5},
|
||||
"critical": {m1, m2, m3},
|
||||
"low": {m6, m7},
|
||||
@ -505,8 +505,8 @@ func TestProcessorWithStrictPriority(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r) // clean up db before each test case.
|
||||
for qname, msgs := range tc.enqueued {
|
||||
h.SeedEnqueuedQueue(t, r, msgs, qname)
|
||||
for qname, msgs := range tc.pending {
|
||||
h.SeedPendingQueue(t, r, msgs, qname)
|
||||
}
|
||||
|
||||
// instantiate a new processor
|
||||
|
@ -34,11 +34,11 @@ func TestScheduler(t *testing.T) {
|
||||
tests := []struct {
|
||||
initScheduled map[string][]base.Z // scheduled queue initial state
|
||||
initRetry map[string][]base.Z // retry queue initial state
|
||||
initEnqueued map[string][]*base.TaskMessage // default queue initial state
|
||||
initPending map[string][]*base.TaskMessage // default queue initial state
|
||||
wait time.Duration // wait duration before checking for final state
|
||||
wantScheduled map[string][]*base.TaskMessage // schedule queue final state
|
||||
wantRetry map[string][]*base.TaskMessage // retry queue final state
|
||||
wantEnqueued map[string][]*base.TaskMessage // default queue final state
|
||||
wantPending map[string][]*base.TaskMessage // default queue final state
|
||||
}{
|
||||
{
|
||||
initScheduled: map[string][]base.Z{
|
||||
@ -49,7 +49,7 @@ func TestScheduler(t *testing.T) {
|
||||
"default": {{Message: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()}},
|
||||
"critical": {},
|
||||
},
|
||||
initEnqueued: map[string][]*base.TaskMessage{
|
||||
initPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"critical": {t4},
|
||||
},
|
||||
@ -62,7 +62,7 @@ func TestScheduler(t *testing.T) {
|
||||
"default": {},
|
||||
"critical": {},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t3},
|
||||
"critical": {t2, t4},
|
||||
},
|
||||
@ -81,7 +81,7 @@ func TestScheduler(t *testing.T) {
|
||||
"default": {},
|
||||
"critical": {},
|
||||
},
|
||||
initEnqueued: map[string][]*base.TaskMessage{
|
||||
initPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"critical": {t4},
|
||||
},
|
||||
@ -94,7 +94,7 @@ func TestScheduler(t *testing.T) {
|
||||
"default": {},
|
||||
"critical": {},
|
||||
},
|
||||
wantEnqueued: map[string][]*base.TaskMessage{
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {t1, t3},
|
||||
"critical": {t2, t4},
|
||||
},
|
||||
@ -105,7 +105,7 @@ func TestScheduler(t *testing.T) {
|
||||
h.FlushDB(t, r) // clean up db before each test case.
|
||||
h.SeedAllScheduledQueues(t, r, tc.initScheduled) // initialize scheduled queue
|
||||
h.SeedAllRetryQueues(t, r, tc.initRetry) // initialize retry queue
|
||||
h.SeedAllEnqueuedQueues(t, r, tc.initEnqueued) // initialize default queue
|
||||
h.SeedAllPendingQueues(t, r, tc.initPending) // initialize default queue
|
||||
|
||||
var wg sync.WaitGroup
|
||||
s.start(&wg)
|
||||
@ -126,9 +126,9 @@ func TestScheduler(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantEnqueued {
|
||||
gotEnqueued := h.GetEnqueuedMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := h.GetPendingMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.QueueKey(qname), diff)
|
||||
}
|
||||
}
|
||||
|
@ -145,9 +145,9 @@ func printQueueStats(s *asynq.QueueStats) {
|
||||
fmt.Printf("Paused: %t\n\n", s.Paused)
|
||||
fmt.Println("Task Breakdown:")
|
||||
printTable(
|
||||
[]string{"InProgress", "Enqueued", "Scheduled", "Retry", "Dead"},
|
||||
[]string{"InProgress", "Pending", "Scheduled", "Retry", "Dead"},
|
||||
func(w io.Writer, tmpl string) {
|
||||
fmt.Fprintf(w, tmpl, s.InProgress, s.Enqueued, s.Scheduled, s.Retry, s.Dead)
|
||||
fmt.Fprintf(w, tmpl, s.InProgress, s.Pending, s.Scheduled, s.Retry, s.Dead)
|
||||
},
|
||||
)
|
||||
fmt.Println()
|
||||
|
@ -52,7 +52,7 @@ func init() {
|
||||
|
||||
type AggregateStats struct {
|
||||
InProgress int
|
||||
Enqueued int
|
||||
Pending int
|
||||
Scheduled int
|
||||
Retry int
|
||||
Dead int
|
||||
@ -79,7 +79,7 @@ func stats(cmd *cobra.Command, args []string) {
|
||||
os.Exit(1)
|
||||
}
|
||||
aggStats.InProgress += s.InProgress
|
||||
aggStats.Enqueued += s.Enqueued
|
||||
aggStats.Pending += s.Pending
|
||||
aggStats.Scheduled += s.Scheduled
|
||||
aggStats.Retry += s.Retry
|
||||
aggStats.Dead += s.Dead
|
||||
@ -113,9 +113,9 @@ func stats(cmd *cobra.Command, args []string) {
|
||||
func printStatsByState(s *AggregateStats) {
|
||||
format := strings.Repeat("%v\t", 5) + "\n"
|
||||
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
|
||||
fmt.Fprintf(tw, format, "InProgress", "Enqueued", "Scheduled", "Retry", "Dead")
|
||||
fmt.Fprintf(tw, format, "InProgress", "Pending", "Scheduled", "Retry", "Dead")
|
||||
fmt.Fprintf(tw, format, "----------", "--------", "---------", "-----", "----")
|
||||
fmt.Fprintf(tw, format, s.InProgress, s.Enqueued, s.Scheduled, s.Retry, s.Dead)
|
||||
fmt.Fprintf(tw, format, s.InProgress, s.Pending, s.Scheduled, s.Retry, s.Dead)
|
||||
tw.Flush()
|
||||
}
|
||||
|
||||
|
@ -75,7 +75,7 @@ var taskListCmd = &cobra.Command{
|
||||
|
||||
The value for the state flag should be one of:
|
||||
- in-progress
|
||||
- enqueued
|
||||
- pending
|
||||
- scheduled
|
||||
- retry
|
||||
- dead
|
||||
@ -85,11 +85,11 @@ By default, the command fetches the first 30 tasks.
|
||||
Use --page and --size flags to specify the page number and size.
|
||||
|
||||
Example:
|
||||
To list enqueued tasks from "default" queue, run
|
||||
asynq task ls --queue=default --state=enqueued
|
||||
To list pending tasks from "default" queue, run
|
||||
asynq task ls --queue=default --state=pending
|
||||
|
||||
To list the tasks from the second page, run
|
||||
asynq task ls --queue=default --state=enqueued --page=1`,
|
||||
asynq task ls --queue=default --state=pending --page=1`,
|
||||
Run: taskList,
|
||||
}
|
||||
|
||||
@ -167,8 +167,8 @@ func taskList(cmd *cobra.Command, args []string) {
|
||||
switch state {
|
||||
case "in-progress":
|
||||
listInProgressTasks(qname, pageNum, pageSize)
|
||||
case "enqueued":
|
||||
listEnqueuedTasks(qname, pageNum, pageSize)
|
||||
case "pending":
|
||||
listPendingTasks(qname, pageNum, pageSize)
|
||||
case "scheduled":
|
||||
listScheduledTasks(qname, pageNum, pageSize)
|
||||
case "retry":
|
||||
@ -202,15 +202,15 @@ func listInProgressTasks(qname string, pageNum, pageSize int) {
|
||||
)
|
||||
}
|
||||
|
||||
func listEnqueuedTasks(qname string, pageNum, pageSize int) {
|
||||
func listPendingTasks(qname string, pageNum, pageSize int) {
|
||||
i := createInspector()
|
||||
tasks, err := i.ListEnqueuedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||
tasks, err := i.ListPendingTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if len(tasks) == 0 {
|
||||
fmt.Printf("No enqueued tasks in %q queue\n", qname)
|
||||
fmt.Printf("No pending tasks in %q queue\n", qname)
|
||||
return
|
||||
}
|
||||
printTable(
|
||||
|
Loading…
Reference in New Issue
Block a user