From a396a2c43cbdc34f5ca82d5588cb243181519435 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 12 Mar 2021 06:47:56 -0800 Subject: [PATCH] Clean up asynqtest package --- internal/asynqtest/asynqtest.go | 191 +++++++++++--------------------- 1 file changed, 64 insertions(+), 127 deletions(-) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 082ec73..7b1743d 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -147,28 +147,6 @@ func MustUnmarshal(tb testing.TB, data string) *base.TaskMessage { return msg } -// MustMarshalSlice marshals a slice of task messages and return a slice of -// json strings. Calling test will fail if marshaling errors out. -func MustMarshalSlice(tb testing.TB, msgs []*base.TaskMessage) []string { - tb.Helper() - var data []string - for _, m := range msgs { - data = append(data, MustMarshal(tb, m)) - } - return data -} - -// MustUnmarshalSlice unmarshals a slice of strings into a slice of task message structs. -// Calling test will fail if marshaling errors out. -func MustUnmarshalSlice(tb testing.TB, data []string) []*base.TaskMessage { - tb.Helper() - var msgs []*base.TaskMessage - for _, s := range data { - msgs = append(msgs, MustUnmarshal(tb, s)) - } - return msgs -} - // FlushDB deletes all the keys of the currently selected DB. func FlushDB(tb testing.TB, r redis.UniversalClient) { tb.Helper() @@ -236,6 +214,7 @@ func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qna // // pending maps a queue name to a list of messages. func SeedAllPendingQueues(tb testing.TB, r redis.UniversalClient, pending map[string][]*base.TaskMessage) { + tb.Helper() for q, msgs := range pending { SeedPendingQueue(tb, r, msgs, q) } @@ -243,6 +222,7 @@ func SeedAllPendingQueues(tb testing.TB, r redis.UniversalClient, pending map[st // SeedAllActiveQueues initializes all of the specified active queues with the given messages. func SeedAllActiveQueues(tb testing.TB, r redis.UniversalClient, active map[string][]*base.TaskMessage) { + tb.Helper() for q, msgs := range active { SeedActiveQueue(tb, r, msgs, q) } @@ -250,6 +230,7 @@ func SeedAllActiveQueues(tb testing.TB, r redis.UniversalClient, active map[stri // SeedAllScheduledQueues initializes all of the specified scheduled queues with the given entries. func SeedAllScheduledQueues(tb testing.TB, r redis.UniversalClient, scheduled map[string][]base.Z) { + tb.Helper() for q, entries := range scheduled { SeedScheduledQueue(tb, r, entries, q) } @@ -257,6 +238,7 @@ func SeedAllScheduledQueues(tb testing.TB, r redis.UniversalClient, scheduled ma // SeedAllRetryQueues initializes all of the specified retry queues with the given entries. func SeedAllRetryQueues(tb testing.TB, r redis.UniversalClient, retry map[string][]base.Z) { + tb.Helper() for q, entries := range retry { SeedRetryQueue(tb, r, entries, q) } @@ -264,6 +246,7 @@ func SeedAllRetryQueues(tb testing.TB, r redis.UniversalClient, retry map[string // SeedAllArchivedQueues initializes all of the specified archived queues with the given entries. func SeedAllArchivedQueues(tb testing.TB, r redis.UniversalClient, archived map[string][]base.Z) { + tb.Helper() for q, entries := range archived { SeedArchivedQueue(tb, r, entries, q) } @@ -271,12 +254,14 @@ func SeedAllArchivedQueues(tb testing.TB, r redis.UniversalClient, archived map[ // SeedAllDeadlines initializes all of the deadlines with the given entries. func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[string][]base.Z) { + tb.Helper() for q, entries := range deadlines { SeedDeadlines(tb, r, entries, q) } } func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*base.TaskMessage) { + tb.Helper() for _, msg := range msgs { encoded := MustMarshal(tb, msg) if err := c.LPush(key, msg.ID.String()).Err(); err != nil { @@ -295,6 +280,7 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*b } func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []base.Z) { + tb.Helper() for _, item := range items { msg := item.Message encoded := MustMarshal(tb, msg) @@ -317,19 +303,61 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []b // GetPendingMessages returns all pending messages in the given queue. func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - ids := r.LRange(base.PendingKey(qname), 0, -1).Val() - var msgs []*base.TaskMessage - for _, id := range ids { - data := r.HGet(base.TaskKey(qname, id), "msg").Val() - msgs = append(msgs, MustUnmarshal(tb, data)) - } - return msgs + return getMessagesFromList(tb, r, qname, base.PendingKey) } // GetActiveMessages returns all active messages in the given queue. func GetActiveMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - ids := r.LRange(base.ActiveKey(qname), 0, -1).Val() + return getMessagesFromList(tb, r, qname, base.ActiveKey) +} + +// GetScheduledMessages returns all scheduled task messages in the given queue. +func GetScheduledMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { + tb.Helper() + return getMessagesFromZSet(tb, r, qname, base.ScheduledKey) +} + +// GetRetryMessages returns all retry messages in the given queue. +func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { + tb.Helper() + return getMessagesFromZSet(tb, r, qname, base.RetryKey) +} + +// GetArchivedMessages returns all archived messages in the given queue. +func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { + tb.Helper() + return getMessagesFromZSet(tb, r, qname, base.ArchivedKey) +} + +// GetScheduledEntries returns all scheduled messages and its score in the given queue. +func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { + tb.Helper() + return getMessagesFromZSetWithScores(tb, r, qname, base.ScheduledKey) +} + +// GetRetryEntries returns all retry messages and its score in the given queue. +func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { + tb.Helper() + return getMessagesFromZSetWithScores(tb, r, qname, base.RetryKey) +} + +// GetArchivedEntries returns all archived messages and its score in the given queue. +func GetArchivedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { + tb.Helper() + return getMessagesFromZSetWithScores(tb, r, qname, base.ArchivedKey) +} + +// GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue. +func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { + tb.Helper() + return getMessagesFromZSetWithScores(tb, r, qname, base.DeadlinesKey) +} + +// Retrieves all messages stored under `keyFn(qname)` key in redis list. +func getMessagesFromList(tb testing.TB, r redis.UniversalClient, qname string, keyFn func(qname string) string) []*base.TaskMessage { + tb.Helper() + ids := r.LRange(keyFn(qname), 0, -1).Val() var msgs []*base.TaskMessage for _, id := range ids { data := r.HGet(base.TaskKey(qname, id), "msg").Val() @@ -338,10 +366,10 @@ func GetActiveMessages(tb testing.TB, r redis.UniversalClient, qname string) []* return msgs } -// GetScheduledMessages returns all scheduled task messages in the given queue. -func GetScheduledMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { +// Retrieves all messages stored under `keyFn(qname)` key in redis zset (sorted-set). +func getMessagesFromZSet(tb testing.TB, r redis.UniversalClient, qname string, keyFn func(qname string) string) []*base.TaskMessage { tb.Helper() - ids := r.ZRange(base.ScheduledKey(qname), 0, -1).Val() + ids := r.ZRange(keyFn(qname), 0, -1).Val() var msgs []*base.TaskMessage for _, id := range ids { msg := r.HGet(base.TaskKey(qname, id), "msg").Val() @@ -350,34 +378,10 @@ func GetScheduledMessages(tb testing.TB, r redis.UniversalClient, qname string) return msgs } -// GetRetryMessages returns all retry messages in the given queue. -func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { +// Retrieves all messages along with their scores stored under `keyFn(qname)` key in redis zset (sorted-set). +func getMessagesFromZSetWithScores(tb testing.TB, r redis.UniversalClient, qname string, keyFn func(qname string) string) []base.Z { tb.Helper() - ids := r.ZRange(base.RetryKey(qname), 0, -1).Val() - var msgs []*base.TaskMessage - for _, id := range ids { - msg := r.HGet(base.TaskKey(qname, id), "msg").Val() - msgs = append(msgs, MustUnmarshal(tb, msg)) - } - return msgs -} - -// GetArchivedMessages returns all archived messages in the given queue. -func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { - tb.Helper() - ids := r.ZRange(base.ArchivedKey(qname), 0, -1).Val() - var msgs []*base.TaskMessage - for _, id := range ids { - msg := r.HGet(base.TaskKey(qname, id), "msg").Val() - msgs = append(msgs, MustUnmarshal(tb, msg)) - } - return msgs -} - -// GetScheduledEntries returns all scheduled messages and its score in the given queue. -func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { - tb.Helper() - zs := r.ZRangeWithScores(base.ScheduledKey(qname), 0, -1).Val() + zs := r.ZRangeWithScores(keyFn(qname), 0, -1).Val() var res []base.Z for _, z := range zs { msg := r.HGet(base.TaskKey(qname, z.Member.(string)), "msg").Val() @@ -385,70 +389,3 @@ func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) [ } return res } - -// GetRetryEntries returns all retry messages and its score in the given queue. -func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { - tb.Helper() - zs := r.ZRangeWithScores(base.RetryKey(qname), 0, -1).Val() - var res []base.Z - for _, z := range zs { - msg := r.HGet(base.TaskKey(qname, z.Member.(string)), "msg").Val() - res = append(res, base.Z{ - Message: MustUnmarshal(tb, msg), - Score: int64(z.Score), - }) - } - return res -} - -// GetArchivedEntries returns all archived messages and its score in the given queue. -func GetArchivedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { - tb.Helper() - zs := r.ZRangeWithScores(base.ArchivedKey(qname), 0, -1).Val() - var res []base.Z - for _, z := range zs { - msg := r.HGet(base.TaskKey(qname, z.Member.(string)), "msg").Val() - res = append(res, base.Z{ - Message: MustUnmarshal(tb, msg), - Score: int64(z.Score), - }) - } - return res -} - -// GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue. -func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { - tb.Helper() - zs := r.ZRangeWithScores(base.DeadlinesKey(qname), 0, -1).Val() - var res []base.Z - for _, z := range zs { - msg := r.HGet(base.TaskKey(qname, z.Member.(string)), "msg").Val() - res = append(res, base.Z{ - Message: MustUnmarshal(tb, msg), - Score: int64(z.Score), - }) - } - return res -} - -func getListMessages(tb testing.TB, r redis.UniversalClient, list string) []*base.TaskMessage { - data := r.LRange(list, 0, -1).Val() - return MustUnmarshalSlice(tb, data) -} - -func getZSetMessages(tb testing.TB, r redis.UniversalClient, zset string) []*base.TaskMessage { - data := r.ZRange(zset, 0, -1).Val() - return MustUnmarshalSlice(tb, data) -} - -func getZSetEntries(tb testing.TB, r redis.UniversalClient, zset string) []base.Z { - data := r.ZRangeWithScores(zset, 0, -1).Val() - var entries []base.Z - for _, z := range data { - entries = append(entries, base.Z{ - Message: MustUnmarshal(tb, z.Member.(string)), - Score: int64(z.Score), - }) - } - return entries -}