2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-25 23:06:12 +08:00

Add RedisClusterClientOpt to connect to redis cluster

This commit is contained in:
Ken Hibino
2020-08-28 05:40:16 -07:00
parent 7ee1e27822
commit ecbfac2c46
2 changed files with 60 additions and 29 deletions

View File

@@ -145,7 +145,7 @@ func MustUnmarshalSlice(tb testing.TB, data []string) []*base.TaskMessage {
}
// FlushDB deletes all the keys of the currently selected DB.
func FlushDB(tb testing.TB, r *redis.Client) {
func FlushDB(tb testing.TB, r redis.UniversalClient) {
tb.Helper()
if err := r.FlushDB().Err(); err != nil {
tb.Fatal(err)
@@ -153,42 +153,42 @@ func FlushDB(tb testing.TB, r *redis.Client) {
}
// SeedEnqueuedQueue initializes the specified queue with the given messages.
func SeedEnqueuedQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage, qname string) {
func SeedEnqueuedQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
tb.Helper()
r.SAdd(base.AllQueues, qname)
seedRedisList(tb, r, base.QueueKey(qname), msgs)
}
// SeedInProgressQueue initializes the in-progress queue with the given messages.
func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage, qname string) {
func SeedInProgressQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
tb.Helper()
r.SAdd(base.AllQueues, qname)
seedRedisList(tb, r, base.InProgressKey(qname), msgs)
}
// SeedScheduledQueue initializes the scheduled queue with the given messages.
func SeedScheduledQueue(tb testing.TB, r *redis.Client, entries []base.Z, qname string) {
func SeedScheduledQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper()
r.SAdd(base.AllQueues, qname)
seedRedisZSet(tb, r, base.ScheduledKey(qname), entries)
}
// SeedRetryQueue initializes the retry queue with the given messages.
func SeedRetryQueue(tb testing.TB, r *redis.Client, entries []base.Z, qname string) {
func SeedRetryQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper()
r.SAdd(base.AllQueues, qname)
seedRedisZSet(tb, r, base.RetryKey(qname), entries)
}
// SeedDeadQueue initializes the dead queue with the given messages.
func SeedDeadQueue(tb testing.TB, r *redis.Client, entries []base.Z, qname string) {
func SeedDeadQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper()
r.SAdd(base.AllQueues, qname)
seedRedisZSet(tb, r, base.DeadKey(qname), entries)
}
// SeedDeadlines initializes the deadlines set with the given entries.
func SeedDeadlines(tb testing.TB, r *redis.Client, entries []base.Z, qname string) {
func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper()
r.SAdd(base.AllQueues, qname)
seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries)
@@ -197,48 +197,48 @@ func SeedDeadlines(tb testing.TB, r *redis.Client, entries []base.Z, qname strin
// SeedAllEnqueuedQueues initializes all of the specified queues with the given messages.
//
// enqueued maps a queue name to a list of messages.
func SeedAllEnqueuedQueues(tb testing.TB, r *redis.Client, enqueued map[string][]*base.TaskMessage) {
func SeedAllEnqueuedQueues(tb testing.TB, r redis.UniversalClient, enqueued map[string][]*base.TaskMessage) {
for q, msgs := range enqueued {
SeedEnqueuedQueue(tb, r, msgs, q)
}
}
// SeedAllInProgressQueues initializes all of the specified in-progress queues with the given messages.
func SeedAllInProgressQueues(tb testing.TB, r *redis.Client, inprogress map[string][]*base.TaskMessage) {
func SeedAllInProgressQueues(tb testing.TB, r redis.UniversalClient, inprogress map[string][]*base.TaskMessage) {
for q, msgs := range inprogress {
SeedInProgressQueue(tb, r, msgs, q)
}
}
// SeedAllScheduledQueues initializes all of the specified scheduled queues with the given entries.
func SeedAllScheduledQueues(tb testing.TB, r *redis.Client, scheduled map[string][]base.Z) {
func SeedAllScheduledQueues(tb testing.TB, r redis.UniversalClient, scheduled map[string][]base.Z) {
for q, entries := range scheduled {
SeedScheduledQueue(tb, r, entries, q)
}
}
// SeedAllRetryQueues initializes all of the specified retry queues with the given entries.
func SeedAllRetryQueues(tb testing.TB, r *redis.Client, retry map[string][]base.Z) {
func SeedAllRetryQueues(tb testing.TB, r redis.UniversalClient, retry map[string][]base.Z) {
for q, entries := range retry {
SeedRetryQueue(tb, r, entries, q)
}
}
// SeedAllDeadQueues initializes all of the specified dead queues with the given entries.
func SeedAllDeadQueues(tb testing.TB, r *redis.Client, dead map[string][]base.Z) {
func SeedAllDeadQueues(tb testing.TB, r redis.UniversalClient, dead map[string][]base.Z) {
for q, entries := range dead {
SeedDeadQueue(tb, r, entries, q)
}
}
// SeedAllDeadlines initializes all of the deadlines with the given entries.
func SeedAllDeadlines(tb testing.TB, r *redis.Client, deadlines map[string][]base.Z) {
func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[string][]base.Z) {
for q, entries := range deadlines {
SeedDeadlines(tb, r, entries, q)
}
}
func seedRedisList(tb testing.TB, c *redis.Client, key string, msgs []*base.TaskMessage) {
func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*base.TaskMessage) {
data := MustMarshalSlice(tb, msgs)
for _, s := range data {
if err := c.LPush(key, s).Err(); err != nil {
@@ -247,7 +247,7 @@ func seedRedisList(tb testing.TB, c *redis.Client, key string, msgs []*base.Task
}
}
func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []base.Z) {
func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []base.Z) {
for _, item := range items {
z := &redis.Z{Member: MustMarshal(tb, item.Message), Score: float64(item.Score)}
if err := c.ZAdd(key, z).Err(); err != nil {
@@ -257,70 +257,70 @@ func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []base.Z) {
}
// GetEnqueuedMessages returns all enqueued messages in the given queue.
func GetEnqueuedMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage {
func GetEnqueuedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper()
return getListMessages(tb, r, base.QueueKey(qname))
}
// GetInProgressMessages returns all in-progress messages in the given queue.
func GetInProgressMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage {
func GetInProgressMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper()
return getListMessages(tb, r, base.InProgressKey(qname))
}
// GetScheduledMessages returns all scheduled task messages in the given queue.
func GetScheduledMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage {
func GetScheduledMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper()
return getZSetMessages(tb, r, base.ScheduledKey(qname))
}
// GetRetryMessages returns all retry messages in the given queue.
func GetRetryMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage {
func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper()
return getZSetMessages(tb, r, base.RetryKey(qname))
}
// GetDeadMessages returns all dead messages in the given queue.
func GetDeadMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage {
func GetDeadMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper()
return getZSetMessages(tb, r, base.DeadKey(qname))
}
// GetScheduledEntries returns all scheduled messages and its score in the given queue.
func GetScheduledEntries(tb testing.TB, r *redis.Client, qname string) []base.Z {
func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper()
return getZSetEntries(tb, r, base.ScheduledKey(qname))
}
// GetRetryEntries returns all retry messages and its score in the given queue.
func GetRetryEntries(tb testing.TB, r *redis.Client, qname string) []base.Z {
func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper()
return getZSetEntries(tb, r, base.RetryKey(qname))
}
// GetDeadEntries returns all dead messages and its score in the given queue.
func GetDeadEntries(tb testing.TB, r *redis.Client, qname string) []base.Z {
func GetDeadEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper()
return getZSetEntries(tb, r, base.DeadKey(qname))
}
// GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue.
func GetDeadlinesEntries(tb testing.TB, r *redis.Client, qname string) []base.Z {
func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper()
return getZSetEntries(tb, r, base.DeadlinesKey(qname))
}
func getListMessages(tb testing.TB, r *redis.Client, list string) []*base.TaskMessage {
func getListMessages(tb testing.TB, r redis.UniversalClient, list string) []*base.TaskMessage {
data := r.LRange(list, 0, -1).Val()
return MustUnmarshalSlice(tb, data)
}
func getZSetMessages(tb testing.TB, r *redis.Client, zset string) []*base.TaskMessage {
func getZSetMessages(tb testing.TB, r redis.UniversalClient, zset string) []*base.TaskMessage {
data := r.ZRange(zset, 0, -1).Val()
return MustUnmarshalSlice(tb, data)
}
func getZSetEntries(tb testing.TB, r *redis.Client, zset string) []base.Z {
func getZSetEntries(tb testing.TB, r redis.UniversalClient, zset string) []base.Z {
data := r.ZRangeWithScores(zset, 0, -1).Val()
var entries []base.Z
for _, z := range data {