// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by an MIT license
// that can be found in the LICENSE file.
package rdb
import (
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
)
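
// TestAllQueues verifies that AllQueues returns the names of all queues
// registered in the base.AllQueues set.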
func TestAllQueues(t *testing.T) {
r := setup(t)
defer r.Close()
tests := []struct {
queues []string
}{
{queues: []string{"default"}},
{queues: []string{"custom1", "custom2"}},
{queues: []string{"default", "custom1", "custom2"}},
{queues: []string{}},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
for _, qname := range tc.queues {
if err := r.client.SAdd(base.AllQueues, qname).Err(); err != nil {
t.Fatalf("could not initialize all queue set: %v", err)
}
}
got, err := r.AllQueues()
if err != nil {
t.Errorf("AllQueues() returned an error: %v", err)
continue
}
if diff := cmp.Diff(tc.queues, got, h.SortStringSliceOpt); diff != "" {
t.Errorf("AllQueues() = %v, want %v; (-want, +got)\n%s", got, tc.queues, diff)
}
}
}
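
// TestCurrentStats verifies that CurrentStats reports the queue size, per-state
// task counts, processed/failed counters, and paused flag for the given queue.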
func TestCurrentStats(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
m2 := h.NewTaskMessage("reindex", nil)
m3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"})
m4 := h.NewTaskMessage("sync", nil)
m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical")
m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low")
now := time.Now()
tests := []struct {
pending map[string][]*base.TaskMessage
inProgress map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
dead map[string][]base.Z
processed map[string]int
failed map[string]int
paused []string
qname string
want *Stats
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1},
"critical": {m5},
"low": {m6},
},
inProgress: map[string][]*base.TaskMessage{
"default": {m2},
"critical": {},
"low": {},
},
scheduled: map[string][]base.Z{
"default": {
{Message: m3, Score: now.Add(time.Hour).Unix()},
{Message: m4, Score: now.Unix()},
},
"critical": {},
"low": {},
},
retry: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
},
dead: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
},
processed: map[string]int{
"default": 120,
"critical": 100,
"low": 50,
},
failed: map[string]int{
"default": 2,
"critical": 0,
"low": 1,
},
paused: []string{},
qname: "default",
want: &Stats{
Queue: "default",
Paused: false,
Size: 4,
Pending: 1,
Active: 1,
Scheduled: 2,
Retry: 0,
Dead: 0,
Processed: 120,
Failed: 2,
Timestamp: now,
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1},
"critical": {m5},
"low": {m6},
},
inProgress: map[string][]*base.TaskMessage{
"default": {m2},
"critical": {},
"low": {},
},
scheduled: map[string][]base.Z{
"default": {
{Message: m3, Score: now.Add(time.Hour).Unix()},
{Message: m4, Score: now.Unix()},
},
"critical": {},
"low": {},
},
retry: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
},
dead: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
},
processed: map[string]int{
"default": 120,
"critical": 100,
"low": 50,
},
failed: map[string]int{
"default": 2,
"critical": 0,
"low": 1,
},
paused: []string{"critical", "low"},
qname: "critical",
want: &Stats{
Queue: "critical",
Paused: true,
Size: 1,
Pending: 1,
Active: 0,
Scheduled: 0,
Retry: 0,
Dead: 0,
Processed: 100,
Failed: 0,
Timestamp: now,
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
for _, qname := range tc.paused {
if err := r.Pause(qname); err != nil {
t.Fatal(err)
}
}
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllDeadQueues(t, r.client, tc.dead)
for qname, n := range tc.processed {
processedKey := base.ProcessedKey(qname, now)
r.client.Set(processedKey, n, 0)
}
for qname, n := range tc.failed {
failedKey := base.FailedKey(qname, now)
r.client.Set(failedKey, n, 0)
}
got, err := r.CurrentStats(tc.qname)
if err != nil {
t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil", tc.qname, got, err, tc.want)
continue
}
if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" {
t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil; (-want, +got)\n%s", tc.qname, got, err, tc.want, diff)
continue
}
}
}
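
// TestCurrentStatsWithNonExistentQueue verifies that CurrentStats returns an
// error when the queue does not exist.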
func TestCurrentStatsWithNonExistentQueue(t *testing.T) {
r := setup(t)
defer r.Close()
qname := "non-existent"
got, err := r.CurrentStats(qname)
if err == nil {
t.Fatalf("r.CurrentStats(%q) = %v, %v, want nil, %v", qname, got, err, &ErrQueueNotFound{qname})
}
}
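
// TestHistoricalStats verifies that HistoricalStats returns one DailyStats entry
// per requested day with the processed and failed counts recorded for that day.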
func TestHistoricalStats(t *testing.T) {
r := setup(t)
defer r.Close()
now := time.Now().UTC()
tests := []struct {
qname string // queue of interest
n int // number of days
}{
{"default", 90},
{"custom", 7},
{"default", 1},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
r.client.SAdd(base.AllQueues, tc.qname)
// populate data for the last n days
for i := 0; i < tc.n; i++ {
ts := now.Add(-time.Duration(i) * 24 * time.Hour)
processedKey := base.ProcessedKey(tc.qname, ts)
failedKey := base.FailedKey(tc.qname, ts)
r.client.Set(processedKey, (i+1)*1000, 0)
r.client.Set(failedKey, (i+1)*10, 0)
}
got, err := r.HistoricalStats(tc.qname, tc.n)
if err != nil {
t.Errorf("RDB.HistoricalStats(%q, %d) returned error: %v", tc.qname, tc.n, err)
continue
}
if len(got) != tc.n {
t.Errorf("RDB.HistorycalStats(%q, %d) returned %d daily stats, want %d", tc.qname, tc.n, len(got), tc.n)
continue
}
for i := 0; i < tc.n; i++ {
want := &DailyStats{
Queue: tc.qname,
Processed: (i + 1) * 1000,
Failed: (i + 1) * 10,
Time: now.Add(-time.Duration(i) * 24 * time.Hour),
}
// Allow 2 seconds difference in timestamp.
cmpOpt := cmpopts.EquateApproxTime(2 * time.Second)
if diff := cmp.Diff(want, got[i], cmpOpt); diff != "" {
t.Errorf("RDB.HistoricalStats for the last %d days; got %+v, want %+v; (-want,+got):\n%s", i, got[i], want, diff)
}
}
}
}
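
// TestRedisInfo verifies that RedisInfo includes the expected server metrics keys.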
func TestRedisInfo(t *testing.T) {
r := setup(t)
defer r.Close()
info, err := r.RedisInfo()
if err != nil {
t.Fatalf("RDB.RedisInfo() returned error: %v", err)
}
wantKeys := []string{
"redis_version",
"uptime_in_days",
"connected_clients",
"used_memory_human",
"used_memory_peak_human",
"used_memory_peak_perc",
}
for _, key := range wantKeys {
if _, ok := info[key]; !ok {
t.Errorf("RDB.RedisInfo() = %v is missing entry for %q", info, key)
}
}
}
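
// TestListPending verifies that ListPending returns the pending messages for the
// given queue only.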
func TestListPending(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
m2 := h.NewTaskMessage("reindex", nil)
m3 := h.NewTaskMessageWithQueue("important_notification", nil, "critical")
m4 := h.NewTaskMessageWithQueue("minor_notification", nil, "low")
tests := []struct {
pending map[string][]*base.TaskMessage
qname string
want []*base.TaskMessage
}{
{
pending: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m1, m2},
},
qname: base.DefaultQueueName,
want: []*base.TaskMessage{m1, m2},
},
{
pending: map[string][]*base.TaskMessage{
base.DefaultQueueName: nil,
},
qname: base.DefaultQueueName,
want: []*base.TaskMessage(nil),
},
{
pending: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m1, m2},
"critical": {m3},
"low": {m4},
},
qname: base.DefaultQueueName,
want: []*base.TaskMessage{m1, m2},
},
{
pending: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m1, m2},
"critical": {m3},
"low": {m4},
},
qname: "critical",
want: []*base.TaskMessage{m3},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllPendingQueues(t, r.client, tc.pending)
got, err := r.ListPending(tc.qname, Pagination{Size: 20, Page: 0})
op := fmt.Sprintf("r.ListPending(%q, Pagination{Size: 20, Page: 0})", tc.qname)
if err != nil {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
}
}
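
// TestListPendingPagination verifies that ListPending honors the page size and
// page number, including out-of-range pages.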
func TestListPendingPagination(t *testing.T) {
r := setup(t)
defer r.Close()
var msgs []*base.TaskMessage
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
msgs = append(msgs, msg)
}
// create 100 tasks in default queue
h.SeedPendingQueue(t, r.client, msgs, "default")
msgs = []*base.TaskMessage(nil) // empty list
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("custom %d", i), nil)
msgs = append(msgs, msg)
}
// create 100 tasks in custom queue
h.SeedPendingQueue(t, r.client, msgs, "custom")
tests := []struct {
desc string
qname string
page int
size int
wantSize int
wantFirst string
wantLast string
}{
{"first page", "default", 0, 20, 20, "task 0", "task 19"},
{"second page", "default", 1, 20, 20, "task 20", "task 39"},
{"different page size", "default", 2, 30, 30, "task 60", "task 89"},
{"last page", "default", 3, 30, 10, "task 90", "task 99"},
{"out of range", "default", 4, 30, 0, "", ""},
{"second page with custom queue", "custom", 1, 20, 20, "custom 20", "custom 39"},
}
for _, tc := range tests {
got, err := r.ListPending(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListPending(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned a list of size %d, want %d", tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0]
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1]
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
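
// TestListActive verifies that ListActive returns the active (in-progress)
// messages for the given queue only.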
func TestListActive(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task2", nil, "critical")
m4 := h.NewTaskMessageWithQueue("task2", nil, "low")
tests := []struct {
inProgress map[string][]*base.TaskMessage
qname string
want []*base.TaskMessage
}{
{
inProgress: map[string][]*base.TaskMessage{
"default": {m1, m2},
"critical": {m3},
"low": {m4},
},
qname: "default",
want: []*base.TaskMessage{m1, m2},
},
{
inProgress: map[string][]*base.TaskMessage{
"default": {},
},
qname: "default",
want: []*base.TaskMessage(nil),
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
got, err := r.ListActive(tc.qname, Pagination{Size: 20, Page: 0})
op := fmt.Sprintf("r.ListActive(%q, Pagination{Size: 20, Page: 0})", tc.qname)
if err != nil {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.inProgress)
continue
}
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
}
}
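
// TestListActivePagination verifies that ListActive honors the page size and
// page number, including out-of-range pages.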
func TestListActivePagination(t *testing.T) {
r := setup(t)
defer r.Close()
var msgs []*base.TaskMessage
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
msgs = append(msgs, msg)
}
h.SeedActiveQueue(t, r.client, msgs, "default")
tests := []struct {
desc string
qname string
page int
size int
wantSize int
wantFirst string
wantLast string
}{
{"first page", "default", 0, 20, 20, "task 0", "task 19"},
{"second page", "default", 1, 20, 20, "task 20", "task 39"},
{"different page size", "default", 2, 30, 30, "task 60", "task 89"},
{"last page", "default", 3, 30, 10, "task 90", "task 99"},
{"out of range", "default", 4, 30, 0, "", ""},
}
for _, tc := range tests {
got, err := r.ListActive(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListActive(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned list of size %d, want %d", tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0]
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1]
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
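
// TestListScheduled verifies that ListScheduled returns the scheduled entries
// for the given queue, sorted by score in ascending order.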
func TestListScheduled(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessage("task3", nil)
m4 := h.NewTaskMessageWithQueue("task3", nil, "custom")
p1 := time.Now().Add(30 * time.Minute)
p2 := time.Now().Add(24 * time.Hour)
p3 := time.Now().Add(5 * time.Minute)
p4 := time.Now().Add(2 * time.Minute)
tests := []struct {
scheduled map[string][]base.Z
qname string
want []base.Z
}{
{
scheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: p1.Unix()},
{Message: m2, Score: p2.Unix()},
{Message: m3, Score: p3.Unix()},
},
"custom": {
{Message: m4, Score: p4.Unix()},
},
},
qname: "default",
// should be sorted by score in ascending order
want: []base.Z{
{Message: m3, Score: p3.Unix()},
{Message: m1, Score: p1.Unix()},
{Message: m2, Score: p2.Unix()},
},
},
{
scheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: p1.Unix()},
{Message: m2, Score: p2.Unix()},
{Message: m3, Score: p3.Unix()},
},
"custom": {
{Message: m4, Score: p4.Unix()},
},
},
qname: "custom",
want: []base.Z{
{Message: m4, Score: p4.Unix()},
},
},
{
scheduled: map[string][]base.Z{
"default": {},
},
qname: "default",
want: []base.Z(nil),
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
got, err := r.ListScheduled(tc.qname, Pagination{Size: 20, Page: 0})
op := fmt.Sprintf("r.ListScheduled(%q, Pagination{Size: 20, Page: 0})", tc.qname)
if err != nil {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
}
}
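
// TestListScheduledPagination verifies that ListScheduled honors the page size
// and page number, including out-of-range pages.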
func TestListScheduledPagination(t *testing.T) {
r := setup(t)
defer r.Close()
// create 100 scheduled tasks with increasing wait times.
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
if err := r.Schedule(msg, time.Now().Add(time.Duration(i)*time.Second)); err != nil {
t.Fatal(err)
}
}
tests := []struct {
desc string
qname string
page int
size int
wantSize int
wantFirst string
wantLast string
}{
{"first page", "default", 0, 20, 20, "task 0", "task 19"},
{"second page", "default", 1, 20, 20, "task 20", "task 39"},
{"different page size", "default", 2, 30, 30, "task 60", "task 89"},
{"last page", "default", 3, 30, 10, "task 90", "task 99"},
{"out of range", "default", 4, 30, 0, "", ""},
}
for _, tc := range tests {
got, err := r.ListScheduled(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListScheduled(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned list of size %d, want %d", tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0].Message
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1].Message
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
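
// TestListRetry verifies that ListRetry returns the retry entries for the given
// queue only.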
func TestListRetry(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := &base.TaskMessage{
ID: uuid.New(),
Type: "task1",
Queue: "default",
Payload: nil,
ErrorMsg: "some error occurred",
Retry: 25,
Retried: 10,
}
m2 := &base.TaskMessage{
ID: uuid.New(),
Type: "task2",
Queue: "default",
Payload: nil,
ErrorMsg: "some error occurred",
Retry: 25,
Retried: 2,
}
m3 := &base.TaskMessage{
ID: uuid.New(),
Type: "task3",
Queue: "custom",
Payload: nil,
ErrorMsg: "some error occurred",
Retry: 25,
Retried: 3,
}
p1 := time.Now().Add(5 * time.Minute)
p2 := time.Now().Add(24 * time.Hour)
p3 := time.Now().Add(24 * time.Hour)
tests := []struct {
retry map[string][]base.Z
qname string
want []base.Z
}{
{
retry: map[string][]base.Z{
"default": {
{Message: m1, Score: p1.Unix()},
{Message: m2, Score: p2.Unix()},
},
"custom": {
{Message: m3, Score: p3.Unix()},
},
},
qname: "default",
want: []base.Z{
{Message: m1, Score: p1.Unix()},
{Message: m2, Score: p2.Unix()},
},
},
{
retry: map[string][]base.Z{
"default": {
{Message: m1, Score: p1.Unix()},
{Message: m2, Score: p2.Unix()},
},
"custom": {
{Message: m3, Score: p3.Unix()},
},
},
qname: "custom",
want: []base.Z{
{Message: m3, Score: p3.Unix()},
},
},
{
retry: map[string][]base.Z{
"default": {},
},
qname: "default",
want: []base.Z(nil),
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllRetryQueues(t, r.client, tc.retry)
got, err := r.ListRetry(tc.qname, Pagination{Size: 20, Page: 0})
op := fmt.Sprintf("r.ListRetry(%q, Pagination{Size: 20, Page: 0})", tc.qname)
if err != nil {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s",
op, got, err, tc.want, diff)
continue
}
}
}
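
// TestListRetryPagination verifies that ListRetry honors the page size and page
// number, including out-of-range pages.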
func TestListRetryPagination(t *testing.T) {
r := setup(t)
defer r.Close()
// create 100 retry tasks with increasing wait times.
now := time.Now()
var seed []base.Z
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
processAt := now.Add(time.Duration(i) * time.Second)
seed = append(seed, base.Z{Message: msg, Score: processAt.Unix()})
}
h.SeedRetryQueue(t, r.client, seed, "default")
tests := []struct {
desc string
qname string
page int
size int
wantSize int
wantFirst string
wantLast string
}{
{"first page", "default", 0, 20, 20, "task 0", "task 19"},
{"second page", "default", 1, 20, 20, "task 20", "task 39"},
{"different page size", "default", 2, 30, 30, "task 60", "task 89"},
{"last page", "default", 3, 30, 10, "task 90", "task 99"},
{"out of range", "default", 4, 30, 0, "", ""},
}
for _, tc := range tests {
got, err := r.ListRetry(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListRetry(%q, Pagination{Size: %d, Page: %d})",
tc.qname, tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned list of size %d, want %d",
tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0].Message
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1].Message
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
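
// TestListDead verifies that ListDead returns the dead entries for the given
// queue only.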
func TestListDead(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := &base.TaskMessage{
ID: uuid.New(),
Type: "task1",
Queue: "default",
Payload: nil,
ErrorMsg: "some error occurred",
}
m2 := &base.TaskMessage{
ID: uuid.New(),
Type: "task2",
Queue: "default",
Payload: nil,
ErrorMsg: "some error occurred",
}
m3 := &base.TaskMessage{
ID: uuid.New(),
Type: "task3",
Queue: "custom",
Payload: nil,
ErrorMsg: "some error occurred",
}
f1 := time.Now().Add(-5 * time.Minute)
f2 := time.Now().Add(-24 * time.Hour)
f3 := time.Now().Add(-4 * time.Hour)
tests := []struct {
dead map[string][]base.Z
qname string
want []base.Z
}{
{
dead: map[string][]base.Z{
"default": {
{Message: m1, Score: f1.Unix()},
{Message: m2, Score: f2.Unix()},
},
"custom": {
{Message: m3, Score: f3.Unix()},
},
},
qname: "default",
want: []base.Z{
{Message: m2, Score: f2.Unix()}, // FIXME: shouldn't these be sorted in the opposite order?
{Message: m1, Score: f1.Unix()},
},
},
{
dead: map[string][]base.Z{
"default": {
{Message: m1, Score: f1.Unix()},
{Message: m2, Score: f2.Unix()},
},
"custom": {
{Message: m3, Score: f3.Unix()},
},
},
qname: "custom",
want: []base.Z{
{Message: m3, Score: f3.Unix()},
},
},
{
dead: map[string][]base.Z{
"default": {},
},
qname: "default",
want: []base.Z(nil),
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllDeadQueues(t, r.client, tc.dead)
got, err := r.ListDead(tc.qname, Pagination{Size: 20, Page: 0})
op := fmt.Sprintf("r.ListDead(%q, Pagination{Size: 20, Page: 0})", tc.qname)
if err != nil {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s",
op, got, err, tc.want, diff)
continue
}
}
}
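
// TestListDeadPagination verifies that ListDead honors the page size and page
// number, including out-of-range pages.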
func TestListDeadPagination(t *testing.T) {
r := setup(t)
defer r.Close()
var entries []base.Z
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
entries = append(entries, base.Z{Message: msg, Score: int64(i)})
}
h.SeedDeadQueue(t, r.client, entries, "default")
tests := []struct {
desc string
qname string
page int
size int
wantSize int
wantFirst string
wantLast string
}{
{"first page", "default", 0, 20, 20, "task 0", "task 19"},
{"second page", "default", 1, 20, 20, "task 20", "task 39"},
{"different page size", "default", 2, 30, 30, "task 60", "task 89"},
{"last page", "default", 3, 30, 10, "task 90", "task 99"},
{"out of range", "default", 4, 30, 0, "", ""},
}
for _, tc := range tests {
got, err := r.ListDead(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListDead(Pagination{Size: %d, Page: %d})",
tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned list of size %d, want %d",
tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0].Message
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1].Message
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
var (
timeCmpOpt = cmpopts.EquateApproxTime(2 * time.Second) // allow a 2-second margin when comparing time.Time values
zScoreCmpOpt = h.EquateInt64Approx(2) // allow a 2-second margin when comparing Z.Score values
)
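
// TestRunDeadTask verifies that RunDeadTask moves the matching task from the
// dead set to the pending queue, and returns ErrTaskNotFound when no entry
// matches the given ID and score.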
func TestRunDeadTask(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("gen_thumbnail", nil)
t3 := h.NewTaskMessageWithQueue("send_notification", nil, "critical")
s1 := time.Now().Add(-5 * time.Minute).Unix()
s2 := time.Now().Add(-time.Hour).Unix()
tests := []struct {
dead map[string][]base.Z
qname string
score int64
id uuid.UUID
want error // expected return value from calling RunDeadTask
wantDead map[string][]*base.TaskMessage
wantPending map[string][]*base.TaskMessage
}{
{
dead: map[string][]base.Z{
"default": {
{Message: t1, Score: s1},
{Message: t2, Score: s2},
},
},
qname: "default",
score: s2,
id: t2.ID,
want: nil,
wantDead: map[string][]*base.TaskMessage{
"default": {t1},
},
wantPending: map[string][]*base.TaskMessage{
"default": {t2},
},
},
{
dead: map[string][]base.Z{
"default": {
{Message: t1, Score: s1},
{Message: t2, Score: s2},
},
},
qname: "default",
score: 123,
id: t2.ID,
want: ErrTaskNotFound,
wantDead: map[string][]*base.TaskMessage{
"default": {t1, t2},
},
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
},
{
dead: map[string][]base.Z{
"default": {
{Message: t1, Score: s1},
{Message: t2, Score: s2},
},
"critical": {
{Message: t3, Score: s1},
},
},
qname: "critical",
score: s1,
id: t3.ID,
want: nil,
wantDead: map[string][]*base.TaskMessage{
"default": {t1, t2},
"critical": {},
},
wantPending: map[string][]*base.TaskMessage{
"default": {},
"critical": {t3},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllDeadQueues(t, r.client, tc.dead)
got := r.RunDeadTask(tc.qname, tc.id, tc.score)
if got != tc.want {
t.Errorf("r.RunDeadTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
continue
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
}
}
for qname, want := range tc.wantDead {
gotDead := h.GetDeadMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q, (-want, +got)\n%s", base.DeadKey(qname), diff)
}
}
}
}
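
// TestRunRetryTask verifies that RunRetryTask moves the matching task from the
// retry set to the pending queue, and returns ErrTaskNotFound when no entry
// matches the given ID and score.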
func TestRunRetryTask(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("gen_thumbnail", nil)
t3 := h.NewTaskMessageWithQueue("send_notification", nil, "low")
s1 := time.Now().Add(-5 * time.Minute).Unix()
s2 := time.Now().Add(-time.Hour).Unix()
tests := []struct {
retry map[string][]base.Z
qname string
score int64
id uuid.UUID
want error // expected return value from calling RunRetryTask
wantRetry map[string][]*base.TaskMessage
wantPending map[string][]*base.TaskMessage
}{
{
retry: map[string][]base.Z{
"default": {
{Message: t1, Score: s1},
{Message: t2, Score: s2},
},
},
qname: "default",
score: s2,
id: t2.ID,
want: nil,
wantRetry: map[string][]*base.TaskMessage{
"default": {t1},
},
wantPending: map[string][]*base.TaskMessage{
"default": {t2},
},
},
{
retry: map[string][]base.Z{
"default": {
{Message: t1, Score: s1},
{Message: t2, Score: s2},
},
},
qname: "default",
score: 123,
id: t2.ID,
want: ErrTaskNotFound,
wantRetry: map[string][]*base.TaskMessage{
"default": {t1, t2},
},
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
},
{
retry: map[string][]base.Z{
"default": {
{Message: t1, Score: s1},
{Message: t2, Score: s2},
},
"low": {
{Message: t3, Score: s2},
},
},
qname: "low",
score: s2,
id: t3.ID,
want: nil,
wantRetry: map[string][]*base.TaskMessage{
"default": {t1, t2},
"low": {},
},
wantPending: map[string][]*base.TaskMessage{
"default": {},
"low": {t3},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllRetryQueues(t, r.client, tc.retry) // initialize retry queue
got := r.RunRetryTask(tc.qname, tc.id, tc.score)
if got != tc.want {
t.Errorf("r.RunRetryTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
continue
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
}
}
for qname, want := range tc.wantRetry {
gotRetry := h.GetRetryMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q, (-want, +got)\n%s", base.RetryKey(qname), diff)
}
}
}
}
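
// TestRunScheduledTask verifies that RunScheduledTask moves the matching task
// from the scheduled set to the pending queue, and returns ErrTaskNotFound when
// no entry matches the given ID and score.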
func TestRunScheduledTask(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("gen_thumbnail", nil)
t3 := h.NewTaskMessageWithQueue("send_notification", nil, "notifications")
s1 := time.Now().Add(-5 * time.Minute).Unix()
s2 := time.Now().Add(-time.Hour).Unix()
tests := []struct {
scheduled map[string][]base.Z
qname string
score int64
id uuid.UUID
want error // expected return value from calling RunScheduledTask
wantScheduled map[string][]*base.TaskMessage
wantPending map[string][]*base.TaskMessage
}{
{
scheduled: map[string][]base.Z{
"default": {
{Message: t1, Score: s1},
{Message: t2, Score: s2},
},
},
qname: "default",
score: s2,
id: t2.ID,
want: nil,
wantScheduled: map[string][]*base.TaskMessage{
"default": {t1},
},
wantPending: map[string][]*base.TaskMessage{
"default": {t2},
},
},
{
scheduled: map[string][]base.Z{
"default": {
{Message: t1, Score: s1},
{Message: t2, Score: s2},
},
},
qname: "default",
score: 123,
id: t2.ID,
want: ErrTaskNotFound,
wantScheduled: map[string][]*base.TaskMessage{
"default": {t1, t2},
},
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
},
{
scheduled: map[string][]base.Z{
"default": {
{Message: t1, Score: s1},
{Message: t2, Score: s2},
},
"notifications": {
{Message: t3, Score: s1},
},
},
qname: "notifications",
score: s1,
id: t3.ID,
want: nil,
wantScheduled: map[string][]*base.TaskMessage{
"default": {t1, t2},
"notifications": {},
},
wantPending: map[string][]*base.TaskMessage{
"default": {},
"notifications": {t3},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
got := r.RunScheduledTask(tc.qname, tc.id, tc.score)
if got != tc.want {
t.Errorf("r.RunRetryTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
continue
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
}
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q, (-want, +got)\n%s", base.ScheduledKey(qname), diff)
}
}
}
}
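
// TestRunAllScheduledTasks verifies that RunAllScheduledTasks moves all
// scheduled tasks in the given queue to the pending queue and returns the
// number moved.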
func TestRunAllScheduledTasks(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("gen_thumbnail", nil)
t3 := h.NewTaskMessage("reindex", nil)
t4 := h.NewTaskMessageWithQueue("important_notification", nil, "custom")
t5 := h.NewTaskMessageWithQueue("minor_notification", nil, "custom")
tests := []struct {
desc string
scheduled map[string][]base.Z
qname string
want int64
wantPending map[string][]*base.TaskMessage
wantScheduled map[string][]*base.TaskMessage
}{
{
desc: "with tasks in scheduled queue",
scheduled: map[string][]base.Z{
"default": {
{Message: t1, Score: time.Now().Add(time.Hour).Unix()},
{Message: t2, Score: time.Now().Add(time.Hour).Unix()},
{Message: t3, Score: time.Now().Add(time.Hour).Unix()},
},
},
qname: "default",
want: 3,
wantPending: map[string][]*base.TaskMessage{
"default": {t1, t2, t3},
},
wantScheduled: map[string][]*base.TaskMessage{
"default": {},
},
},
{
desc: "with empty scheduled queue",
scheduled: map[string][]base.Z{
"default": {},
},
qname: "default",
want: 0,
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
wantScheduled: map[string][]*base.TaskMessage{
"default": {},
},
},
{
desc: "with custom queues",
scheduled: map[string][]base.Z{
"default": {
{Message: t1, Score: time.Now().Add(time.Hour).Unix()},
{Message: t2, Score: time.Now().Add(time.Hour).Unix()},
{Message: t3, Score: time.Now().Add(time.Hour).Unix()},
},
"custom": {
{Message: t4, Score: time.Now().Add(time.Hour).Unix()},
{Message: t5, Score: time.Now().Add(time.Hour).Unix()},
},
},
qname: "custom",
want: 2,
wantPending: map[string][]*base.TaskMessage{
"default": {},
"custom": {t4, t5},
},
wantScheduled: map[string][]*base.TaskMessage{
"default": {t1, t2, t3},
"custom": {},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
got, err := r.RunAllScheduledTasks(tc.qname)
if err != nil {
t.Errorf("%s; r.RunAllScheduledTasks(%q) = %v, %v; want %v, nil",
tc.desc, tc.qname, got, err, tc.want)
continue
}
if got != tc.want {
t.Errorf("%s; r.RunAllScheduledTasks(%q) = %v, %v; want %v, nil",
tc.desc, tc.qname, got, err, tc.want)
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff)
}
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.ScheduledKey(qname), diff)
}
}
}
}
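
// TestRunAllRetryTasks verifies that RunAllRetryTasks moves all retry tasks in
// the given queue to the pending queue and returns the number moved.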
func TestRunAllRetryTasks(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("gen_thumbnail", nil)
t3 := h.NewTaskMessage("reindex", nil)
t4 := h.NewTaskMessageWithQueue("important_notification", nil, "custom")
t5 := h.NewTaskMessageWithQueue("minor_notification", nil, "custom")
tests := []struct {
desc string
retry map[string][]base.Z
qname string
want int64
wantPending map[string][]*base.TaskMessage
wantRetry map[string][]*base.TaskMessage
}{
{
desc: "with tasks in retry queue",
retry: map[string][]base.Z{
"default": {
{Message: t1, Score: time.Now().Add(time.Hour).Unix()},
{Message: t2, Score: time.Now().Add(time.Hour).Unix()},
{Message: t3, Score: time.Now().Add(time.Hour).Unix()},
},
},
qname: "default",
want: 3,
wantPending: map[string][]*base.TaskMessage{
"default": {t1, t2, t3},
},
wantRetry: map[string][]*base.TaskMessage{
"default": {},
},
},
{
desc: "with empty retry queue",
retry: map[string][]base.Z{
"default": {},
},
qname: "default",
want: 0,
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
wantRetry: map[string][]*base.TaskMessage{
"default": {},
},
},
{
desc: "with custom queues",
retry: map[string][]base.Z{
"default": {
{Message: t1, Score: time.Now().Add(time.Hour).Unix()},
{Message: t2, Score: time.Now().Add(time.Hour).Unix()},
{Message: t3, Score: time.Now().Add(time.Hour).Unix()},
},
"custom": {
{Message: t4, Score: time.Now().Add(time.Hour).Unix()},
{Message: t5, Score: time.Now().Add(time.Hour).Unix()},
},
},
qname: "custom",
want: 2,
wantPending: map[string][]*base.TaskMessage{
"default": {},
"custom": {t4, t5},
},
wantRetry: map[string][]*base.TaskMessage{
"default": {t1, t2, t3},
"custom": {},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllRetryQueues(t, r.client, tc.retry)
got, err := r.RunAllRetryTasks(tc.qname)
if err != nil {
t.Errorf("%s; r.RunAllRetryTasks(%q) = %v, %v; want %v, nil",
tc.desc, tc.qname, got, err, tc.want)
continue
}
if got != tc.want {
t.Errorf("%s; r.RunAllRetryTasks(%q) = %v, %v; want %v, nil",
tc.desc, tc.qname, got, err, tc.want)
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff)
}
}
for qname, want := range tc.wantRetry {
gotRetry := h.GetRetryMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.RetryKey(qname), diff)
}
}
}
}
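
// TestRunAllDeadTasks verifies that RunAllDeadTasks moves all dead tasks in the
// given queue to the pending queue and returns the number moved.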
func TestRunAllDeadTasks(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("gen_thumbnail", nil)
t3 := h.NewTaskMessage("reindex", nil)
t4 := h.NewTaskMessageWithQueue("important_notification", nil, "custom")
t5 := h.NewTaskMessageWithQueue("minor_notification", nil, "custom")
tests := []struct {
desc string
dead map[string][]base.Z
qname string
want int64
wantPending map[string][]*base.TaskMessage
wantDead map[string][]*base.TaskMessage
}{
{
desc: "with tasks in dead queue",
dead: map[string][]base.Z{
"default": {
{Message: t1, Score: time.Now().Add(-time.Minute).Unix()},
{Message: t2, Score: time.Now().Add(-time.Minute).Unix()},
{Message: t3, Score: time.Now().Add(-time.Minute).Unix()},
},
},
qname: "default",
want: 3,
wantPending: map[string][]*base.TaskMessage{
"default": {t1, t2, t3},
},
wantDead: map[string][]*base.TaskMessage{
"default": {},
},
},
{
desc: "with empty dead queue",
dead: map[string][]base.Z{
"default": {},
},
qname: "default",
want: 0,
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
wantDead: map[string][]*base.TaskMessage{
"default": {},
},
},
{
desc: "with custom queues",
dead: map[string][]base.Z{
"default": {
{Message: t1, Score: time.Now().Add(-time.Minute).Unix()},
{Message: t2, Score: time.Now().Add(-time.Minute).Unix()},
{Message: t3, Score: time.Now().Add(-time.Minute).Unix()},
},
"custom": {
{Message: t4, Score: time.Now().Add(-time.Minute).Unix()},
{Message: t5, Score: time.Now().Add(-time.Minute).Unix()},
},
},
qname: "custom",
want: 2,
wantPending: map[string][]*base.TaskMessage{
"default": {},
"custom": {t4, t5},
},
wantDead: map[string][]*base.TaskMessage{
"default": {t1, t2, t3},
"custom": {},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllDeadQueues(t, r.client, tc.dead)
got, err := r.RunAllDeadTasks(tc.qname)
if err != nil {
t.Errorf("%s; r.RunAllDeadTasks(%q) = %v, %v; want %v, nil",
tc.desc, tc.qname, got, err, tc.want)
continue
}
if got != tc.want {
t.Errorf("%s; r.RunAllDeadTasks(%q) = %v, %v; want %v, nil",
tc.desc, tc.qname, got, err, tc.want)
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff)
}
}
for qname, want := range tc.wantDead {
gotDead := h.GetDeadMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.DeadKey(qname), diff)
}
}
}
}
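
// TestKillRetryTask verifies that KillRetryTask moves the matching task from the
// retry set to the dead set (scored with the current time), and returns
// ErrTaskNotFound when no entry matches.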
func TestKillRetryTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
t1 := time.Now().Add(1 * time.Minute)
t2 := time.Now().Add(1 * time.Hour)
t3 := time.Now().Add(2 * time.Hour)
t4 := time.Now().Add(3 * time.Hour)
tests := []struct {
retry map[string][]base.Z
dead map[string][]base.Z
qname string
id uuid.UUID
score int64
want error
wantRetry map[string][]base.Z
wantDead map[string][]base.Z
}{
{
retry: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
dead: map[string][]base.Z{
"default": {},
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
want: nil,
wantRetry: map[string][]base.Z{
"default": {{Message: m2, Score: t2.Unix()}},
},
wantDead: map[string][]base.Z{
"default": {{Message: m1, Score: time.Now().Unix()}},
},
},
{
retry: map[string][]base.Z{
"default": {{Message: m1, Score: t1.Unix()}},
},
dead: map[string][]base.Z{
"default": {{Message: m2, Score: t2.Unix()}},
},
qname: "default",
id: m2.ID,
score: t2.Unix(),
want: ErrTaskNotFound,
wantRetry: map[string][]base.Z{
"default": {{Message: m1, Score: t1.Unix()}},
},
wantDead: map[string][]base.Z{
"default": {{Message: m2, Score: t2.Unix()}},
},
},
{
retry: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
"custom": {
{Message: m3, Score: t3.Unix()},
{Message: m4, Score: t4.Unix()},
},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
id: m3.ID,
score: t3.Unix(),
want: nil,
wantRetry: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
"custom": {
{Message: m4, Score: t4.Unix()},
},
},
wantDead: map[string][]base.Z{
"default": {},
"custom": {{Message: m3, Score: time.Now().Unix()}},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllDeadQueues(t, r.client, tc.dead)
got := r.KillRetryTask(tc.qname, tc.id, tc.score)
if got != tc.want {
t.Errorf("(*RDB).KillRetryTask(%q, %v, %v) = %v, want %v",
tc.qname, tc.id, tc.score, got, tc.want)
continue
}
for qname, want := range tc.wantRetry {
gotRetry := h.GetRetryEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.RetryKey(qname), diff)
}
}
for qname, want := range tc.wantDead {
gotDead := h.GetDeadEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.DeadKey(qname), diff)
}
}
}
}
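
// TestKillScheduledTask verifies that KillScheduledTask moves the matching task
// from the scheduled set to the dead set, and returns ErrTaskNotFound when no
// entry matches.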
func TestKillScheduledTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
t1 := time.Now().Add(1 * time.Minute)
t2 := time.Now().Add(1 * time.Hour)
t3 := time.Now().Add(2 * time.Hour)
t4 := time.Now().Add(3 * time.Hour)
tests := []struct {
scheduled map[string][]base.Z
dead map[string][]base.Z
qname string
id uuid.UUID
score int64
want error
wantScheduled map[string][]base.Z
wantDead map[string][]base.Z
}{
{
scheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
dead: map[string][]base.Z{
"default": {},
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
want: nil,
wantScheduled: map[string][]base.Z{
"default": {{Message: m2, Score: t2.Unix()}},
},
wantDead: map[string][]base.Z{
"default": {{Message: m1, Score: time.Now().Unix()}},
},
},
{
scheduled: map[string][]base.Z{
"default": {{Message: m1, Score: t1.Unix()}},
},
dead: map[string][]base.Z{
"default": {{Message: m2, Score: t2.Unix()}},
},
qname: "default",
id: m2.ID,
score: t2.Unix(),
want: ErrTaskNotFound,
wantScheduled: map[string][]base.Z{
"default": {{Message: m1, Score: t1.Unix()}},
},
wantDead: map[string][]base.Z{
"default": {{Message: m2, Score: t2.Unix()}},
},
},
{
scheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
"custom": {
{Message: m3, Score: t3.Unix()},
{Message: m4, Score: t4.Unix()},
},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
id: m3.ID,
score: t3.Unix(),
want: nil,
wantScheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
"custom": {
{Message: m4, Score: t4.Unix()},
},
},
wantDead: map[string][]base.Z{
"default": {},
"custom": {{Message: m3, Score: time.Now().Unix()}},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllDeadQueues(t, r.client, tc.dead)
got := r.KillScheduledTask(tc.qname, tc.id, tc.score)
if got != tc.want {
t.Errorf("(*RDB).KillScheduledTask(%q, %v, %v) = %v, want %v",
tc.qname, tc.id, tc.score, got, tc.want)
continue
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.ScheduledKey(qname), diff)
}
}
for qname, want := range tc.wantDead {
gotDead := h.GetDeadEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.DeadKey(qname), diff)
}
}
}
}
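
// TestKillAllRetryTasks verifies that KillAllRetryTasks moves all retry tasks in
// the given queue to the dead set and returns the number moved.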
func TestKillAllRetryTasks(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
t1 := time.Now().Add(1 * time.Minute)
t2 := time.Now().Add(1 * time.Hour)
t3 := time.Now().Add(2 * time.Hour)
t4 := time.Now().Add(3 * time.Hour)
tests := []struct {
retry map[string][]base.Z
dead map[string][]base.Z
qname string
want int64
wantRetry map[string][]base.Z
wantDead map[string][]base.Z
}{
{
retry: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
dead: map[string][]base.Z{
"default": {},
},
qname: "default",
want: 2,
wantRetry: map[string][]base.Z{
"default": {},
},
wantDead: map[string][]base.Z{
"default": {
{Message: m1, Score: time.Now().Unix()},
{Message: m2, Score: time.Now().Unix()},
},
},
},
{
retry: map[string][]base.Z{
"default": {{Message: m1, Score: t1.Unix()}},
},
dead: map[string][]base.Z{
"default": {{Message: m2, Score: t2.Unix()}},
},
qname: "default",
want: 1,
wantRetry: map[string][]base.Z{
"default": {},
},
wantDead: map[string][]base.Z{
"default": {
{Message: m1, Score: time.Now().Unix()},
{Message: m2, Score: t2.Unix()},
},
},
},
{
retry: map[string][]base.Z{
"default": {},
},
dead: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
qname: "default",
want: 0,
wantRetry: map[string][]base.Z{
"default": {},
},
wantDead: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
},
{
retry: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
"custom": {
{Message: m3, Score: t3.Unix()},
{Message: m4, Score: t4.Unix()},
},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
want: 2,
wantRetry: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
"custom": {},
},
wantDead: map[string][]base.Z{
"default": {},
"custom": {
{Message: m3, Score: time.Now().Unix()},
{Message: m4, Score: time.Now().Unix()},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllDeadQueues(t, r.client, tc.dead)
got, err := r.KillAllRetryTasks(tc.qname)
if got != tc.want || err != nil {
t.Errorf("(*RDB).KillAllRetryTasks(%q) = %v, %v; want %v, nil",
tc.qname, got, err, tc.want)
continue
}
for qname, want := range tc.wantRetry {
gotRetry := h.GetRetryEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.RetryKey(qname), diff)
}
}
for qname, want := range tc.wantDead {
gotDead := h.GetDeadEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.DeadKey(qname), diff)
}
}
}
}
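
// TestKillAllScheduledTasks verifies that KillAllScheduledTasks moves all
// scheduled tasks in the given queue to the dead set and returns the number
// moved.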
func TestKillAllScheduledTasks(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
t1 := time.Now().Add(time.Minute)
t2 := time.Now().Add(time.Hour)
t3 := time.Now().Add(time.Hour)
t4 := time.Now().Add(time.Hour)
tests := []struct {
scheduled map[string][]base.Z
dead map[string][]base.Z
qname string
want int64
wantScheduled map[string][]base.Z
wantDead map[string][]base.Z
}{
{
scheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
dead: map[string][]base.Z{
"default": {},
},
qname: "default",
want: 2,
wantScheduled: map[string][]base.Z{
"default": {},
},
wantDead: map[string][]base.Z{
"default": {
{Message: m1, Score: time.Now().Unix()},
{Message: m2, Score: time.Now().Unix()},
},
},
},
{
scheduled: map[string][]base.Z{
"default": {{Message: m1, Score: t1.Unix()}},
},
dead: map[string][]base.Z{
"default": {{Message: m2, Score: t2.Unix()}},
},
qname: "default",
want: 1,
wantScheduled: map[string][]base.Z{
"default": {},
},
wantDead: map[string][]base.Z{
"default": {
{Message: m1, Score: time.Now().Unix()},
{Message: m2, Score: t2.Unix()},
},
},
},
{
scheduled: map[string][]base.Z{
"default": {},
},
dead: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
qname: "default",
want: 0,
wantScheduled: map[string][]base.Z{
"default": {},
},
wantDead: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
},
{
scheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
"custom": {
{Message: m3, Score: t3.Unix()},
{Message: m4, Score: t4.Unix()},
},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
want: 2,
wantScheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
"custom": {},
},
wantDead: map[string][]base.Z{
"default": {},
"custom": {
{Message: m3, Score: time.Now().Unix()},
{Message: m4, Score: time.Now().Unix()},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllDeadQueues(t, r.client, tc.dead)
got, err := r.KillAllScheduledTasks(tc.qname)
if got != tc.want || err != nil {
t.Errorf("(*RDB).KillAllScheduledTasks(%q) = %v, %v; want %v, nil",
tc.qname, got, err, tc.want)
continue
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.ScheduledKey(qname), diff)
}
}
for qname, want := range tc.wantDead {
gotDead := h.GetDeadEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.DeadKey(qname), diff)
}
}
}
}
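
// TestDeleteDeadTask verifies that DeleteDeadTask removes the matching task from
// the dead set, and returns ErrTaskNotFound when the ID and score do not match
// any entry.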
func TestDeleteDeadTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
t1 := time.Now().Add(-5 * time.Minute)
t2 := time.Now().Add(-time.Hour)
t3 := time.Now().Add(-time.Hour)
tests := []struct {
dead map[string][]base.Z
qname string
id uuid.UUID
score int64
want error
wantDead map[string][]*base.TaskMessage
}{
{
dead: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
want: nil,
wantDead: map[string][]*base.TaskMessage{
"default": {m2},
},
},
{
dead: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
"custom": {
{Message: m3, Score: t3.Unix()},
},
},
qname: "custom",
id: m3.ID,
score: t3.Unix(),
want: nil,
wantDead: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {},
},
},
{
dead: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
qname: "default",
id: m1.ID,
score: t2.Unix(), // id and score mismatch
want: ErrTaskNotFound,
wantDead: map[string][]*base.TaskMessage{
"default": {m1, m2},
},
},
{
dead: map[string][]base.Z{
"default": {},
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
want: ErrTaskNotFound,
wantDead: map[string][]*base.TaskMessage{
"default": {},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllDeadQueues(t, r.client, tc.dead)
got := r.DeleteDeadTask(tc.qname, tc.id, tc.score)
if got != tc.want {
t.Errorf("r.DeleteDeadTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
continue
}
for qname, want := range tc.wantDead {
gotDead := h.GetDeadMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadKey(qname), diff)
}
}
}
}
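
// TestDeleteRetryTask verifies that DeleteRetryTask removes the matching task
// from the retry set, and returns ErrTaskNotFound when no entry matches.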
func TestDeleteRetryTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
t1 := time.Now().Add(5 * time.Minute)
t2 := time.Now().Add(time.Hour)
t3 := time.Now().Add(time.Hour)
tests := []struct {
retry map[string][]base.Z
qname string
id uuid.UUID
score int64
want error
wantRetry map[string][]*base.TaskMessage
}{
{
retry: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
want: nil,
wantRetry: map[string][]*base.TaskMessage{
"default": {m2},
},
},
{
retry: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
"custom": {
{Message: m3, Score: t3.Unix()},
},
},
qname: "custom",
id: m3.ID,
score: t3.Unix(),
want: nil,
wantRetry: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {},
},
},
{
retry: map[string][]base.Z{
"default": {{Message: m1, Score: t1.Unix()}},
},
qname: "default",
id: m2.ID,
score: t2.Unix(),
want: ErrTaskNotFound,
wantRetry: map[string][]*base.TaskMessage{
"default": {m1},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllRetryQueues(t, r.client, tc.retry)
got := r.DeleteRetryTask(tc.qname, tc.id, tc.score)
if got != tc.want {
t.Errorf("r.DeleteRetryTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
continue
}
for qname, want := range tc.wantRetry {
gotRetry := h.GetRetryMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(qname), diff)
}
}
}
}
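
// TestDeleteScheduledTask verifies that DeleteScheduledTask removes the matching
// task from the scheduled set, and returns ErrTaskNotFound when no entry matches.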
func TestDeleteScheduledTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
t1 := time.Now().Add(5 * time.Minute)
t2 := time.Now().Add(time.Hour)
t3 := time.Now().Add(time.Hour)
tests := []struct {
scheduled map[string][]base.Z
qname string
id uuid.UUID
score int64
want error
wantScheduled map[string][]*base.TaskMessage
}{
{
scheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
want: nil,
wantScheduled: map[string][]*base.TaskMessage{
"default": {m2},
},
},
{
scheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
"custom": {
{Message: m3, Score: t3.Unix()},
},
},
qname: "custom",
id: m3.ID,
score: t3.Unix(),
want: nil,
wantScheduled: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {},
},
},
{
scheduled: map[string][]base.Z{
"default": {{Message: m1, Score: t1.Unix()}},
},
qname: "default",
id: m2.ID,
score: t2.Unix(),
want: ErrTaskNotFound,
wantScheduled: map[string][]*base.TaskMessage{
"default": {m1},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
got := r.DeleteScheduledTask(tc.qname, tc.id, tc.score)
if got != tc.want {
t.Errorf("r.DeleteScheduledTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
continue
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledKey(qname), diff)
}
}
}
}
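
// TestDeleteAllDeadTasks verifies that DeleteAllDeadTasks removes all dead tasks
// in the given queue and returns the number deleted.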
func TestDeleteAllDeadTasks(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
tests := []struct {
dead map[string][]base.Z
qname string
want int64
wantDead map[string][]*base.TaskMessage
}{
{
dead: map[string][]base.Z{
"default": {
{Message: m1, Score: time.Now().Unix()},
{Message: m2, Score: time.Now().Unix()},
},
"custom": {
{Message: m3, Score: time.Now().Unix()},
},
},
qname: "default",
want: 2,
wantDead: map[string][]*base.TaskMessage{
"default": {},
"custom": {m3},
},
},
{
dead: map[string][]base.Z{
"default": {},
},
qname: "default",
want: 0,
wantDead: map[string][]*base.TaskMessage{
"default": {},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllDeadQueues(t, r.client, tc.dead)
got, err := r.DeleteAllDeadTasks(tc.qname)
if err != nil {
t.Errorf("r.DeleteAllDeadTasks(%q) returned error: %v", tc.qname, err)
}
if got != tc.want {
t.Errorf("r.DeleteAllDeadTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want)
}
for qname, want := range tc.wantDead {
gotDead := h.GetDeadMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadKey(qname), diff)
}
}
}
}
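
// TestDeleteAllRetryTasks verifies that DeleteAllRetryTasks removes all retry
// tasks in the given queue and returns the number deleted.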
func TestDeleteAllRetryTasks(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
tests := []struct {
retry map[string][]base.Z
qname string
want int64
wantRetry map[string][]*base.TaskMessage
}{
{
retry: map[string][]base.Z{
"default": {
{Message: m1, Score: time.Now().Unix()},
{Message: m2, Score: time.Now().Unix()},
},
"custom": {
{Message: m3, Score: time.Now().Unix()},
},
},
qname: "custom",
want: 1,
wantRetry: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {},
},
},
{
retry: map[string][]base.Z{
"default": {},
},
qname: "default",
want: 0,
wantRetry: map[string][]*base.TaskMessage{
"default": {},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllRetryQueues(t, r.client, tc.retry)
got, err := r.DeleteAllRetryTasks(tc.qname)
if err != nil {
t.Errorf("r.DeleteAllRetryTasks(%q) returned error: %v", tc.qname, err)
}
if got != tc.want {
t.Errorf("r.DeleteAllRetryTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want)
}
for qname, want := range tc.wantRetry {
gotRetry := h.GetRetryMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(qname), diff)
}
}
}
}

func TestDeleteAllScheduledTasks(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
tests := []struct {
scheduled map[string][]base.Z
qname string
want int64
wantScheduled map[string][]*base.TaskMessage
}{
{
scheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: time.Now().Add(time.Minute).Unix()},
{Message: m2, Score: time.Now().Add(time.Minute).Unix()},
},
"custom": {
{Message: m3, Score: time.Now().Add(time.Minute).Unix()},
},
},
qname: "default",
want: 2,
wantScheduled: map[string][]*base.TaskMessage{
"default": {},
"custom": {m3},
},
},
{
scheduled: map[string][]base.Z{
"custom": {},
},
qname: "custom",
want: 0,
wantScheduled: map[string][]*base.TaskMessage{
"custom": {},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
got, err := r.DeleteAllScheduledTasks(tc.qname)
if err != nil {
t.Errorf("r.DeleteAllScheduledTasks(%q) returned error: %v", tc.qname, err)
}
if got != tc.want {
t.Errorf("r.DeleteAllScheduledTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want)
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledKey(qname), diff)
}
}
}
}

func TestRemoveQueue(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
tests := []struct {
pending map[string][]*base.TaskMessage
inProgress map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
dead map[string][]base.Z
qname string // queue to remove
force bool
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {},
},
inProgress: map[string][]*base.TaskMessage{
"default": {},
"custom": {},
},
scheduled: map[string][]base.Z{
"default": {},
"custom": {},
},
retry: map[string][]base.Z{
"default": {},
"custom": {},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
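			// The "custom" queue is empty, so removal should succeed without force.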
force: false,
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3},
},
inProgress: map[string][]*base.TaskMessage{
"default": {},
"custom": {},
},
scheduled: map[string][]base.Z{
"default": {},
"custom": {{Message: m4, Score: time.Now().Unix()}},
},
retry: map[string][]base.Z{
"default": {},
"custom": {},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
force: true, // allow removing non-empty queue
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllDeadQueues(t, r.client, tc.dead)
err := r.RemoveQueue(tc.qname, tc.force)
if err != nil {
t.Errorf("(*RDB).RemoveQueue(%q, %t) = %v, want nil",
tc.qname, tc.force, err)
continue
}
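		// The removed queue should no longer be a member of the set of all queues.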
if r.client.SIsMember(base.AllQueues, tc.qname).Val() {
t.Errorf("%q is a member of %q", tc.qname, base.AllQueues)
}
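		// All keys associated with the removed queue should be gone.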
keys := []string{
base.QueueKey(tc.qname),
base.ActiveKey(tc.qname),
base.DeadlinesKey(tc.qname),
base.ScheduledKey(tc.qname),
base.RetryKey(tc.qname),
base.DeadKey(tc.qname),
}
for _, key := range keys {
if r.client.Exists(key).Val() != 0 {
t.Errorf("key %q still exists", key)
}
}
}
}

func TestRemoveQueueError(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
tests := []struct {
desc string
pending map[string][]*base.TaskMessage
inProgress map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
dead map[string][]base.Z
qname string // queue to remove
force bool
}{
{
desc: "removing non-existent queue",
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3},
},
inProgress: map[string][]*base.TaskMessage{
"default": {},
"custom": {},
},
scheduled: map[string][]base.Z{
"default": {},
"custom": {},
},
retry: map[string][]base.Z{
"default": {},
"custom": {},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "nonexistent",
force: false,
},
{
desc: "removing non-empty queue",
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3},
},
inProgress: map[string][]*base.TaskMessage{
"default": {},
"custom": {},
},
scheduled: map[string][]base.Z{
"default": {},
"custom": {{Message: m4, Score: time.Now().Unix()}},
},
retry: map[string][]base.Z{
"default": {},
"custom": {},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
force: false,
},
{
desc: "force removing queue with active tasks",
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3},
},
inProgress: map[string][]*base.TaskMessage{
"default": {},
"custom": {m4},
},
scheduled: map[string][]base.Z{
"default": {},
"custom": {},
},
retry: map[string][]base.Z{
"default": {},
"custom": {},
},
dead: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
// Even with force=true, it should error if there are active tasks.
force: true,
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllDeadQueues(t, r.client, tc.dead)
got := r.RemoveQueue(tc.qname, tc.force)
if got == nil {
t.Errorf("%s;(*RDB).RemoveQueue(%q) = nil, want error", tc.desc, tc.qname)
continue
}
// Make sure that nothing changed
for qname, want := range tc.pending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.QueueKey(qname), diff)
}
}
for qname, want := range tc.inProgress {
gotActive := h.GetActiveMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" {
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.ActiveKey(qname), diff)
}
}
for qname, want := range tc.scheduled {
gotScheduled := h.GetScheduledEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" {
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.ScheduledKey(qname), diff)
}
}
for qname, want := range tc.retry {
gotRetry := h.GetRetryEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" {
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.RetryKey(qname), diff)
}
}
for qname, want := range tc.dead {
gotDead := h.GetDeadEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt); diff != "" {
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.DeadKey(qname), diff)
}
}
}
}

func TestListServers(t *testing.T) {
r := setup(t)
defer r.Close()
started1 := time.Now().Add(-time.Hour)
info1 := &base.ServerInfo{
Host: "do.droplet1",
PID: 1234,
ServerID: "server123",
Concurrency: 10,
Queues: map[string]int{"default": 1},
Status: "running",
Started: started1,
ActiveWorkerCount: 0,
}
started2 := time.Now().Add(-2 * time.Hour)
info2 := &base.ServerInfo{
Host: "do.droplet2",
PID: 9876,
ServerID: "server456",
Concurrency: 20,
Queues: map[string]int{"email": 1},
Status: "stopped",
Started: started2,
ActiveWorkerCount: 1,
}
tests := []struct {
data []*base.ServerInfo
}{
{
data: []*base.ServerInfo{},
},
{
data: []*base.ServerInfo{info1},
},
{
data: []*base.ServerInfo{info1, info2},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
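		// Write each server's state so that ListServers can report it.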
for _, info := range tc.data {
if err := r.WriteServerState(info, []*base.WorkerInfo{}, 5*time.Second); err != nil {
t.Fatal(err)
}
}
got, err := r.ListServers()
if err != nil {
t.Errorf("r.ListServers returned an error: %v", err)
}
if diff := cmp.Diff(tc.data, got, h.SortServerInfoOpt); diff != "" {
t.Errorf("r.ListServers returned %v, want %v; (-want,+got)\n%s",
got, tc.data, diff)
}
}
}

func TestListWorkers(t *testing.T) {
r := setup(t)
defer r.Close()
var (
host = "127.0.0.1"
pid = 4567
serverID = "server123"
m1 = h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "abc123"})
m2 = h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/image/file"})
m3 = h.NewTaskMessage("reindex", map[string]interface{}{})
)
tests := []struct {
data []*base.WorkerInfo
}{
{
data: []*base.WorkerInfo{
{
Host: host,
PID: pid,
ServerID: serverID,
ID: m1.ID.String(),
Type: m1.Type,
Queue: m1.Queue,
Payload: m1.Payload,
Started: time.Now().Add(-1 * time.Second),
},
{
Host: host,
PID: pid,
ServerID: serverID,
ID: m2.ID.String(),
Type: m2.Type,
Queue: m2.Queue,
Payload: m2.Payload,
Started: time.Now().Add(-5 * time.Second),
},
{
Host: host,
PID: pid,
ServerID: serverID,
ID: m3.ID.String(),
Type: m3.Type,
Queue: m3.Queue,
Payload: m3.Payload,
Started: time.Now().Add(-30 * time.Second),
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
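		// Worker info is written together with the server state.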
err := r.WriteServerState(&base.ServerInfo{}, tc.data, time.Minute)
if err != nil {
t.Errorf("could not write server state to redis: %v", err)
continue
}
got, err := r.ListWorkers()
if err != nil {
t.Errorf("(*RDB).ListWorkers() returned an error: %v", err)
continue
}
if diff := cmp.Diff(tc.data, got, h.SortWorkerInfoOpt); diff != "" {
t.Errorf("(*RDB).ListWorkers() = %v, want = %v; (-want,+got)\n%s", got, tc.data, diff)
}
}
}

func TestWriteListClearSchedulerEntries(t *testing.T) {
	r := setup(t)
	defer r.Close()
now := time.Now().UTC()
schedulerID := "127.0.0.1:9876:abc123"
data := []*base.SchedulerEntry{
&base.SchedulerEntry{
Spec: "* * * * *",
Type: "foo",
Payload: nil,
Opts: nil,
Next: now.Add(5 * time.Hour),
Prev: now.Add(-2 * time.Hour),
},
&base.SchedulerEntry{
Spec: "@every 20m",
Type: "bar",
Payload: map[string]interface{}{"fiz": "baz"},
Opts: nil,
Next: now.Add(1 * time.Minute),
Prev: now.Add(-19 * time.Minute),
},
}
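	// Write the entries, verify they can be listed, then clear them.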
if err := r.WriteSchedulerEntries(schedulerID, data, 30*time.Second); err != nil {
t.Fatalf("WriteSchedulerEnties failed: %v", err)
}
entries, err := r.ListSchedulerEntries()
if err != nil {
t.Fatalf("ListSchedulerEntries failed: %v", err)
}
if diff := cmp.Diff(data, entries, h.SortSchedulerEntryOpt); diff != "" {
t.Errorf("ListSchedulerEntries() = %v, want %v; (-want,+got)\n%s", entries, data, diff)
}
if err := r.ClearSchedulerEntries(schedulerID); err != nil {
t.Fatalf("ClearSchedulerEntries failed: %v", err)
}
entries, err = r.ListSchedulerEntries()
if err != nil {
t.Fatalf("ListSchedulerEntries() after clear failed: %v", err)
}
if len(entries) != 0 {
t.Errorf("found %d entries, want 0 after clearing", len(entries))
}
}

func TestSchedulerEnqueueEvents(t *testing.T) {
	r := setup(t)
	defer r.Close()
var (
now = time.Now()
oneDayAgo = now.Add(-24 * time.Hour)
fiveHoursAgo = now.Add(-5 * time.Hour)
oneHourAgo = now.Add(-1 * time.Hour)
)
tests := []struct {
entryID string
events []*base.SchedulerEnqueueEvent
want []*base.SchedulerEnqueueEvent
}{
{
entryID: "entry123",
events: []*base.SchedulerEnqueueEvent{
{TaskID: "task123", EnqueuedAt: oneDayAgo},
{TaskID: "task789", EnqueuedAt: oneHourAgo},
{TaskID: "task456", EnqueuedAt: fiveHoursAgo},
},
// Recent events first
want: []*base.SchedulerEnqueueEvent{
{TaskID: "task789", EnqueuedAt: oneHourAgo},
{TaskID: "task456", EnqueuedAt: fiveHoursAgo},
{TaskID: "task123", EnqueuedAt: oneDayAgo},
},
},
{
entryID: "entry456",
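			// No events recorded for this entry; listing should return nil.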
events: nil,
want: nil,
},
}
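// The loop label lets a failed event recording skip to the next test case.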
loop:
for _, tc := range tests {
h.FlushDB(t, r.client)
for _, e := range tc.events {
if err := r.RecordSchedulerEnqueueEvent(tc.entryID, e); err != nil {
t.Errorf("RecordSchedulerEnqueueEvent(%q, %v) failed: %v", tc.entryID, e, err)
continue loop
}
}
got, err := r.ListSchedulerEnqueueEvents(tc.entryID, Pagination{Size: 20, Page: 0})
if err != nil {
t.Errorf("ListSchedulerEnqueueEvents(%q) failed: %v", tc.entryID, err)
continue
}
if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" {
t.Errorf("ListSchedulerEnqueueEvent(%q) = %v, want %v; (-want,+got)\n%s",
tc.entryID, got, tc.want, diff)
}
}
}

func TestPause(t *testing.T) {
	r := setup(t)
	defer r.Close()
tests := []struct {
qname string // name of the queue to pause
}{
{qname: "default"},
{qname: "custom"},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
err := r.Pause(tc.qname)
if err != nil {
t.Errorf("Pause(%q) returned error: %v", tc.qname, err)
}
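		// Pausing the queue should create the corresponding paused key.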
key := base.PausedKey(tc.qname)
if r.client.Exists(key).Val() == 0 {
t.Errorf("key %q does not exist", key)
}
}
}

func TestPauseError(t *testing.T) {
	r := setup(t)
	defer r.Close()
tests := []struct {
desc string // test case description
paused []string // already paused queues
qname string // name of the queue to pause
}{
{"queue already paused", []string{"default", "custom"}, "default"},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
for _, qname := range tc.paused {
if err := r.Pause(qname); err != nil {
t.Fatalf("could not pause %q: %v", qname, err)
}
}
err := r.Pause(tc.qname)
if err == nil {
t.Errorf("%s; Pause(%q) returned nil: want error", tc.desc, tc.qname)
}
}
}

func TestUnpause(t *testing.T) {
	r := setup(t)
	defer r.Close()
tests := []struct {
paused []string // already paused queues
qname string // name of the queue to unpause
}{
{[]string{"default", "custom"}, "default"},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
for _, qname := range tc.paused {
if err := r.Pause(qname); err != nil {
t.Fatalf("could not pause %q: %v", qname, err)
}
}
err := r.Unpause(tc.qname)
if err != nil {
t.Errorf("Unpause(%q) returned error: %v", tc.qname, err)
}
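		// Unpausing the queue should remove the paused key.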
key := base.PausedKey(tc.qname)
if r.client.Exists(key).Val() == 1 {
t.Errorf("key %q exists", key)
}
}
}

func TestUnpauseError(t *testing.T) {
	r := setup(t)
	defer r.Close()
tests := []struct {
desc string // test case description
paused []string // already paused queues
qname string // name of the queue to unpause
}{
{"queue is not paused", []string{"default"}, "custom"},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
for _, qname := range tc.paused {
if err := r.Pause(qname); err != nil {
t.Fatalf("could not pause %q: %v", qname, err)
}
}
err := r.Unpause(tc.qname)
if err == nil {
t.Errorf("%s; Unpause(%q) returned nil: want error", tc.desc, tc.qname)
}
}
}