From 6b96459881ece82434761389d7e4388022c81d05 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 29 Aug 2020 06:54:08 -0700 Subject: [PATCH] Add test flags to run tests using redis cluster --- asynq_test.go | 43 ++++++++-- benchmark_test.go | 20 +---- client_test.go | 38 +++------ inspector_test.go | 136 ++++++++------------------------ internal/asynqtest/asynqtest.go | 19 ++++- internal/rdb/inspect_test.go | 32 +++++--- internal/rdb/rdb_test.go | 45 ++++++++--- server_test.go | 14 ++-- 8 files changed, 161 insertions(+), 186 deletions(-) diff --git a/asynq_test.go b/asynq_test.go index 2cbf774..2ca46d4 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -7,6 +7,7 @@ package asynq import ( "flag" "sort" + "strings" "testing" "github.com/go-redis/redis/v7" @@ -24,6 +25,9 @@ var ( redisAddr string redisDB int + useRedisCluster bool + redisClusterAddrs string // comma-separated list of host:port + testLogLevel = FatalLevel ) @@ -32,23 +36,52 @@ var testLogger *log.Logger func init() { flag.StringVar(&redisAddr, "redis_addr", "localhost:6379", "redis address to use in testing") flag.IntVar(&redisDB, "redis_db", 14, "redis db number to use in testing") + flag.BoolVar(&useRedisCluster, "redis_cluster", false, "use redis cluster as a broker in testing") + flag.StringVar(&redisClusterAddrs, "redis_cluster_addrs", "localhost:7000,localhost:7001,localhost:7002", "comma separated list of redis server addresses") flag.Var(&testLogLevel, "loglevel", "log level to use in testing") testLogger = log.NewLogger(nil) testLogger.SetLevel(toInternalLogLevel(testLogLevel)) } -func setup(tb testing.TB) *redis.Client { +func setup(tb testing.TB) (r redis.UniversalClient) { tb.Helper() - r := redis.NewClient(&redis.Options{ - Addr: redisAddr, - DB: redisDB, - }) + if useRedisCluster { + addrs := strings.Split(redisClusterAddrs, ",") + if len(addrs) == 0 { + tb.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.") + } + r = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addrs, + }) + } else { + r = redis.NewClient(&redis.Options{ + Addr: redisAddr, + DB: redisDB, + }) + } // Start each test with a clean slate. h.FlushDB(tb, r) return r } +func getRedisConnOpt(tb testing.TB) RedisConnOpt { + tb.Helper() + if useRedisCluster { + addrs := strings.Split(redisClusterAddrs, ",") + if len(addrs) == 0 { + tb.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.") + } + return RedisClusterClientOpt{ + Addrs: addrs, + } + } + return RedisClientOpt{ + Addr: redisAddr, + DB: redisDB, + } +} + var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task { out := append([]*Task(nil), in...) // Copy input to avoid mutating it sort.Slice(out, func(i, j int) bool { diff --git a/benchmark_test.go b/benchmark_test.go index df2d190..0342b80 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -18,10 +18,7 @@ func BenchmarkEndToEndSimple(b *testing.B) { for n := 0; n < b.N; n++ { b.StopTimer() // begin setup setup(b) - redis := &RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - } + redis := getRedisConnOpt(b) client := NewClient(redis) srv := NewServer(redis, Config{ Concurrency: 10, @@ -61,10 +58,7 @@ func BenchmarkEndToEnd(b *testing.B) { for n := 0; n < b.N; n++ { b.StopTimer() // begin setup setup(b) - redis := &RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - } + redis := getRedisConnOpt(b) client := NewClient(redis) srv := NewServer(redis, Config{ Concurrency: 10, @@ -127,10 +121,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { for n := 0; n < b.N; n++ { b.StopTimer() // begin setup setup(b) - redis := &RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - } + redis := getRedisConnOpt(b) client := NewClient(redis) srv := NewServer(redis, Config{ Concurrency: 10, @@ -185,10 +176,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) { for n := 0; n < b.N; n++ { b.StopTimer() // begin setup setup(b) - redis := &RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - } + redis := getRedisConnOpt(b) client := NewClient(redis) srv := NewServer(redis, Config{ Concurrency: 10, diff --git a/client_test.go b/client_test.go index d0e169c..ac37f6e 100644 --- a/client_test.go +++ b/client_test.go @@ -17,10 +17,7 @@ import ( func TestClientEnqueueAt(t *testing.T) { r := setup(t) - client := NewClient(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + client := NewClient(getRedisConnOpt(t)) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) @@ -127,10 +124,7 @@ func TestClientEnqueueAt(t *testing.T) { func TestClientEnqueue(t *testing.T) { r := setup(t) - client := NewClient(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + client := NewClient(getRedisConnOpt(t)) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) @@ -369,10 +363,7 @@ func TestClientEnqueue(t *testing.T) { func TestClientEnqueueIn(t *testing.T) { r := setup(t) - client := NewClient(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + client := NewClient(getRedisConnOpt(t)) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) @@ -551,7 +542,7 @@ func TestClientDefaultOptions(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) - c := NewClient(RedisClientOpt{Addr: redisAddr, DB: redisDB}) + c := NewClient(getRedisConnOpt(t)) c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...) gotRes, err := c.Enqueue(tc.task, tc.opts...) if err != nil { @@ -575,12 +566,9 @@ func TestClientDefaultOptions(t *testing.T) { } } -func TestEnqueueUnique(t *testing.T) { +func TestClientEnqueueUnique(t *testing.T) { r := setup(t) - c := NewClient(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + c := NewClient(getRedisConnOpt(t)) tests := []struct { task *Task @@ -620,12 +608,9 @@ func TestEnqueueUnique(t *testing.T) { } } -func TestEnqueueInUnique(t *testing.T) { +func TestClientEnqueueInUnique(t *testing.T) { r := setup(t) - c := NewClient(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + c := NewClient(getRedisConnOpt(t)) tests := []struct { task *Task @@ -668,12 +653,9 @@ func TestEnqueueInUnique(t *testing.T) { } } -func TestEnqueueAtUnique(t *testing.T) { +func TestClientEnqueueAtUnique(t *testing.T) { r := setup(t) - c := NewClient(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + c := NewClient(getRedisConnOpt(t)) tests := []struct { task *Task diff --git a/inspector_test.go b/inspector_test.go index 401b4b2..1a240a3 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -19,10 +19,7 @@ import ( func TestInspectorQueues(t *testing.T) { r := setup(t) - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { queues []string @@ -63,10 +60,7 @@ func TestInspectorCurrentStats(t *testing.T) { now := time.Now() timeCmpOpt := cmpopts.EquateApproxTime(time.Second) - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { enqueued map[string][]*base.TaskMessage @@ -169,12 +163,7 @@ func TestInspectorCurrentStats(t *testing.T) { func TestInspectorHistory(t *testing.T) { r := setup(t) now := time.Now().UTC() - timeCmpOpt := cmpopts.EquateApproxTime(time.Second) - - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { qname string // queue of interest @@ -215,6 +204,9 @@ func TestInspectorHistory(t *testing.T) { Failed: (i + 1) * 10, Date: now.Add(-time.Duration(i) * 24 * time.Hour), } + // Allow 10 seconds difference in timestamp. + // When testing with Redis Cluster it could take a while to set up, and timestamp can have a few second difference. + timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Second) if diff := cmp.Diff(want, got[i], timeCmpOpt); diff != "" { t.Errorf("Inspector.History %d days ago data; got %+v, want %+v; (-want,+got):\n%s", i, got[i], want, diff) @@ -238,10 +230,7 @@ func TestInspectorListEnqueuedTasks(t *testing.T) { m3 := asynqtest.NewTaskMessage("task3", nil) m4 := asynqtest.NewTaskMessage("task4", nil) - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { desc string @@ -309,10 +298,7 @@ func TestInspectorListInProgressTasks(t *testing.T) { m3 := asynqtest.NewTaskMessage("task3", nil) m4 := asynqtest.NewTaskMessage("task4", nil) - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) createInProgressTask := func(msg *base.TaskMessage) *InProgressTask { return &InProgressTask{ @@ -382,10 +368,7 @@ func TestInspectorListScheduledTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { desc string @@ -460,10 +443,7 @@ func TestInspectorListRetryTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { desc string @@ -539,10 +519,7 @@ func TestInspectorListDeadTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { desc string @@ -601,10 +578,7 @@ func TestInspectorListPagination(t *testing.T) { r := setup(t) asynqtest.SeedEnqueuedQueue(t, r, msgs, base.DefaultQueueName) - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { page int @@ -666,10 +640,7 @@ func TestInspectorDeleteAllScheduledTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -734,10 +705,7 @@ func TestInspectorDeleteAllRetryTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -802,10 +770,7 @@ func TestInspectorDeleteAllDeadTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { dead map[string][]base.Z @@ -870,10 +835,7 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -1005,10 +967,7 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -1069,7 +1028,8 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { dead: map[string][]base.Z{ "default": {z1, z2}, }, - want: 0, + qname: "default", + want: 0, wantRetry: map[string][]base.Z{ "default": {}, }, @@ -1098,9 +1058,10 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff) } } + cmpOpt := asynqtest.EquateInt64Approx(2) // allow for 2 seconds difference in Z.Score for qname, want := range tc.wantDead { gotDead := asynqtest.GetDeadEntries(t, r, qname) - if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { + if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt, cmpOpt); diff != "" { t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1119,10 +1080,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -1238,10 +1196,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -1357,10 +1312,7 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { dead map[string][]base.Z @@ -1471,10 +1423,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -1523,10 +1472,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -1575,10 +1521,7 @@ func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { dead map[string][]base.Z @@ -1627,10 +1570,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -1699,10 +1639,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -1770,10 +1707,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { dead map[string][]base.Z @@ -1845,10 +1779,7 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -1918,10 +1849,7 @@ func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - }) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 2c34f5b..6e831ac 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -22,7 +22,7 @@ import ( // to be equal if they are within the given margin. func EquateInt64Approx(margin int64) cmp.Option { return cmp.Comparer(func(a, b int64) bool { - return math.Abs(float64(a-b)) < float64(margin) + return math.Abs(float64(a-b)) <= float64(margin) }) } @@ -156,8 +156,21 @@ func MustUnmarshalSlice(tb testing.TB, data []string) []*base.TaskMessage { // FlushDB deletes all the keys of the currently selected DB. func FlushDB(tb testing.TB, r redis.UniversalClient) { tb.Helper() - if err := r.FlushDB().Err(); err != nil { - tb.Fatal(err) + switch r := r.(type) { + case *redis.Client: + if err := r.FlushDB().Err(); err != nil { + tb.Fatal(err) + } + case *redis.ClusterClient: + err := r.ForEachMaster(func(c *redis.Client) error { + if err := c.FlushAll().Err(); err != nil { + return err + } + return nil + }) + if err != nil { + tb.Fatal(err) + } } } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index cedb287..3855c62 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -268,7 +268,10 @@ func TestHistoricalStats(t *testing.T) { Failed: (i + 1) * 10, Time: now.Add(-time.Duration(i) * 24 * time.Hour), } - if diff := cmp.Diff(want, got[i], timeCmpOpt); diff != "" { + // Allow 10 seconds difference in timestamp. + // When testing with Redis Cluster it could take a while to set up, and timestamp can have a few second difference. + cmpOpt := cmpopts.EquateApproxTime(10 * time.Second) + if diff := cmp.Diff(want, got[i], cmpOpt); diff != "" { t.Errorf("RDB.HistoricalStats for the last %d days; got %+v, want %+v; (-want,+got):\n%s", i, got[i], want, diff) } } @@ -604,7 +607,7 @@ func TestListScheduled(t *testing.T) { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue } - if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" { t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) continue } @@ -756,7 +759,7 @@ func TestListRetry(t *testing.T) { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue } - if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" { t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) continue @@ -907,7 +910,7 @@ func TestListDead(t *testing.T) { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue } - if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" { t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) continue @@ -973,7 +976,10 @@ func TestListDeadPagination(t *testing.T) { } } -var timeCmpOpt = cmpopts.EquateApproxTime(time.Second) +var ( + timeCmpOpt = cmpopts.EquateApproxTime(2 * time.Second) // allow for 2 seconds margin in time.Time + zScoreCmpOpt = h.EquateInt64Approx(2) // allow for 2 seconds margin in Z.Score +) func TestEnqueueDeadTask(t *testing.T) { r := setup(t) @@ -1711,7 +1717,7 @@ func TestKillRetryTask(t *testing.T) { for qname, want := range tc.wantRetry { gotRetry := h.GetRetryEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.RetryKey(qname), diff) } @@ -1719,7 +1725,7 @@ func TestKillRetryTask(t *testing.T) { for qname, want := range tc.wantDead { gotDead := h.GetDeadEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.DeadKey(qname), diff) } @@ -1836,7 +1842,7 @@ func TestKillScheduledTask(t *testing.T) { for qname, want := range tc.wantScheduled { gotScheduled := h.GetScheduledEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ScheduledKey(qname), diff) } @@ -1844,7 +1850,7 @@ func TestKillScheduledTask(t *testing.T) { for qname, want := range tc.wantDead { gotDead := h.GetDeadEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.DeadKey(qname), diff) } @@ -1982,7 +1988,7 @@ func TestKillAllRetryTasks(t *testing.T) { for qname, want := range tc.wantRetry { gotRetry := h.GetRetryEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.RetryKey(qname), diff) } @@ -1990,7 +1996,7 @@ func TestKillAllRetryTasks(t *testing.T) { for qname, want := range tc.wantDead { gotDead := h.GetDeadEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.DeadKey(qname), diff) } @@ -2128,7 +2134,7 @@ func TestKillAllScheduledTasks(t *testing.T) { for qname, want := range tc.wantScheduled { gotScheduled := h.GetScheduledEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ScheduledKey(qname), diff) } @@ -2136,7 +2142,7 @@ func TestKillAllScheduledTasks(t *testing.T) { for qname, want := range tc.wantDead { gotDead := h.GetDeadEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, timeCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.DeadKey(qname), diff) } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index ab18fce..4feeb22 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -6,7 +6,9 @@ package rdb import ( "encoding/json" + "flag" "fmt" + "strings" "sync" "testing" "time" @@ -19,13 +21,38 @@ import ( "github.com/hibiken/asynq/internal/base" ) -// TODO(hibiken): Get Redis address and db number from ENV variables. -func setup(t *testing.T) *RDB { +// variables used for package testing. +var ( + redisAddr string + redisDB int + + useRedisCluster bool + redisClusterAddrs string // comma-separated list of host:port +) + +func init() { + flag.StringVar(&redisAddr, "redis_addr", "localhost:6379", "redis address to use in testing") + flag.IntVar(&redisDB, "redis_db", 14, "redis db number to use in testing") + flag.BoolVar(&useRedisCluster, "redis_cluster", false, "use redis cluster as a broker in testing") + flag.StringVar(&redisClusterAddrs, "redis_cluster_addrs", "localhost:7000,localhost:7001,localhost:7002", "comma separated list of redis server addresses") +} + +func setup(t *testing.T) (r *RDB) { t.Helper() - r := NewRDB(redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - DB: 13, - })) + if useRedisCluster { + addrs := strings.Split(redisClusterAddrs, ",") + if len(addrs) == 0 { + t.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.") + } + r = NewRDB(redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addrs, + })) + } else { + r = NewRDB(redis.NewClient(&redis.Options{ + Addr: redisAddr, + DB: redisDB, + })) + } // Start each test with a clean slate. h.FlushDB(t, r.client) return r @@ -280,7 +307,7 @@ func TestDequeue(t *testing.T) { tc.args, gotMsg, tc.wantMsg) continue } - if gotDeadline != tc.wantDeadline { + if !cmp.Equal(gotDeadline, tc.wantDeadline, cmpopts.EquateApproxTime(1*time.Second)) { t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v", tc.args, gotDeadline, tc.wantDeadline) continue @@ -444,7 +471,7 @@ func TestDone(t *testing.T) { Payload: nil, Timeout: 1800, Deadline: 0, - UniqueKey: "reindex:nil:default", + UniqueKey: "asynq:{default}:unique:reindex:nil", Queue: "default", } t1Deadline := now.Unix() + t1.Timeout @@ -1147,7 +1174,7 @@ func TestKill(t *testing.T) { } for queue, want := range tc.wantDead { gotDead := h.GetDeadEntries(t, r.client, queue) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt); diff != "" { + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadKey(queue), diff) } } diff --git a/server_test.go b/server_test.go index 4d08f45..11dda1d 100644 --- a/server_test.go +++ b/server_test.go @@ -21,12 +21,9 @@ func TestServer(t *testing.T) { ignoreOpt := goleak.IgnoreTopFunction("github.com/go-redis/redis/v7/internal/pool.(*ConnPool).reaper") defer goleak.VerifyNoLeaks(t, ignoreOpt) - r := &RedisClientOpt{ - Addr: "localhost:6379", - DB: 15, - } - c := NewClient(r) - srv := NewServer(r, Config{ + redisConnOpt := getRedisConnOpt(t) + c := NewClient(redisConnOpt) + srv := NewServer(redisConnOpt, Config{ Concurrency: 10, LogLevel: testLogLevel, }) @@ -159,14 +156,15 @@ func TestServerWithFlakyBroker(t *testing.T) { }() r := rdb.NewRDB(setup(t)) testBroker := testbroker.NewTestBroker(r) - srv := NewServer(RedisClientOpt{Addr: redisAddr, DB: redisDB}, Config{LogLevel: testLogLevel}) + redisConnOpt := getRedisConnOpt(t) + srv := NewServer(redisConnOpt, Config{LogLevel: testLogLevel}) srv.broker = testBroker srv.scheduler.broker = testBroker srv.heartbeater.broker = testBroker srv.processor.broker = testBroker srv.subscriber.broker = testBroker - c := NewClient(RedisClientOpt{Addr: redisAddr, DB: redisDB}) + c := NewClient(redisConnOpt) h := func(ctx context.Context, task *Task) error { // force task retry.