2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 00:02:19 +08:00

Add test flags to run tests using redis cluster

This commit is contained in:
Ken Hibino 2020-08-29 06:54:08 -07:00
parent 572eb338d5
commit 6b96459881
8 changed files with 161 additions and 186 deletions

View File

@ -7,6 +7,7 @@ package asynq
import ( import (
"flag" "flag"
"sort" "sort"
"strings"
"testing" "testing"
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
@ -24,6 +25,9 @@ var (
redisAddr string redisAddr string
redisDB int redisDB int
useRedisCluster bool
redisClusterAddrs string // comma-separated list of host:port
testLogLevel = FatalLevel testLogLevel = FatalLevel
) )
@ -32,23 +36,52 @@ var testLogger *log.Logger
func init() { func init() {
flag.StringVar(&redisAddr, "redis_addr", "localhost:6379", "redis address to use in testing") flag.StringVar(&redisAddr, "redis_addr", "localhost:6379", "redis address to use in testing")
flag.IntVar(&redisDB, "redis_db", 14, "redis db number to use in testing") flag.IntVar(&redisDB, "redis_db", 14, "redis db number to use in testing")
flag.BoolVar(&useRedisCluster, "redis_cluster", false, "use redis cluster as a broker in testing")
flag.StringVar(&redisClusterAddrs, "redis_cluster_addrs", "localhost:7000,localhost:7001,localhost:7002", "comma separated list of redis server addresses")
flag.Var(&testLogLevel, "loglevel", "log level to use in testing") flag.Var(&testLogLevel, "loglevel", "log level to use in testing")
testLogger = log.NewLogger(nil) testLogger = log.NewLogger(nil)
testLogger.SetLevel(toInternalLogLevel(testLogLevel)) testLogger.SetLevel(toInternalLogLevel(testLogLevel))
} }
func setup(tb testing.TB) *redis.Client { func setup(tb testing.TB) (r redis.UniversalClient) {
tb.Helper() tb.Helper()
r := redis.NewClient(&redis.Options{ if useRedisCluster {
Addr: redisAddr, addrs := strings.Split(redisClusterAddrs, ",")
DB: redisDB, if len(addrs) == 0 {
}) tb.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.")
}
r = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addrs,
})
} else {
r = redis.NewClient(&redis.Options{
Addr: redisAddr,
DB: redisDB,
})
}
// Start each test with a clean slate. // Start each test with a clean slate.
h.FlushDB(tb, r) h.FlushDB(tb, r)
return r return r
} }
func getRedisConnOpt(tb testing.TB) RedisConnOpt {
tb.Helper()
if useRedisCluster {
addrs := strings.Split(redisClusterAddrs, ",")
if len(addrs) == 0 {
tb.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.")
}
return RedisClusterClientOpt{
Addrs: addrs,
}
}
return RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
}
}
var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task { var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task {
out := append([]*Task(nil), in...) // Copy input to avoid mutating it out := append([]*Task(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool { sort.Slice(out, func(i, j int) bool {

View File

@ -18,10 +18,7 @@ func BenchmarkEndToEndSimple(b *testing.B) {
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
b.StopTimer() // begin setup b.StopTimer() // begin setup
setup(b) setup(b)
redis := &RedisClientOpt{ redis := getRedisConnOpt(b)
Addr: redisAddr,
DB: redisDB,
}
client := NewClient(redis) client := NewClient(redis)
srv := NewServer(redis, Config{ srv := NewServer(redis, Config{
Concurrency: 10, Concurrency: 10,
@ -61,10 +58,7 @@ func BenchmarkEndToEnd(b *testing.B) {
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
b.StopTimer() // begin setup b.StopTimer() // begin setup
setup(b) setup(b)
redis := &RedisClientOpt{ redis := getRedisConnOpt(b)
Addr: redisAddr,
DB: redisDB,
}
client := NewClient(redis) client := NewClient(redis)
srv := NewServer(redis, Config{ srv := NewServer(redis, Config{
Concurrency: 10, Concurrency: 10,
@ -127,10 +121,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
b.StopTimer() // begin setup b.StopTimer() // begin setup
setup(b) setup(b)
redis := &RedisClientOpt{ redis := getRedisConnOpt(b)
Addr: redisAddr,
DB: redisDB,
}
client := NewClient(redis) client := NewClient(redis)
srv := NewServer(redis, Config{ srv := NewServer(redis, Config{
Concurrency: 10, Concurrency: 10,
@ -185,10 +176,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) {
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
b.StopTimer() // begin setup b.StopTimer() // begin setup
setup(b) setup(b)
redis := &RedisClientOpt{ redis := getRedisConnOpt(b)
Addr: redisAddr,
DB: redisDB,
}
client := NewClient(redis) client := NewClient(redis)
srv := NewServer(redis, Config{ srv := NewServer(redis, Config{
Concurrency: 10, Concurrency: 10,

View File

@ -17,10 +17,7 @@ import (
func TestClientEnqueueAt(t *testing.T) { func TestClientEnqueueAt(t *testing.T) {
r := setup(t) r := setup(t)
client := NewClient(RedisClientOpt{ client := NewClient(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
@ -127,10 +124,7 @@ func TestClientEnqueueAt(t *testing.T) {
func TestClientEnqueue(t *testing.T) { func TestClientEnqueue(t *testing.T) {
r := setup(t) r := setup(t)
client := NewClient(RedisClientOpt{ client := NewClient(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
@ -369,10 +363,7 @@ func TestClientEnqueue(t *testing.T) {
func TestClientEnqueueIn(t *testing.T) { func TestClientEnqueueIn(t *testing.T) {
r := setup(t) r := setup(t)
client := NewClient(RedisClientOpt{ client := NewClient(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
@ -551,7 +542,7 @@ func TestClientDefaultOptions(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r) h.FlushDB(t, r)
c := NewClient(RedisClientOpt{Addr: redisAddr, DB: redisDB}) c := NewClient(getRedisConnOpt(t))
c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...) c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...)
gotRes, err := c.Enqueue(tc.task, tc.opts...) gotRes, err := c.Enqueue(tc.task, tc.opts...)
if err != nil { if err != nil {
@ -575,12 +566,9 @@ func TestClientDefaultOptions(t *testing.T) {
} }
} }
func TestEnqueueUnique(t *testing.T) { func TestClientEnqueueUnique(t *testing.T) {
r := setup(t) r := setup(t)
c := NewClient(RedisClientOpt{ c := NewClient(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
task *Task task *Task
@ -620,12 +608,9 @@ func TestEnqueueUnique(t *testing.T) {
} }
} }
func TestEnqueueInUnique(t *testing.T) { func TestClientEnqueueInUnique(t *testing.T) {
r := setup(t) r := setup(t)
c := NewClient(RedisClientOpt{ c := NewClient(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
task *Task task *Task
@ -668,12 +653,9 @@ func TestEnqueueInUnique(t *testing.T) {
} }
} }
func TestEnqueueAtUnique(t *testing.T) { func TestClientEnqueueAtUnique(t *testing.T) {
r := setup(t) r := setup(t)
c := NewClient(RedisClientOpt{ c := NewClient(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
task *Task task *Task

View File

@ -19,10 +19,7 @@ import (
func TestInspectorQueues(t *testing.T) { func TestInspectorQueues(t *testing.T) {
r := setup(t) r := setup(t)
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
queues []string queues []string
@ -63,10 +60,7 @@ func TestInspectorCurrentStats(t *testing.T) {
now := time.Now() now := time.Now()
timeCmpOpt := cmpopts.EquateApproxTime(time.Second) timeCmpOpt := cmpopts.EquateApproxTime(time.Second)
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
enqueued map[string][]*base.TaskMessage enqueued map[string][]*base.TaskMessage
@ -169,12 +163,7 @@ func TestInspectorCurrentStats(t *testing.T) {
func TestInspectorHistory(t *testing.T) { func TestInspectorHistory(t *testing.T) {
r := setup(t) r := setup(t)
now := time.Now().UTC() now := time.Now().UTC()
timeCmpOpt := cmpopts.EquateApproxTime(time.Second) inspector := NewInspector(getRedisConnOpt(t))
inspector := NewInspector(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
qname string // queue of interest qname string // queue of interest
@ -215,6 +204,9 @@ func TestInspectorHistory(t *testing.T) {
Failed: (i + 1) * 10, Failed: (i + 1) * 10,
Date: now.Add(-time.Duration(i) * 24 * time.Hour), Date: now.Add(-time.Duration(i) * 24 * time.Hour),
} }
// Allow 10 seconds difference in timestamp.
// When testing with Redis Cluster it could take a while to set up, and timestamp can have a few second difference.
timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Second)
if diff := cmp.Diff(want, got[i], timeCmpOpt); diff != "" { if diff := cmp.Diff(want, got[i], timeCmpOpt); diff != "" {
t.Errorf("Inspector.History %d days ago data; got %+v, want %+v; (-want,+got):\n%s", t.Errorf("Inspector.History %d days ago data; got %+v, want %+v; (-want,+got):\n%s",
i, got[i], want, diff) i, got[i], want, diff)
@ -238,10 +230,7 @@ func TestInspectorListEnqueuedTasks(t *testing.T) {
m3 := asynqtest.NewTaskMessage("task3", nil) m3 := asynqtest.NewTaskMessage("task3", nil)
m4 := asynqtest.NewTaskMessage("task4", nil) m4 := asynqtest.NewTaskMessage("task4", nil)
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
desc string desc string
@ -309,10 +298,7 @@ func TestInspectorListInProgressTasks(t *testing.T) {
m3 := asynqtest.NewTaskMessage("task3", nil) m3 := asynqtest.NewTaskMessage("task3", nil)
m4 := asynqtest.NewTaskMessage("task4", nil) m4 := asynqtest.NewTaskMessage("task4", nil)
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
createInProgressTask := func(msg *base.TaskMessage) *InProgressTask { createInProgressTask := func(msg *base.TaskMessage) *InProgressTask {
return &InProgressTask{ return &InProgressTask{
@ -382,10 +368,7 @@ func TestInspectorListScheduledTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
desc string desc string
@ -460,10 +443,7 @@ func TestInspectorListRetryTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
desc string desc string
@ -539,10 +519,7 @@ func TestInspectorListDeadTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
desc string desc string
@ -601,10 +578,7 @@ func TestInspectorListPagination(t *testing.T) {
r := setup(t) r := setup(t)
asynqtest.SeedEnqueuedQueue(t, r, msgs, base.DefaultQueueName) asynqtest.SeedEnqueuedQueue(t, r, msgs, base.DefaultQueueName)
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
page int page int
@ -666,10 +640,7 @@ func TestInspectorDeleteAllScheduledTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
scheduled map[string][]base.Z scheduled map[string][]base.Z
@ -734,10 +705,7 @@ func TestInspectorDeleteAllRetryTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
retry map[string][]base.Z retry map[string][]base.Z
@ -802,10 +770,7 @@ func TestInspectorDeleteAllDeadTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
dead map[string][]base.Z dead map[string][]base.Z
@ -870,10 +835,7 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
scheduled map[string][]base.Z scheduled map[string][]base.Z
@ -1005,10 +967,7 @@ func TestInspectorKillAllRetryTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
retry map[string][]base.Z retry map[string][]base.Z
@ -1069,7 +1028,8 @@ func TestInspectorKillAllRetryTasks(t *testing.T) {
dead: map[string][]base.Z{ dead: map[string][]base.Z{
"default": {z1, z2}, "default": {z1, z2},
}, },
want: 0, qname: "default",
want: 0,
wantRetry: map[string][]base.Z{ wantRetry: map[string][]base.Z{
"default": {}, "default": {},
}, },
@ -1098,9 +1058,10 @@ func TestInspectorKillAllRetryTasks(t *testing.T) {
t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff) t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff)
} }
} }
cmpOpt := asynqtest.EquateInt64Approx(2) // allow for 2 seconds difference in Z.Score
for qname, want := range tc.wantDead { for qname, want := range tc.wantDead {
gotDead := asynqtest.GetDeadEntries(t, r, qname) gotDead := asynqtest.GetDeadEntries(t, r, qname)
if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt, cmpOpt); diff != "" {
t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", qname, diff) t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", qname, diff)
} }
} }
@ -1119,10 +1080,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
scheduled map[string][]base.Z scheduled map[string][]base.Z
@ -1238,10 +1196,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
retry map[string][]base.Z retry map[string][]base.Z
@ -1357,10 +1312,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
dead map[string][]base.Z dead map[string][]base.Z
@ -1471,10 +1423,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
scheduled map[string][]base.Z scheduled map[string][]base.Z
@ -1523,10 +1472,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
retry map[string][]base.Z retry map[string][]base.Z
@ -1575,10 +1521,7 @@ func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
dead map[string][]base.Z dead map[string][]base.Z
@ -1627,10 +1570,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
scheduled map[string][]base.Z scheduled map[string][]base.Z
@ -1699,10 +1639,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
retry map[string][]base.Z retry map[string][]base.Z
@ -1770,10 +1707,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
dead map[string][]base.Z dead map[string][]base.Z
@ -1845,10 +1779,7 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
scheduled map[string][]base.Z scheduled map[string][]base.Z
@ -1918,10 +1849,7 @@ func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := NewInspector(RedisClientOpt{ inspector := NewInspector(getRedisConnOpt(t))
Addr: redisAddr,
DB: redisDB,
})
tests := []struct { tests := []struct {
retry map[string][]base.Z retry map[string][]base.Z

View File

@ -22,7 +22,7 @@ import (
// to be equal if they are within the given margin. // to be equal if they are within the given margin.
func EquateInt64Approx(margin int64) cmp.Option { func EquateInt64Approx(margin int64) cmp.Option {
return cmp.Comparer(func(a, b int64) bool { return cmp.Comparer(func(a, b int64) bool {
return math.Abs(float64(a-b)) < float64(margin) return math.Abs(float64(a-b)) <= float64(margin)
}) })
} }
@ -156,8 +156,21 @@ func MustUnmarshalSlice(tb testing.TB, data []string) []*base.TaskMessage {
// FlushDB deletes all the keys of the currently selected DB. // FlushDB deletes all the keys of the currently selected DB.
func FlushDB(tb testing.TB, r redis.UniversalClient) { func FlushDB(tb testing.TB, r redis.UniversalClient) {
tb.Helper() tb.Helper()
if err := r.FlushDB().Err(); err != nil { switch r := r.(type) {
tb.Fatal(err) case *redis.Client:
if err := r.FlushDB().Err(); err != nil {
tb.Fatal(err)
}
case *redis.ClusterClient:
err := r.ForEachMaster(func(c *redis.Client) error {
if err := c.FlushAll().Err(); err != nil {
return err
}
return nil
})
if err != nil {
tb.Fatal(err)
}
} }
} }

View File

@ -268,7 +268,10 @@ func TestHistoricalStats(t *testing.T) {
Failed: (i + 1) * 10, Failed: (i + 1) * 10,
Time: now.Add(-time.Duration(i) * 24 * time.Hour), Time: now.Add(-time.Duration(i) * 24 * time.Hour),
} }
if diff := cmp.Diff(want, got[i], timeCmpOpt); diff != "" { // Allow 10 seconds difference in timestamp.
// When testing with Redis Cluster it could take a while to set up, and timestamp can have a few second difference.
cmpOpt := cmpopts.EquateApproxTime(10 * time.Second)
if diff := cmp.Diff(want, got[i], cmpOpt); diff != "" {
t.Errorf("RDB.HistoricalStats for the last %d days; got %+v, want %+v; (-want,+got):\n%s", i, got[i], want, diff) t.Errorf("RDB.HistoricalStats for the last %d days; got %+v, want %+v; (-want,+got):\n%s", i, got[i], want, diff)
} }
} }
@ -604,7 +607,7 @@ func TestListScheduled(t *testing.T) {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue continue
} }
if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" { if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue continue
} }
@ -756,7 +759,7 @@ func TestListRetry(t *testing.T) {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue continue
} }
if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" { if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s",
op, got, err, tc.want, diff) op, got, err, tc.want, diff)
continue continue
@ -907,7 +910,7 @@ func TestListDead(t *testing.T) {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue continue
} }
if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" { if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s",
op, got, err, tc.want, diff) op, got, err, tc.want, diff)
continue continue
@ -973,7 +976,10 @@ func TestListDeadPagination(t *testing.T) {
} }
} }
var timeCmpOpt = cmpopts.EquateApproxTime(time.Second) var (
timeCmpOpt = cmpopts.EquateApproxTime(2 * time.Second) // allow for 2 seconds margin in time.Time
zScoreCmpOpt = h.EquateInt64Approx(2) // allow for 2 seconds margin in Z.Score
)
func TestEnqueueDeadTask(t *testing.T) { func TestEnqueueDeadTask(t *testing.T) {
r := setup(t) r := setup(t)
@ -1711,7 +1717,7 @@ func TestKillRetryTask(t *testing.T) {
for qname, want := range tc.wantRetry { for qname, want := range tc.wantRetry {
gotRetry := h.GetRetryEntries(t, r.client, qname) gotRetry := h.GetRetryEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s", t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.RetryKey(qname), diff) base.RetryKey(qname), diff)
} }
@ -1719,7 +1725,7 @@ func TestKillRetryTask(t *testing.T) {
for qname, want := range tc.wantDead { for qname, want := range tc.wantDead {
gotDead := h.GetDeadEntries(t, r.client, qname) gotDead := h.GetDeadEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s", t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.DeadKey(qname), diff) base.DeadKey(qname), diff)
} }
@ -1836,7 +1842,7 @@ func TestKillScheduledTask(t *testing.T) {
for qname, want := range tc.wantScheduled { for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledEntries(t, r.client, qname) gotScheduled := h.GetScheduledEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s", t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.ScheduledKey(qname), diff) base.ScheduledKey(qname), diff)
} }
@ -1844,7 +1850,7 @@ func TestKillScheduledTask(t *testing.T) {
for qname, want := range tc.wantDead { for qname, want := range tc.wantDead {
gotDead := h.GetDeadEntries(t, r.client, qname) gotDead := h.GetDeadEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s", t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.DeadKey(qname), diff) base.DeadKey(qname), diff)
} }
@ -1982,7 +1988,7 @@ func TestKillAllRetryTasks(t *testing.T) {
for qname, want := range tc.wantRetry { for qname, want := range tc.wantRetry {
gotRetry := h.GetRetryEntries(t, r.client, qname) gotRetry := h.GetRetryEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s", t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.RetryKey(qname), diff) base.RetryKey(qname), diff)
} }
@ -1990,7 +1996,7 @@ func TestKillAllRetryTasks(t *testing.T) {
for qname, want := range tc.wantDead { for qname, want := range tc.wantDead {
gotDead := h.GetDeadEntries(t, r.client, qname) gotDead := h.GetDeadEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s", t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.DeadKey(qname), diff) base.DeadKey(qname), diff)
} }
@ -2128,7 +2134,7 @@ func TestKillAllScheduledTasks(t *testing.T) {
for qname, want := range tc.wantScheduled { for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledEntries(t, r.client, qname) gotScheduled := h.GetScheduledEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s", t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.ScheduledKey(qname), diff) base.ScheduledKey(qname), diff)
} }
@ -2136,7 +2142,7 @@ func TestKillAllScheduledTasks(t *testing.T) {
for qname, want := range tc.wantDead { for qname, want := range tc.wantDead {
gotDead := h.GetDeadEntries(t, r.client, qname) gotDead := h.GetDeadEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s", t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.DeadKey(qname), diff) base.DeadKey(qname), diff)
} }

View File

@ -6,7 +6,9 @@ package rdb
import ( import (
"encoding/json" "encoding/json"
"flag"
"fmt" "fmt"
"strings"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -19,13 +21,38 @@ import (
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
) )
// TODO(hibiken): Get Redis address and db number from ENV variables. // variables used for package testing.
func setup(t *testing.T) *RDB { var (
redisAddr string
redisDB int
useRedisCluster bool
redisClusterAddrs string // comma-separated list of host:port
)
func init() {
flag.StringVar(&redisAddr, "redis_addr", "localhost:6379", "redis address to use in testing")
flag.IntVar(&redisDB, "redis_db", 14, "redis db number to use in testing")
flag.BoolVar(&useRedisCluster, "redis_cluster", false, "use redis cluster as a broker in testing")
flag.StringVar(&redisClusterAddrs, "redis_cluster_addrs", "localhost:7000,localhost:7001,localhost:7002", "comma separated list of redis server addresses")
}
func setup(t *testing.T) (r *RDB) {
t.Helper() t.Helper()
r := NewRDB(redis.NewClient(&redis.Options{ if useRedisCluster {
Addr: "localhost:6379", addrs := strings.Split(redisClusterAddrs, ",")
DB: 13, if len(addrs) == 0 {
})) t.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.")
}
r = NewRDB(redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addrs,
}))
} else {
r = NewRDB(redis.NewClient(&redis.Options{
Addr: redisAddr,
DB: redisDB,
}))
}
// Start each test with a clean slate. // Start each test with a clean slate.
h.FlushDB(t, r.client) h.FlushDB(t, r.client)
return r return r
@ -280,7 +307,7 @@ func TestDequeue(t *testing.T) {
tc.args, gotMsg, tc.wantMsg) tc.args, gotMsg, tc.wantMsg)
continue continue
} }
if gotDeadline != tc.wantDeadline { if !cmp.Equal(gotDeadline, tc.wantDeadline, cmpopts.EquateApproxTime(1*time.Second)) {
t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v", t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v",
tc.args, gotDeadline, tc.wantDeadline) tc.args, gotDeadline, tc.wantDeadline)
continue continue
@ -444,7 +471,7 @@ func TestDone(t *testing.T) {
Payload: nil, Payload: nil,
Timeout: 1800, Timeout: 1800,
Deadline: 0, Deadline: 0,
UniqueKey: "reindex:nil:default", UniqueKey: "asynq:{default}:unique:reindex:nil",
Queue: "default", Queue: "default",
} }
t1Deadline := now.Unix() + t1.Timeout t1Deadline := now.Unix() + t1.Timeout
@ -1147,7 +1174,7 @@ func TestKill(t *testing.T) {
} }
for queue, want := range tc.wantDead { for queue, want := range tc.wantDead {
gotDead := h.GetDeadEntries(t, r.client, queue) gotDead := h.GetDeadEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt); diff != "" { if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadKey(queue), diff) t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadKey(queue), diff)
} }
} }

View File

@ -21,12 +21,9 @@ func TestServer(t *testing.T) {
ignoreOpt := goleak.IgnoreTopFunction("github.com/go-redis/redis/v7/internal/pool.(*ConnPool).reaper") ignoreOpt := goleak.IgnoreTopFunction("github.com/go-redis/redis/v7/internal/pool.(*ConnPool).reaper")
defer goleak.VerifyNoLeaks(t, ignoreOpt) defer goleak.VerifyNoLeaks(t, ignoreOpt)
r := &RedisClientOpt{ redisConnOpt := getRedisConnOpt(t)
Addr: "localhost:6379", c := NewClient(redisConnOpt)
DB: 15, srv := NewServer(redisConnOpt, Config{
}
c := NewClient(r)
srv := NewServer(r, Config{
Concurrency: 10, Concurrency: 10,
LogLevel: testLogLevel, LogLevel: testLogLevel,
}) })
@ -159,14 +156,15 @@ func TestServerWithFlakyBroker(t *testing.T) {
}() }()
r := rdb.NewRDB(setup(t)) r := rdb.NewRDB(setup(t))
testBroker := testbroker.NewTestBroker(r) testBroker := testbroker.NewTestBroker(r)
srv := NewServer(RedisClientOpt{Addr: redisAddr, DB: redisDB}, Config{LogLevel: testLogLevel}) redisConnOpt := getRedisConnOpt(t)
srv := NewServer(redisConnOpt, Config{LogLevel: testLogLevel})
srv.broker = testBroker srv.broker = testBroker
srv.scheduler.broker = testBroker srv.scheduler.broker = testBroker
srv.heartbeater.broker = testBroker srv.heartbeater.broker = testBroker
srv.processor.broker = testBroker srv.processor.broker = testBroker
srv.subscriber.broker = testBroker srv.subscriber.broker = testBroker
c := NewClient(RedisClientOpt{Addr: redisAddr, DB: redisDB}) c := NewClient(redisConnOpt)
h := func(ctx context.Context, task *Task) error { h := func(ctx context.Context, task *Task) error {
// force task retry. // force task retry.