2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00
asynq/inspector_test.go

1944 lines
51 KiB
Go
Raw Normal View History

2020-07-13 21:29:41 +08:00
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"fmt"
2020-07-27 04:16:13 +08:00
"math"
2020-07-13 21:29:41 +08:00
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hibiken/asynq/internal/asynqtest"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
)
func TestInspectorCurrentStats(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
m4 := asynqtest.NewTaskMessage("task4", nil)
m5 := asynqtest.NewTaskMessageWithQueue("task5", nil, "critical")
m6 := h.NewTaskMessageWithQueue("task6", nil, "low")
now := time.Now()
timeCmpOpt := cmpopts.EquateApproxTime(time.Second)
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
enqueued map[string][]*base.TaskMessage
2020-08-17 05:51:56 +08:00
inProgress map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
dead map[string][]base.Z
processed map[string]int
failed map[string]int
qname string
2020-07-13 21:29:41 +08:00
want *Stats
}{
{
enqueued: map[string][]*base.TaskMessage{
2020-08-17 05:51:56 +08:00
"default": {m1},
"critical": {m5},
"low": {m6},
},
inProgress: map[string][]*base.TaskMessage{
"default": {m2},
"critical": {},
"low": {},
},
scheduled: map[string][]base.Z{
"default": {
{Message: m3, Score: now.Add(time.Hour).Unix()},
{Message: m4, Score: now.Unix()},
},
"critical": {},
"low": {},
},
2020-08-18 21:01:38 +08:00
retry: map[string][]base.Z{
2020-08-17 05:51:56 +08:00
"default": {},
"critical": {},
"low": {},
},
2020-08-18 21:01:38 +08:00
dead: map[string][]base.Z{
2020-08-17 05:51:56 +08:00
"default": {},
"critical": {},
"low": {},
},
processed: map[string]int{
"default": 120,
"critical": 100,
"low": 42,
},
failed: map[string]int{
"default": 2,
"critical": 0,
"low": 5,
},
qname: "default",
2020-07-13 21:29:41 +08:00
want: &Stats{
2020-08-17 05:51:56 +08:00
Queue: "default",
Enqueued: 1,
2020-07-13 21:29:41 +08:00
InProgress: 1,
Scheduled: 2,
Retry: 0,
Dead: 0,
Processed: 120,
Failed: 2,
2020-08-17 05:51:56 +08:00
Paused: false,
2020-07-13 21:29:41 +08:00
Timestamp: now,
},
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllInProgressQueues(t, r, tc.inProgress)
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
for qname, n := range tc.processed {
processedKey := base.ProcessedKey(qname, now)
2020-08-18 21:01:38 +08:00
r.Set(processedKey, n, 0)
2020-08-17 05:51:56 +08:00
}
for qname, n := range tc.failed {
failedKey := base.FailedKey(qname, now)
2020-08-18 21:01:38 +08:00
r.Set(failedKey, n, 0)
2020-08-17 05:51:56 +08:00
}
got, err := inspector.CurrentStats(tc.qname)
2020-07-13 21:29:41 +08:00
if err != nil {
2020-08-17 05:51:56 +08:00
t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil",
tc.qname, got, err, tc.want)
2020-07-13 21:29:41 +08:00
continue
}
if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" {
2020-08-17 05:51:56 +08:00
t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil; (-want, +got)\n%s",
tc.qname, got, err, tc.want, diff)
2020-07-13 21:29:41 +08:00
continue
}
}
}
func TestInspectorHistory(t *testing.T) {
r := setup(t)
now := time.Now().UTC()
timeCmpOpt := cmpopts.EquateApproxTime(time.Second)
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-17 05:51:56 +08:00
qname string // queue of interest
n int // number of days
2020-07-13 21:29:41 +08:00
}{
2020-08-17 05:51:56 +08:00
{"default", 90},
{"custom", 7},
{"default", 0},
2020-07-13 21:29:41 +08:00
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
// populate last n days data
for i := 0; i < tc.n; i++ {
ts := now.Add(-time.Duration(i) * 24 * time.Hour)
2020-08-17 05:51:56 +08:00
processedKey := base.ProcessedKey(tc.qname, ts)
failedKey := base.FailedKey(tc.qname, ts)
2020-07-13 21:29:41 +08:00
r.Set(processedKey, (i+1)*1000, 0)
r.Set(failedKey, (i+1)*10, 0)
}
2020-08-17 05:51:56 +08:00
got, err := inspector.History(tc.qname, tc.n)
2020-07-13 21:29:41 +08:00
if err != nil {
2020-08-17 05:51:56 +08:00
t.Errorf("Inspector.History(%q, %d) returned error: %v", tc.qname, tc.n, err)
2020-07-13 21:29:41 +08:00
continue
}
if len(got) != tc.n {
2020-08-17 05:51:56 +08:00
t.Errorf("Inspector.History(%q, %d) returned %d daily stats, want %d",
tc.qname, tc.n, len(got), tc.n)
2020-07-13 21:29:41 +08:00
continue
}
for i := 0; i < tc.n; i++ {
want := &DailyStats{
2020-08-17 05:51:56 +08:00
Queue: tc.qname,
2020-07-13 21:29:41 +08:00
Processed: (i + 1) * 1000,
Failed: (i + 1) * 10,
Date: now.Add(-time.Duration(i) * 24 * time.Hour),
}
if diff := cmp.Diff(want, got[i], timeCmpOpt); diff != "" {
t.Errorf("Inspector.History %d days ago data; got %+v, want %+v; (-want,+got):\n%s",
i, got[i], want, diff)
}
}
}
}
func createEnqueuedTask(msg *base.TaskMessage) *EnqueuedTask {
return &EnqueuedTask{
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
}
}
func TestInspectorListEnqueuedTasks(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
m4 := asynqtest.NewTaskMessage("task4", nil)
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
desc string
enqueued map[string][]*base.TaskMessage
qname string
want []*EnqueuedTask
}{
{
desc: "with default queue",
enqueued: map[string][]*base.TaskMessage{
"default": {m1, m2},
},
qname: "default",
want: []*EnqueuedTask{
createEnqueuedTask(m1),
createEnqueuedTask(m2),
},
},
{
desc: "with named queue",
enqueued: map[string][]*base.TaskMessage{
"default": {m1, m2},
"critical": {m3},
"low": {m4},
},
qname: "critical",
want: []*EnqueuedTask{
createEnqueuedTask(m3),
},
},
{
desc: "with empty queue",
enqueued: map[string][]*base.TaskMessage{
"default": {},
},
qname: "default",
want: []*EnqueuedTask(nil),
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
for q, msgs := range tc.enqueued {
asynqtest.SeedEnqueuedQueue(t, r, msgs, q)
}
got, err := inspector.ListEnqueuedTasks(tc.qname)
if err != nil {
t.Errorf("%s; ListEnqueuedTasks(%q) returned error: %v",
tc.desc, tc.qname, err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(Payload{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListEnqueuedTasks(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff)
}
}
}
func TestInspectorListInProgressTasks(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
2020-08-17 05:51:56 +08:00
m3 := asynqtest.NewTaskMessage("task3", nil)
m4 := asynqtest.NewTaskMessage("task4", nil)
2020-07-13 21:29:41 +08:00
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
createInProgressTask := func(msg *base.TaskMessage) *InProgressTask {
return &InProgressTask{
2020-08-17 05:51:56 +08:00
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
2020-07-13 21:29:41 +08:00
}
}
tests := []struct {
desc string
2020-08-17 05:51:56 +08:00
inProgress map[string][]*base.TaskMessage
qname string
2020-07-13 21:29:41 +08:00
want []*InProgressTask
}{
{
2020-08-17 05:51:56 +08:00
desc: "with a few in-progress tasks",
inProgress: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3, m4},
},
qname: "default",
2020-07-13 21:29:41 +08:00
want: []*InProgressTask{
createInProgressTask(m1),
createInProgressTask(m2),
},
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllInProgressQueues(t, r, tc.inProgress)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
got, err := inspector.ListInProgressTasks(tc.qname)
2020-07-13 21:29:41 +08:00
if err != nil {
2020-08-17 05:51:56 +08:00
t.Errorf("%s; ListInProgressTasks(%q) returned error: %v", tc.qname, tc.desc, err)
2020-07-13 21:29:41 +08:00
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(Payload{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
2020-08-18 21:46:19 +08:00
t.Errorf("%s; ListInProgressTask(%q) = %v, want %v; (-want,+got)\n%s",
2020-08-17 05:51:56 +08:00
tc.desc, tc.qname, got, tc.want, diff)
2020-07-13 21:29:41 +08:00
}
}
}
2020-08-17 05:51:56 +08:00
func createScheduledTask(z base.Z) *ScheduledTask {
msg := z.Message
return &ScheduledTask{
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
NextEnqueueAt: time.Unix(z.Score, 0),
2020-08-19 20:45:02 +08:00
score: z.Score,
2020-08-17 05:51:56 +08:00
}
}
2020-07-13 21:29:41 +08:00
func TestInspectorListScheduledTasks(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
2020-08-18 21:01:38 +08:00
m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
2020-08-17 05:51:56 +08:00
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
2020-07-13 21:29:41 +08:00
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
desc string
2020-08-17 05:51:56 +08:00
scheduled map[string][]base.Z
qname string
2020-07-13 21:29:41 +08:00
want []*ScheduledTask
}{
{
2020-08-17 05:51:56 +08:00
desc: "with a few scheduled tasks",
scheduled: map[string][]base.Z{
"default": {z1, z2, z3},
"custom": {z4},
},
qname: "default",
2020-07-13 21:29:41 +08:00
// Should be sorted by NextEnqueuedAt.
want: []*ScheduledTask{
createScheduledTask(z3),
createScheduledTask(z1),
createScheduledTask(z2),
},
},
{
2020-08-17 05:51:56 +08:00
desc: "with empty scheduled queue",
scheduled: map[string][]base.Z{
"default": {},
},
qname: "default",
want: []*ScheduledTask(nil),
2020-07-13 21:29:41 +08:00
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
2020-07-13 21:29:41 +08:00
2020-08-18 21:01:38 +08:00
got, err := inspector.ListScheduledTasks(tc.qname)
2020-07-13 21:29:41 +08:00
if err != nil {
2020-08-17 05:51:56 +08:00
t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err)
2020-07-13 21:29:41 +08:00
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(Payload{}, ScheduledTask{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
2020-08-17 05:51:56 +08:00
t.Errorf("%s; ListScheduledTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff)
2020-07-13 21:29:41 +08:00
}
}
}
2020-08-17 05:51:56 +08:00
func createRetryTask(z base.Z) *RetryTask {
msg := z.Message
return &RetryTask{
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
NextEnqueueAt: time.Unix(z.Score, 0),
MaxRetry: msg.Retry,
Retried: msg.Retried,
ErrorMsg: msg.ErrorMsg,
2020-08-19 20:45:02 +08:00
score: z.Score,
2020-08-17 05:51:56 +08:00
}
}
2020-07-13 21:29:41 +08:00
func TestInspectorListRetryTasks(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
2020-08-17 05:51:56 +08:00
m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
2020-08-17 05:51:56 +08:00
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
2020-07-13 21:29:41 +08:00
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
desc string
2020-08-17 05:51:56 +08:00
retry map[string][]base.Z
qname string
2020-07-13 21:29:41 +08:00
want []*RetryTask
}{
{
2020-08-17 05:51:56 +08:00
desc: "with a few retry tasks",
retry: map[string][]base.Z{
"default": {z1, z2, z3},
"custom": {z4},
},
qname: "default",
2020-07-13 21:29:41 +08:00
// Should be sorted by NextEnqueuedAt.
want: []*RetryTask{
createRetryTask(z3),
createRetryTask(z1),
createRetryTask(z2),
},
},
{
2020-08-17 05:51:56 +08:00
desc: "with empty retry queue",
retry: map[string][]base.Z{
"default": {},
},
qname: "default",
2020-07-13 21:29:41 +08:00
want: []*RetryTask(nil),
},
2020-08-17 05:51:56 +08:00
// TODO(hibiken): ErrQueueNotFound when queue doesn't exist
2020-07-13 21:29:41 +08:00
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
2020-07-13 21:29:41 +08:00
2020-08-18 21:01:38 +08:00
got, err := inspector.ListRetryTasks(tc.qname)
2020-07-13 21:29:41 +08:00
if err != nil {
2020-08-17 05:51:56 +08:00
t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err)
2020-07-13 21:29:41 +08:00
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(Payload{}, RetryTask{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
2020-08-17 05:51:56 +08:00
t.Errorf("%s; ListRetryTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff)
2020-07-13 21:29:41 +08:00
}
}
}
2020-08-17 05:51:56 +08:00
func createDeadTask(z base.Z) *DeadTask {
msg := z.Message
return &DeadTask{
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastFailedAt: time.Unix(z.Score, 0),
ErrorMsg: msg.ErrorMsg,
2020-08-19 20:45:02 +08:00
score: z.Score,
2020-08-17 05:51:56 +08:00
}
}
2020-07-13 21:29:41 +08:00
func TestInspectorListDeadTasks(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
2020-08-17 05:51:56 +08:00
m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
2020-08-17 05:51:56 +08:00
z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()}
2020-07-13 21:29:41 +08:00
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
desc string
2020-08-17 05:51:56 +08:00
dead map[string][]base.Z
qname string
2020-07-13 21:29:41 +08:00
want []*DeadTask
}{
{
2020-08-17 05:51:56 +08:00
desc: "with a few dead tasks",
dead: map[string][]base.Z{
"default": {z1, z2, z3},
"custom": {z4},
},
qname: "default",
2020-07-13 21:29:41 +08:00
// Should be sorted by LastFailedAt.
want: []*DeadTask{
createDeadTask(z2),
createDeadTask(z1),
createDeadTask(z3),
},
},
{
2020-08-17 05:51:56 +08:00
desc: "with empty dead queue",
dead: map[string][]base.Z{
"default": {},
},
qname: "default",
2020-07-13 21:29:41 +08:00
want: []*DeadTask(nil),
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-18 21:01:38 +08:00
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
got, err := inspector.ListDeadTasks(tc.qname)
2020-07-13 21:29:41 +08:00
if err != nil {
2020-08-17 05:51:56 +08:00
t.Errorf("%s; ListDeadTasks(%q) returned error: %v", tc.desc, tc.qname, err)
2020-07-13 21:29:41 +08:00
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(Payload{}, DeadTask{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
2020-08-17 05:51:56 +08:00
t.Errorf("%s; ListDeadTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff)
2020-07-13 21:29:41 +08:00
}
}
}
func TestInspectorListPagination(t *testing.T) {
// Create 100 tasks.
var msgs []*base.TaskMessage
for i := 0; i <= 99; i++ {
msgs = append(msgs,
asynqtest.NewTaskMessage(fmt.Sprintf("task%d", i), nil))
}
r := setup(t)
2020-08-18 21:01:38 +08:00
asynqtest.SeedEnqueuedQueue(t, r, msgs, base.DefaultQueueName)
2020-07-13 21:29:41 +08:00
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
page int
pageSize int
want []*EnqueuedTask
}{
{
page: 1,
pageSize: 5,
want: []*EnqueuedTask{
createEnqueuedTask(msgs[0]),
createEnqueuedTask(msgs[1]),
createEnqueuedTask(msgs[2]),
createEnqueuedTask(msgs[3]),
createEnqueuedTask(msgs[4]),
},
},
{
page: 3,
pageSize: 10,
want: []*EnqueuedTask{
createEnqueuedTask(msgs[20]),
createEnqueuedTask(msgs[21]),
createEnqueuedTask(msgs[22]),
createEnqueuedTask(msgs[23]),
createEnqueuedTask(msgs[24]),
createEnqueuedTask(msgs[25]),
createEnqueuedTask(msgs[26]),
createEnqueuedTask(msgs[27]),
createEnqueuedTask(msgs[28]),
createEnqueuedTask(msgs[29]),
},
},
}
for _, tc := range tests {
got, err := inspector.ListEnqueuedTasks("default", Page(tc.page), PageSize(tc.pageSize))
if err != nil {
t.Errorf("ListEnqueuedTask('default') returned error: %v", err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(Payload{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("ListEnqueuedTask('default') = %v, want %v; (-want,+got)\n%s",
got, tc.want, diff)
}
}
}
func TestInspectorDeleteAllScheduledTasks(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
2020-08-17 05:51:56 +08:00
m4 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
2020-08-17 05:51:56 +08:00
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
2020-07-13 21:29:41 +08:00
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-18 21:46:19 +08:00
scheduled map[string][]base.Z
qname string
want int
wantScheduled map[string][]base.Z
2020-07-13 21:29:41 +08:00
}{
{
2020-08-17 05:51:56 +08:00
scheduled: map[string][]base.Z{
"default": {z1, z2, z3},
"custom": {z4},
},
qname: "default",
want: 3,
2020-08-18 21:46:19 +08:00
wantScheduled: map[string][]base.Z{
"default": {},
"custom": {z4},
},
2020-07-13 21:29:41 +08:00
},
{
2020-08-17 05:51:56 +08:00
scheduled: map[string][]base.Z{
"default": {},
},
qname: "default",
want: 0,
2020-08-18 21:46:19 +08:00
wantScheduled: map[string][]base.Z{
"default": {},
},
2020-07-13 21:29:41 +08:00
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
got, err := inspector.DeleteAllScheduledTasks(tc.qname)
2020-07-13 21:29:41 +08:00
if err != nil {
2020-08-17 05:51:56 +08:00
t.Errorf("DeleteAllScheduledTasks(%q) returned error: %v", tc.qname, err)
2020-07-13 21:29:41 +08:00
continue
}
if got != tc.want {
2020-08-17 05:51:56 +08:00
t.Errorf("DeleteAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want)
2020-07-13 21:29:41 +08:00
}
2020-08-18 21:46:19 +08:00
for qname, want := range tc.wantScheduled {
gotScheduled := asynqtest.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff)
}
2020-07-13 21:29:41 +08:00
}
}
}
func TestInspectorDeleteAllRetryTasks(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
2020-08-17 05:51:56 +08:00
m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
2020-08-17 05:51:56 +08:00
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
2020-07-13 21:29:41 +08:00
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-18 21:46:19 +08:00
retry map[string][]base.Z
qname string
want int
wantRetry map[string][]base.Z
2020-07-13 21:29:41 +08:00
}{
{
2020-08-17 05:51:56 +08:00
retry: map[string][]base.Z{
"default": {z1, z2, z3},
"custom": {z4},
},
qname: "default",
2020-07-13 21:29:41 +08:00
want: 3,
2020-08-18 21:46:19 +08:00
wantRetry: map[string][]base.Z{
"default": {},
"custom": {z4},
},
2020-07-13 21:29:41 +08:00
},
{
2020-08-17 05:51:56 +08:00
retry: map[string][]base.Z{
"default": {},
},
qname: "default",
2020-07-13 21:29:41 +08:00
want: 0,
2020-08-18 21:46:19 +08:00
wantRetry: map[string][]base.Z{
"default": {},
},
2020-07-13 21:29:41 +08:00
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
got, err := inspector.DeleteAllRetryTasks(tc.qname)
2020-07-13 21:29:41 +08:00
if err != nil {
2020-08-17 05:51:56 +08:00
t.Errorf("DeleteAllRetryTasks(%q) returned error: %v", tc.qname, err)
2020-07-13 21:29:41 +08:00
continue
}
if got != tc.want {
2020-08-17 05:51:56 +08:00
t.Errorf("DeleteAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want)
2020-07-13 21:29:41 +08:00
}
2020-08-18 21:46:19 +08:00
for qname, want := range tc.wantRetry {
gotRetry := asynqtest.GetRetryEntries(t, r, qname)
if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff)
}
2020-07-13 21:29:41 +08:00
}
}
}
func TestInspectorDeleteAllDeadTasks(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
2020-08-17 05:51:56 +08:00
m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
2020-08-17 05:51:56 +08:00
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
2020-07-13 21:29:41 +08:00
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-18 21:46:19 +08:00
dead map[string][]base.Z
qname string
want int
wantDead map[string][]base.Z
2020-07-13 21:29:41 +08:00
}{
{
2020-08-17 05:51:56 +08:00
dead: map[string][]base.Z{
"default": {z1, z2, z3},
"custom": {z4},
},
qname: "default",
want: 3,
2020-08-18 21:46:19 +08:00
wantDead: map[string][]base.Z{
"default": {},
"custom": {z4},
},
2020-07-13 21:29:41 +08:00
},
{
2020-08-17 05:51:56 +08:00
dead: map[string][]base.Z{
"default": {},
},
qname: "default",
want: 0,
2020-08-18 21:46:19 +08:00
wantDead: map[string][]base.Z{
"default": {},
},
2020-07-13 21:29:41 +08:00
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
got, err := inspector.DeleteAllDeadTasks(tc.qname)
2020-07-13 21:29:41 +08:00
if err != nil {
2020-08-17 05:51:56 +08:00
t.Errorf("DeleteAllDeadTasks(%q) returned error: %v", tc.qname, err)
2020-07-13 21:29:41 +08:00
continue
}
if got != tc.want {
2020-08-17 05:51:56 +08:00
t.Errorf("DeleteAllDeadTasks(%q) = %d, want %d", tc.qname, got, tc.want)
2020-07-13 21:29:41 +08:00
}
2020-08-18 21:46:19 +08:00
for qname, want := range tc.wantDead {
2020-08-19 20:45:02 +08:00
gotDead := asynqtest.GetDeadEntries(t, r, qname)
2020-08-18 21:46:19 +08:00
if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", qname, diff)
}
2020-07-13 21:29:41 +08:00
}
}
}
func TestInspectorKillAllScheduledTasks(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
2020-08-17 05:51:56 +08:00
m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
2020-08-17 05:51:56 +08:00
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
2020-07-13 21:29:41 +08:00
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-17 05:51:56 +08:00
scheduled map[string][]base.Z
dead map[string][]base.Z
qname string
want int
wantScheduled map[string][]base.Z
wantDead map[string][]base.Z
2020-07-13 21:29:41 +08:00
}{
{
2020-08-17 05:51:56 +08:00
scheduled: map[string][]base.Z{
"default": {z1, z2, z3},
"custom": {z4},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "default",
want: 3,
wantScheduled: map[string][]base.Z{
"default": {},
"custom": {z4},
},
wantDead: map[string][]base.Z{
"default": {
base.Z{Message: m1, Score: now.Unix()},
base.Z{Message: m2, Score: now.Unix()},
base.Z{Message: m3, Score: now.Unix()},
},
"custom": {},
2020-07-13 21:29:41 +08:00
},
},
{
2020-08-18 21:01:38 +08:00
scheduled: map[string][]base.Z{
"default": {z1, z2},
},
dead: map[string][]base.Z{
"default": {z3},
},
qname: "default",
want: 2,
wantScheduled: map[string][]base.Z{
"default": {},
},
wantDead: map[string][]base.Z{
"default": {
z3,
base.Z{Message: m1, Score: now.Unix()},
base.Z{Message: m2, Score: now.Unix()},
},
2020-07-13 21:29:41 +08:00
},
},
{
2020-08-18 21:01:38 +08:00
scheduled: map[string][]base.Z{
"default": {},
},
dead: map[string][]base.Z{
"default": {},
},
qname: "default",
want: 0,
wantScheduled: map[string][]base.Z{
"default": {},
},
wantDead: map[string][]base.Z{
"default": {},
},
2020-07-13 21:29:41 +08:00
},
{
2020-08-18 21:01:38 +08:00
scheduled: map[string][]base.Z{
"default": {},
},
dead: map[string][]base.Z{
"default": {z1, z2},
},
qname: "default",
want: 0,
wantScheduled: map[string][]base.Z{
"default": {},
},
wantDead: map[string][]base.Z{
"default": {z1, z2},
},
2020-07-13 21:29:41 +08:00
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
got, err := inspector.KillAllScheduledTasks(tc.qname)
2020-07-13 21:29:41 +08:00
if err != nil {
2020-08-17 05:51:56 +08:00
t.Errorf("KillAllScheduledTasks(%q) returned error: %v", tc.qname, err)
2020-07-13 21:29:41 +08:00
continue
}
if got != tc.want {
2020-08-17 05:51:56 +08:00
t.Errorf("KillAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want)
2020-07-13 21:29:41 +08:00
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantScheduled {
gotScheduled := asynqtest.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" {
2020-08-18 21:46:19 +08:00
t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff)
2020-08-17 05:51:56 +08:00
}
2020-07-13 21:29:41 +08:00
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantDead {
// Allow Z.Score to differ by up to 2.
approxOpt := cmp.Comparer(func(a, b int64) bool {
return math.Abs(float64(a-b)) < 2
})
gotDead := asynqtest.GetDeadEntries(t, r, qname)
if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt, approxOpt); diff != "" {
2020-08-18 21:46:19 +08:00
t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", qname, diff)
2020-08-17 05:51:56 +08:00
}
2020-07-13 21:29:41 +08:00
}
}
}
func TestInspectorKillAllRetryTasks(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessage("task3", nil)
2020-08-17 05:51:56 +08:00
m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
2020-08-17 05:51:56 +08:00
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
2020-07-13 21:29:41 +08:00
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-17 05:51:56 +08:00
retry map[string][]base.Z
dead map[string][]base.Z
qname string
want int
wantRetry map[string][]base.Z
wantDead map[string][]base.Z
2020-07-13 21:29:41 +08:00
}{
{
2020-08-17 05:51:56 +08:00
retry: map[string][]base.Z{
"default": {z1, z2, z3},
"custom": {z4},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "default",
2020-07-13 21:29:41 +08:00
want: 3,
2020-08-17 05:51:56 +08:00
wantRetry: map[string][]base.Z{
"default": {},
"custom": {z4},
},
wantDead: map[string][]base.Z{
"default": {
base.Z{Message: m1, Score: now.Unix()},
base.Z{Message: m2, Score: now.Unix()},
base.Z{Message: m3, Score: now.Unix()},
},
"custom": {},
2020-07-13 21:29:41 +08:00
},
},
{
2020-08-17 05:51:56 +08:00
retry: map[string][]base.Z{
"default": {z1, z2},
},
dead: map[string][]base.Z{
"default": {z3},
},
qname: "default",
2020-07-13 21:29:41 +08:00
want: 2,
2020-08-17 05:51:56 +08:00
wantRetry: map[string][]base.Z{
"default": {},
},
wantDead: map[string][]base.Z{
"default": {
z3,
base.Z{Message: m1, Score: now.Unix()},
base.Z{Message: m2, Score: now.Unix()},
},
2020-07-13 21:29:41 +08:00
},
},
{
2020-08-17 05:51:56 +08:00
retry: map[string][]base.Z{
"default": {},
},
dead: map[string][]base.Z{
"default": {z1, z2},
},
want: 0,
wantRetry: map[string][]base.Z{
"default": {},
},
2020-08-18 21:01:38 +08:00
wantDead: map[string][]base.Z{
2020-08-17 05:51:56 +08:00
"default": {z1, z2},
},
2020-07-13 21:29:41 +08:00
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
got, err := inspector.KillAllRetryTasks(tc.qname)
2020-07-13 21:29:41 +08:00
if err != nil {
2020-08-17 05:51:56 +08:00
t.Errorf("KillAllRetryTasks(%q) returned error: %v", tc.qname, err)
2020-07-13 21:29:41 +08:00
continue
}
if got != tc.want {
2020-08-17 05:51:56 +08:00
t.Errorf("KillAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want)
2020-07-13 21:29:41 +08:00
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantRetry {
gotRetry := asynqtest.GetRetryEntries(t, r, qname)
if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" {
2020-08-18 21:46:19 +08:00
t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff)
2020-08-17 05:51:56 +08:00
}
2020-07-13 21:29:41 +08:00
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantDead {
gotDead := asynqtest.GetDeadEntries(t, r, qname)
if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" {
2020-08-18 21:46:19 +08:00
t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", qname, diff)
2020-08-17 05:51:56 +08:00
}
2020-07-13 21:29:41 +08:00
}
}
}
func TestInspectorEnqueueAllScheduledTasks(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low")
m4 := asynqtest.NewTaskMessage("task4", nil)
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
2020-08-17 05:51:56 +08:00
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
2020-07-13 21:29:41 +08:00
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-17 05:51:56 +08:00
scheduled map[string][]base.Z
enqueued map[string][]*base.TaskMessage
qname string
want int
wantScheduled map[string][]base.Z
wantEnqueued map[string][]*base.TaskMessage
2020-07-13 21:29:41 +08:00
}{
{
2020-08-17 05:51:56 +08:00
scheduled: map[string][]base.Z{
"default": {z1, z4},
"critical": {z2},
"low": {z3},
},
2020-07-13 21:29:41 +08:00
enqueued: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {},
},
2020-08-17 05:51:56 +08:00
qname: "default",
want: 2,
wantScheduled: map[string][]base.Z{
"default": {},
"critical": {z2},
"low": {z3},
},
2020-07-13 21:29:41 +08:00
wantEnqueued: map[string][]*base.TaskMessage{
2020-08-17 05:51:56 +08:00
"default": {m1, m4},
"critical": {},
"low": {},
2020-07-13 21:29:41 +08:00
},
},
{
2020-08-17 05:51:56 +08:00
scheduled: map[string][]base.Z{
"default": {z1},
"critical": {z2},
"low": {z3},
},
2020-07-13 21:29:41 +08:00
enqueued: map[string][]*base.TaskMessage{
"default": {m4},
"critical": {},
"low": {},
},
2020-08-17 05:51:56 +08:00
qname: "default",
want: 1,
wantScheduled: map[string][]base.Z{
"default": {},
"critical": {z2},
"low": {z3},
},
2020-07-13 21:29:41 +08:00
wantEnqueued: map[string][]*base.TaskMessage{
"default": {m4, m1},
2020-08-17 05:51:56 +08:00
"critical": {},
"low": {},
2020-07-13 21:29:41 +08:00
},
},
{
2020-08-17 05:51:56 +08:00
scheduled: map[string][]base.Z{
"default": {},
},
2020-07-13 21:29:41 +08:00
enqueued: map[string][]*base.TaskMessage{
"default": {m1, m4},
},
2020-08-17 05:51:56 +08:00
qname: "default",
want: 0,
wantScheduled: map[string][]base.Z{
"default": {},
},
2020-07-13 21:29:41 +08:00
wantEnqueued: map[string][]*base.TaskMessage{
"default": {m1, m4},
},
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
got, err := inspector.EnqueueAllScheduledTasks(tc.qname)
2020-07-13 21:29:41 +08:00
if err != nil {
2020-08-17 05:51:56 +08:00
t.Errorf("EnqueueAllScheduledTasks(%q) returned error: %v", tc.qname, err)
2020-07-13 21:29:41 +08:00
continue
}
if got != tc.want {
2020-08-17 05:51:56 +08:00
t.Errorf("EnqueueAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want)
2020-07-13 21:29:41 +08:00
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantScheduled {
gotScheduled := asynqtest.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" {
2020-08-18 21:46:19 +08:00
t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff)
2020-08-17 05:51:56 +08:00
}
2020-07-13 21:29:41 +08:00
}
for qname, want := range tc.wantEnqueued {
gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname)
if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" {
2020-08-18 21:46:19 +08:00
t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", qname, diff)
2020-07-13 21:29:41 +08:00
}
}
}
}
func TestInspectorEnqueueAllRetryTasks(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low")
m4 := asynqtest.NewTaskMessage("task2", nil)
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
2020-08-17 05:51:56 +08:00
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
2020-07-13 21:29:41 +08:00
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-17 05:51:56 +08:00
retry map[string][]base.Z
2020-07-13 21:29:41 +08:00
enqueued map[string][]*base.TaskMessage
2020-08-17 05:51:56 +08:00
qname string
2020-07-13 21:29:41 +08:00
want int
2020-08-17 05:51:56 +08:00
wantRetry map[string][]base.Z
2020-07-13 21:29:41 +08:00
wantEnqueued map[string][]*base.TaskMessage
}{
{
2020-08-17 05:51:56 +08:00
retry: map[string][]base.Z{
"default": {z1, z4},
"critical": {z2},
"low": {z3},
},
2020-07-13 21:29:41 +08:00
enqueued: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {},
},
2020-08-17 05:51:56 +08:00
qname: "default",
want: 2,
wantRetry: map[string][]base.Z{
"default": {},
"critical": {z2},
"low": {z3},
},
2020-07-13 21:29:41 +08:00
wantEnqueued: map[string][]*base.TaskMessage{
2020-08-17 05:51:56 +08:00
"default": {m1, m4},
"critical": {},
"low": {},
2020-07-13 21:29:41 +08:00
},
},
{
2020-08-17 05:51:56 +08:00
retry: map[string][]base.Z{
"default": {z1},
"critical": {z2},
"low": {z3},
},
2020-07-13 21:29:41 +08:00
enqueued: map[string][]*base.TaskMessage{
"default": {m4},
"critical": {},
"low": {},
},
2020-08-17 05:51:56 +08:00
qname: "default",
want: 1,
wantRetry: map[string][]base.Z{
"default": {},
"critical": {z2},
"low": {z3},
},
2020-07-13 21:29:41 +08:00
wantEnqueued: map[string][]*base.TaskMessage{
"default": {m4, m1},
2020-08-19 20:45:02 +08:00
"critical": {},
"low": {},
2020-07-13 21:29:41 +08:00
},
},
{
2020-08-17 05:51:56 +08:00
retry: map[string][]base.Z{
"default": {},
},
2020-07-13 21:29:41 +08:00
enqueued: map[string][]*base.TaskMessage{
"default": {m1, m4},
},
2020-08-17 05:51:56 +08:00
qname: "default",
want: 0,
wantRetry: map[string][]base.Z{
"default": {},
},
2020-07-13 21:29:41 +08:00
wantEnqueued: map[string][]*base.TaskMessage{
"default": {m1, m4},
},
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
got, err := inspector.EnqueueAllRetryTasks(tc.qname)
2020-07-13 21:29:41 +08:00
if err != nil {
2020-08-17 05:51:56 +08:00
t.Errorf("EnqueueAllRetryTasks(%q) returned error: %v", tc.qname, err)
2020-07-13 21:29:41 +08:00
continue
}
if got != tc.want {
2020-08-17 05:51:56 +08:00
t.Errorf("EnqueueAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want)
2020-07-13 21:29:41 +08:00
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantRetry {
gotRetry := asynqtest.GetRetryEntries(t, r, qname)
if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" {
2020-08-18 21:46:19 +08:00
t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff)
2020-08-17 05:51:56 +08:00
}
2020-07-13 21:29:41 +08:00
}
for qname, want := range tc.wantEnqueued {
gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname)
if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" {
2020-08-18 21:46:19 +08:00
t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", qname, diff)
2020-07-13 21:29:41 +08:00
}
}
}
}
2020-08-17 05:51:56 +08:00
2020-07-13 21:29:41 +08:00
func TestInspectorEnqueueAllDeadTasks(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low")
m4 := asynqtest.NewTaskMessage("task2", nil)
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
2020-08-17 05:51:56 +08:00
z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()}
2020-07-13 21:29:41 +08:00
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-17 05:51:56 +08:00
dead map[string][]base.Z
2020-07-13 21:29:41 +08:00
enqueued map[string][]*base.TaskMessage
2020-08-17 05:51:56 +08:00
qname string
2020-07-13 21:29:41 +08:00
want int
2020-08-17 05:51:56 +08:00
wantDead map[string][]base.Z
2020-07-13 21:29:41 +08:00
wantEnqueued map[string][]*base.TaskMessage
}{
{
2020-08-17 05:51:56 +08:00
dead: map[string][]base.Z{
"default": {z1, z4},
"critical": {z2},
"low": {z3},
},
2020-07-13 21:29:41 +08:00
enqueued: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {},
},
2020-08-17 05:51:56 +08:00
qname: "default",
want: 2,
wantDead: map[string][]base.Z{
"default": {},
"critical": {z2},
"low": {z3},
},
2020-07-13 21:29:41 +08:00
wantEnqueued: map[string][]*base.TaskMessage{
2020-08-17 05:51:56 +08:00
"default": {m1, m4},
"critical": {},
"low": {},
2020-07-13 21:29:41 +08:00
},
},
{
2020-08-17 05:51:56 +08:00
dead: map[string][]base.Z{
"default": {z1},
"critical": {z2},
},
2020-07-13 21:29:41 +08:00
enqueued: map[string][]*base.TaskMessage{
"default": {m4},
"critical": {},
},
2020-08-17 05:51:56 +08:00
qname: "default",
want: 1,
wantDead: map[string][]base.Z{
"default": {},
"critical": {z2},
},
2020-07-13 21:29:41 +08:00
wantEnqueued: map[string][]*base.TaskMessage{
"default": {m4, m1},
2020-08-17 05:51:56 +08:00
"critical": {},
2020-07-13 21:29:41 +08:00
},
},
{
2020-08-17 05:51:56 +08:00
dead: map[string][]base.Z{
"default": {},
},
2020-07-13 21:29:41 +08:00
enqueued: map[string][]*base.TaskMessage{
"default": {m1, m4},
},
2020-08-17 05:51:56 +08:00
qname: "default",
want: 0,
wantDead: map[string][]base.Z{
"default": {},
},
2020-07-13 21:29:41 +08:00
wantEnqueued: map[string][]*base.TaskMessage{
"default": {m1, m4},
},
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
got, err := inspector.EnqueueAllDeadTasks(tc.qname)
2020-07-13 21:29:41 +08:00
if err != nil {
2020-08-17 05:51:56 +08:00
t.Errorf("EnqueueAllDeadTasks(%q) returned error: %v", tc.qname, err)
2020-07-13 21:29:41 +08:00
continue
}
if got != tc.want {
2020-08-17 05:51:56 +08:00
t.Errorf("EnqueueAllDeadTasks(%q) = %d, want %d", tc.qname, got, tc.want)
2020-07-13 21:29:41 +08:00
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantDead {
gotDead := asynqtest.GetDeadEntries(t, r, qname)
if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" {
2020-08-18 21:46:19 +08:00
t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", qname, diff)
2020-08-17 05:51:56 +08:00
}
2020-07-13 21:29:41 +08:00
}
for qname, want := range tc.wantEnqueued {
gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname)
if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" {
2020-08-18 21:46:19 +08:00
t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s", qname, diff)
2020-07-13 21:29:41 +08:00
}
}
}
}
func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
2020-08-17 05:51:56 +08:00
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-17 05:51:56 +08:00
scheduled map[string][]base.Z
qname string
key string
wantScheduled map[string][]base.Z
2020-07-13 21:29:41 +08:00
}{
{
2020-08-17 05:51:56 +08:00
scheduled: map[string][]base.Z{
"default": {z1, z2},
"custom": {z3},
},
qname: "default",
key: createScheduledTask(z2).Key(),
wantScheduled: map[string][]base.Z{
"default": {z1},
"custom": {z3},
},
2020-07-13 21:29:41 +08:00
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
2020-07-13 21:29:41 +08:00
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantScheduled {
gotScheduled := asynqtest.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" {
2020-08-18 21:46:19 +08:00
t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff)
2020-07-13 21:29:41 +08:00
}
2020-08-17 05:51:56 +08:00
2020-07-13 21:29:41 +08:00
}
}
}
func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
2020-08-17 05:51:56 +08:00
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-17 05:51:56 +08:00
retry map[string][]base.Z
qname string
key string
wantRetry map[string][]base.Z
2020-07-13 21:29:41 +08:00
}{
{
2020-08-17 05:51:56 +08:00
retry: map[string][]base.Z{
"default": {z1, z2},
"custom": {z3},
},
qname: "default",
key: createRetryTask(z2).Key(),
wantRetry: map[string][]base.Z{
"default": {z1},
"custom": {z3},
},
2020-07-13 21:29:41 +08:00
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
2020-07-13 21:29:41 +08:00
continue
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantRetry {
2020-08-18 21:46:19 +08:00
gotRetry := asynqtest.GetRetryEntries(t, r, qname)
2020-08-17 05:51:56 +08:00
if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" {
2020-08-18 21:46:19 +08:00
t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff)
2020-07-13 21:29:41 +08:00
}
}
}
}
func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
2020-08-17 05:51:56 +08:00
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-17 05:51:56 +08:00
dead map[string][]base.Z
qname string
key string
wantDead map[string][]base.Z
2020-07-13 21:29:41 +08:00
}{
{
2020-08-17 05:51:56 +08:00
dead: map[string][]base.Z{
"default": {z1, z2},
"custom": {z3},
},
qname: "default",
key: createDeadTask(z2).Key(),
wantDead: map[string][]base.Z{
"default": {z1},
"custom": {z3},
},
2020-07-13 21:29:41 +08:00
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-18 21:01:38 +08:00
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
2020-07-13 21:29:41 +08:00
continue
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantDead {
gotDead := asynqtest.GetDeadEntries(t, r, qname)
if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" {
2020-08-18 21:46:19 +08:00
t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", qname, diff)
2020-07-13 21:29:41 +08:00
}
}
}
}
func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
2020-08-17 05:51:56 +08:00
m2 := asynqtest.NewTaskMessage("task2", nil)
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-17 05:51:56 +08:00
scheduled map[string][]base.Z
2020-07-13 21:29:41 +08:00
enqueued map[string][]*base.TaskMessage
2020-08-17 05:51:56 +08:00
qname string
key string
wantScheduled map[string][]base.Z
2020-07-13 21:29:41 +08:00
wantEnqueued map[string][]*base.TaskMessage
}{
{
2020-08-17 05:51:56 +08:00
scheduled: map[string][]base.Z{
"default": {z1, z2},
"custom": {z3},
},
2020-07-13 21:29:41 +08:00
enqueued: map[string][]*base.TaskMessage{
2020-08-17 05:51:56 +08:00
"default": {},
"custom": {},
},
qname: "default",
key: createScheduledTask(z2).Key(),
wantScheduled: map[string][]base.Z{
"default": {z1},
"custom": {z3},
2020-07-13 21:29:41 +08:00
},
wantEnqueued: map[string][]*base.TaskMessage{
2020-08-17 05:51:56 +08:00
"default": {m2},
"custom": {},
2020-07-13 21:29:41 +08:00
},
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
2020-07-13 21:29:41 +08:00
asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued)
2020-08-17 05:51:56 +08:00
if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("EnqueueTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
2020-07-13 21:29:41 +08:00
continue
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantScheduled {
gotScheduled := asynqtest.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s",
2020-08-18 21:46:19 +08:00
qname, diff)
2020-07-13 21:29:41 +08:00
}
2020-08-17 05:51:56 +08:00
2020-07-13 21:29:41 +08:00
}
for qname, want := range tc.wantEnqueued {
gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname)
if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" {
2020-08-17 05:51:56 +08:00
t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s",
2020-08-18 21:46:19 +08:00
qname, diff)
2020-07-13 21:29:41 +08:00
}
}
}
}
func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
2020-08-17 05:51:56 +08:00
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-17 05:51:56 +08:00
retry map[string][]base.Z
2020-07-13 21:29:41 +08:00
enqueued map[string][]*base.TaskMessage
2020-08-17 05:51:56 +08:00
qname string
key string
wantRetry map[string][]base.Z
2020-07-13 21:29:41 +08:00
wantEnqueued map[string][]*base.TaskMessage
}{
{
2020-08-17 05:51:56 +08:00
retry: map[string][]base.Z{
"default": {z1},
"custom": {z2, z3},
},
2020-07-13 21:29:41 +08:00
enqueued: map[string][]*base.TaskMessage{
2020-08-17 05:51:56 +08:00
"default": {},
"custom": {},
},
qname: "custom",
key: createRetryTask(z2).Key(),
wantRetry: map[string][]base.Z{
"default": {z1},
"custom": {z3},
2020-07-13 21:29:41 +08:00
},
wantEnqueued: map[string][]*base.TaskMessage{
2020-08-17 05:51:56 +08:00
"default": {},
"custom": {m2},
2020-07-13 21:29:41 +08:00
},
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
2020-07-13 21:29:41 +08:00
asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued)
2020-08-17 05:51:56 +08:00
if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil {
2020-08-18 21:46:19 +08:00
t.Errorf("EnqueueTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
2020-07-13 21:29:41 +08:00
continue
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantRetry {
gotRetry := asynqtest.GetRetryEntries(t, r, qname)
if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s",
2020-08-18 21:46:19 +08:00
qname, diff)
2020-07-13 21:29:41 +08:00
}
}
for qname, want := range tc.wantEnqueued {
gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname)
if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" {
2020-08-17 05:51:56 +08:00
t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s",
2020-08-18 21:46:19 +08:00
qname, diff)
2020-07-13 21:29:41 +08:00
}
}
}
}
func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low")
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-17 05:51:56 +08:00
dead map[string][]base.Z
2020-07-13 21:29:41 +08:00
enqueued map[string][]*base.TaskMessage
2020-08-17 05:51:56 +08:00
qname string
key string
wantDead map[string][]base.Z
2020-07-13 21:29:41 +08:00
wantEnqueued map[string][]*base.TaskMessage
}{
{
2020-08-17 05:51:56 +08:00
dead: map[string][]base.Z{
"default": {z1},
"critical": {z2},
"low": {z3},
},
2020-07-13 21:29:41 +08:00
enqueued: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {},
},
2020-08-17 05:51:56 +08:00
qname: "critical",
key: createDeadTask(z2).Key(),
wantDead: map[string][]base.Z{
"default": {z1},
"critical": {},
"low": {z3},
},
2020-07-13 21:29:41 +08:00
wantEnqueued: map[string][]*base.TaskMessage{
"default": {},
"critical": {m2},
"low": {},
},
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
2020-07-13 21:29:41 +08:00
asynqtest.SeedAllEnqueuedQueues(t, r, tc.enqueued)
2020-08-17 05:51:56 +08:00
if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("EnqueueTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
2020-07-13 21:29:41 +08:00
continue
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantDead {
gotDead := asynqtest.GetDeadEntries(t, r, qname)
if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s",
2020-08-18 21:46:19 +08:00
qname, diff)
2020-07-13 21:29:41 +08:00
}
}
for qname, want := range tc.wantEnqueued {
gotEnqueued := asynqtest.GetEnqueuedMessages(t, r, qname)
if diff := cmp.Diff(want, gotEnqueued, asynqtest.SortMsgOpt); diff != "" {
2020-08-17 05:51:56 +08:00
t.Errorf("unexpected enqueued tasks in queue %q: (-want, +got)\n%s",
2020-08-18 21:46:19 +08:00
qname, diff)
2020-07-13 21:29:41 +08:00
}
}
}
}
func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
2020-08-17 05:51:56 +08:00
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-17 05:51:56 +08:00
scheduled map[string][]base.Z
dead map[string][]base.Z
qname string
2020-08-18 21:01:38 +08:00
key string
2020-08-17 05:51:56 +08:00
want string
wantScheduled map[string][]base.Z
wantDead map[string][]base.Z
2020-07-13 21:29:41 +08:00
}{
{
2020-08-17 05:51:56 +08:00
scheduled: map[string][]base.Z{
"default": {z1},
"custom": {z2, z3},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
key: createScheduledTask(z2).Key(),
2020-08-18 21:01:38 +08:00
wantScheduled: map[string][]base.Z{
2020-08-17 05:51:56 +08:00
"default": {z1},
"custom": {z3},
},
wantDead: map[string][]base.Z{
"default": {},
"custom": {{m2, now.Unix()}},
2020-07-13 21:29:41 +08:00
},
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
if err := inspector.KillTaskByKey(tc.qname, tc.key); err != nil {
2020-08-18 21:46:19 +08:00
t.Errorf("KillTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
2020-07-13 21:29:41 +08:00
continue
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantScheduled {
gotScheduled := asynqtest.GetScheduledEntries(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s",
2020-08-18 21:46:19 +08:00
qname, diff)
2020-07-13 21:29:41 +08:00
}
2020-08-17 05:51:56 +08:00
2020-07-13 21:29:41 +08:00
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantDead {
gotDead := asynqtest.GetDeadEntries(t, r, qname)
if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s",
2020-08-18 21:46:19 +08:00
qname, diff)
2020-08-17 05:51:56 +08:00
}
2020-07-13 21:29:41 +08:00
}
}
}
func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
2020-08-17 05:51:56 +08:00
m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom")
m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom")
2020-07-13 21:29:41 +08:00
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
2020-08-17 05:51:56 +08:00
retry map[string][]base.Z
dead map[string][]base.Z
qname string
key string
wantRetry map[string][]base.Z
wantDead map[string][]base.Z
2020-07-13 21:29:41 +08:00
}{
{
2020-08-17 05:51:56 +08:00
retry: map[string][]base.Z{
"default": {z1},
"custom": {z2, z3},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
key: createRetryTask(z2).Key(),
wantRetry: map[string][]base.Z{
"default": {z1},
"custom": {z3},
},
2020-08-18 21:01:38 +08:00
wantDead: map[string][]base.Z{
2020-08-17 05:51:56 +08:00
"default": {},
"custom": {{m2, now.Unix()}},
2020-07-13 21:29:41 +08:00
},
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
2020-08-17 05:51:56 +08:00
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
2020-07-13 21:29:41 +08:00
2020-08-17 05:51:56 +08:00
if err := inspector.KillTaskByKey(tc.qname, tc.key); err != nil {
2020-08-18 21:46:19 +08:00
t.Errorf("KillTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
2020-07-13 21:29:41 +08:00
continue
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantRetry {
gotRetry := asynqtest.GetRetryEntries(t, r, qname)
if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s",
2020-08-18 21:46:19 +08:00
qname, diff)
2020-07-13 21:29:41 +08:00
}
}
2020-08-17 05:51:56 +08:00
for qname, want := range tc.wantDead {
gotDead := asynqtest.GetDeadEntries(t, r, qname)
if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" {
t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s",
2020-08-18 21:46:19 +08:00
qname, diff)
2020-08-17 05:51:56 +08:00
}
2020-07-13 21:29:41 +08:00
}
}
}