2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 00:02:19 +08:00

Add RedisClusterClientOpt to connect to redis cluster

This commit is contained in:
Ken Hibino 2020-08-28 05:40:16 -07:00
parent ee1afd12f5
commit 27f4027447
2 changed files with 60 additions and 29 deletions

View File

@ -37,7 +37,9 @@ func NewTask(typename string, payload map[string]interface{}) *Task {
// //
// RedisConnOpt represents a sum of following types: // RedisConnOpt represents a sum of following types:
// //
// RedisClientOpt | *RedisClientOpt | RedisFailoverClientOpt | *RedisFailoverClientOpt // - RedisClientOpt
// - RedisFailoverClientOpt
// - RedisClusterClientOpt
type RedisConnOpt interface{} type RedisConnOpt interface{}
// RedisClientOpt is used to create a redis client that connects // RedisClientOpt is used to create a redis client that connects
@ -50,6 +52,7 @@ type RedisClientOpt struct {
// Redis server address in "host:port" format. // Redis server address in "host:port" format.
Addr string Addr string
// TODO: Add Username
// Redis server password. // Redis server password.
Password string Password string
@ -81,6 +84,7 @@ type RedisFailoverClientOpt struct {
// Redis sentinel password. // Redis sentinel password.
SentinelPassword string SentinelPassword string
// TODO: Add Username
// Redis server password. // Redis server password.
Password string Password string
@ -97,6 +101,21 @@ type RedisFailoverClientOpt struct {
TLSConfig *tls.Config TLSConfig *tls.Config
} }
// RedisFailoverClientOpt is used to creates a redis client that connects to
// redis cluster.
type RedisClusterClientOpt struct {
// A seed list of host:port addresses of cluster nodes.
Addrs []string
// TODO: Add Username
// Redis server password.
Password string
// TLS Config used to connect to a server.
// TLS will be negotiated only if this field is set.
TLSConfig *tls.Config
}
// ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid. // ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid.
// It returns a non-nil error if uri cannot be parsed. // It returns a non-nil error if uri cannot be parsed.
// //
@ -173,7 +192,7 @@ func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) {
// createRedisClient returns a redis client given a redis connection configuration. // createRedisClient returns a redis client given a redis connection configuration.
// //
// Passing an unexpected type as a RedisConnOpt argument will cause panic. // Passing an unexpected type as a RedisConnOpt argument will cause panic.
func createRedisClient(r RedisConnOpt) *redis.Client { func createRedisClient(r RedisConnOpt) redis.UniversalClient {
switch r := r.(type) { switch r := r.(type) {
case *RedisClientOpt: case *RedisClientOpt:
return redis.NewClient(&redis.Options{ return redis.NewClient(&redis.Options{
@ -213,6 +232,18 @@ func createRedisClient(r RedisConnOpt) *redis.Client {
PoolSize: r.PoolSize, PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig, TLSConfig: r.TLSConfig,
}) })
case RedisClusterClientOpt:
return redis.NewClusterClient(&redis.ClusterOptions{
Addrs: r.Addrs,
Password: r.Password,
TLSConfig: r.TLSConfig,
})
case *RedisClusterClientOpt:
return redis.NewClusterClient(&redis.ClusterOptions{
Addrs: r.Addrs,
Password: r.Password,
TLSConfig: r.TLSConfig,
})
default: default:
panic(fmt.Sprintf("asynq: unexpected type %T for RedisConnOpt", r)) panic(fmt.Sprintf("asynq: unexpected type %T for RedisConnOpt", r))
} }

View File

@ -145,7 +145,7 @@ func MustUnmarshalSlice(tb testing.TB, data []string) []*base.TaskMessage {
} }
// FlushDB deletes all the keys of the currently selected DB. // FlushDB deletes all the keys of the currently selected DB.
func FlushDB(tb testing.TB, r *redis.Client) { func FlushDB(tb testing.TB, r redis.UniversalClient) {
tb.Helper() tb.Helper()
if err := r.FlushDB().Err(); err != nil { if err := r.FlushDB().Err(); err != nil {
tb.Fatal(err) tb.Fatal(err)
@ -153,42 +153,42 @@ func FlushDB(tb testing.TB, r *redis.Client) {
} }
// SeedEnqueuedQueue initializes the specified queue with the given messages. // SeedEnqueuedQueue initializes the specified queue with the given messages.
func SeedEnqueuedQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage, qname string) { func SeedEnqueuedQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
tb.Helper() tb.Helper()
r.SAdd(base.AllQueues, qname) r.SAdd(base.AllQueues, qname)
seedRedisList(tb, r, base.QueueKey(qname), msgs) seedRedisList(tb, r, base.QueueKey(qname), msgs)
} }
// SeedInProgressQueue initializes the in-progress queue with the given messages. // SeedInProgressQueue initializes the in-progress queue with the given messages.
func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage, qname string) { func SeedInProgressQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
tb.Helper() tb.Helper()
r.SAdd(base.AllQueues, qname) r.SAdd(base.AllQueues, qname)
seedRedisList(tb, r, base.InProgressKey(qname), msgs) seedRedisList(tb, r, base.InProgressKey(qname), msgs)
} }
// SeedScheduledQueue initializes the scheduled queue with the given messages. // SeedScheduledQueue initializes the scheduled queue with the given messages.
func SeedScheduledQueue(tb testing.TB, r *redis.Client, entries []base.Z, qname string) { func SeedScheduledQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper() tb.Helper()
r.SAdd(base.AllQueues, qname) r.SAdd(base.AllQueues, qname)
seedRedisZSet(tb, r, base.ScheduledKey(qname), entries) seedRedisZSet(tb, r, base.ScheduledKey(qname), entries)
} }
// SeedRetryQueue initializes the retry queue with the given messages. // SeedRetryQueue initializes the retry queue with the given messages.
func SeedRetryQueue(tb testing.TB, r *redis.Client, entries []base.Z, qname string) { func SeedRetryQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper() tb.Helper()
r.SAdd(base.AllQueues, qname) r.SAdd(base.AllQueues, qname)
seedRedisZSet(tb, r, base.RetryKey(qname), entries) seedRedisZSet(tb, r, base.RetryKey(qname), entries)
} }
// SeedDeadQueue initializes the dead queue with the given messages. // SeedDeadQueue initializes the dead queue with the given messages.
func SeedDeadQueue(tb testing.TB, r *redis.Client, entries []base.Z, qname string) { func SeedDeadQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper() tb.Helper()
r.SAdd(base.AllQueues, qname) r.SAdd(base.AllQueues, qname)
seedRedisZSet(tb, r, base.DeadKey(qname), entries) seedRedisZSet(tb, r, base.DeadKey(qname), entries)
} }
// SeedDeadlines initializes the deadlines set with the given entries. // SeedDeadlines initializes the deadlines set with the given entries.
func SeedDeadlines(tb testing.TB, r *redis.Client, entries []base.Z, qname string) { func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper() tb.Helper()
r.SAdd(base.AllQueues, qname) r.SAdd(base.AllQueues, qname)
seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries) seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries)
@ -197,48 +197,48 @@ func SeedDeadlines(tb testing.TB, r *redis.Client, entries []base.Z, qname strin
// SeedAllEnqueuedQueues initializes all of the specified queues with the given messages. // SeedAllEnqueuedQueues initializes all of the specified queues with the given messages.
// //
// enqueued maps a queue name to a list of messages. // enqueued maps a queue name to a list of messages.
func SeedAllEnqueuedQueues(tb testing.TB, r *redis.Client, enqueued map[string][]*base.TaskMessage) { func SeedAllEnqueuedQueues(tb testing.TB, r redis.UniversalClient, enqueued map[string][]*base.TaskMessage) {
for q, msgs := range enqueued { for q, msgs := range enqueued {
SeedEnqueuedQueue(tb, r, msgs, q) SeedEnqueuedQueue(tb, r, msgs, q)
} }
} }
// SeedAllInProgressQueues initializes all of the specified in-progress queues with the given messages. // SeedAllInProgressQueues initializes all of the specified in-progress queues with the given messages.
func SeedAllInProgressQueues(tb testing.TB, r *redis.Client, inprogress map[string][]*base.TaskMessage) { func SeedAllInProgressQueues(tb testing.TB, r redis.UniversalClient, inprogress map[string][]*base.TaskMessage) {
for q, msgs := range inprogress { for q, msgs := range inprogress {
SeedInProgressQueue(tb, r, msgs, q) SeedInProgressQueue(tb, r, msgs, q)
} }
} }
// SeedAllScheduledQueues initializes all of the specified scheduled queues with the given entries. // SeedAllScheduledQueues initializes all of the specified scheduled queues with the given entries.
func SeedAllScheduledQueues(tb testing.TB, r *redis.Client, scheduled map[string][]base.Z) { func SeedAllScheduledQueues(tb testing.TB, r redis.UniversalClient, scheduled map[string][]base.Z) {
for q, entries := range scheduled { for q, entries := range scheduled {
SeedScheduledQueue(tb, r, entries, q) SeedScheduledQueue(tb, r, entries, q)
} }
} }
// SeedAllRetryQueues initializes all of the specified retry queues with the given entries. // SeedAllRetryQueues initializes all of the specified retry queues with the given entries.
func SeedAllRetryQueues(tb testing.TB, r *redis.Client, retry map[string][]base.Z) { func SeedAllRetryQueues(tb testing.TB, r redis.UniversalClient, retry map[string][]base.Z) {
for q, entries := range retry { for q, entries := range retry {
SeedRetryQueue(tb, r, entries, q) SeedRetryQueue(tb, r, entries, q)
} }
} }
// SeedAllDeadQueues initializes all of the specified dead queues with the given entries. // SeedAllDeadQueues initializes all of the specified dead queues with the given entries.
func SeedAllDeadQueues(tb testing.TB, r *redis.Client, dead map[string][]base.Z) { func SeedAllDeadQueues(tb testing.TB, r redis.UniversalClient, dead map[string][]base.Z) {
for q, entries := range dead { for q, entries := range dead {
SeedDeadQueue(tb, r, entries, q) SeedDeadQueue(tb, r, entries, q)
} }
} }
// SeedAllDeadlines initializes all of the deadlines with the given entries. // SeedAllDeadlines initializes all of the deadlines with the given entries.
func SeedAllDeadlines(tb testing.TB, r *redis.Client, deadlines map[string][]base.Z) { func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[string][]base.Z) {
for q, entries := range deadlines { for q, entries := range deadlines {
SeedDeadlines(tb, r, entries, q) SeedDeadlines(tb, r, entries, q)
} }
} }
func seedRedisList(tb testing.TB, c *redis.Client, key string, msgs []*base.TaskMessage) { func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*base.TaskMessage) {
data := MustMarshalSlice(tb, msgs) data := MustMarshalSlice(tb, msgs)
for _, s := range data { for _, s := range data {
if err := c.LPush(key, s).Err(); err != nil { if err := c.LPush(key, s).Err(); err != nil {
@ -247,7 +247,7 @@ func seedRedisList(tb testing.TB, c *redis.Client, key string, msgs []*base.Task
} }
} }
func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []base.Z) { func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []base.Z) {
for _, item := range items { for _, item := range items {
z := &redis.Z{Member: MustMarshal(tb, item.Message), Score: float64(item.Score)} z := &redis.Z{Member: MustMarshal(tb, item.Message), Score: float64(item.Score)}
if err := c.ZAdd(key, z).Err(); err != nil { if err := c.ZAdd(key, z).Err(); err != nil {
@ -257,70 +257,70 @@ func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []base.Z) {
} }
// GetEnqueuedMessages returns all enqueued messages in the given queue. // GetEnqueuedMessages returns all enqueued messages in the given queue.
func GetEnqueuedMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage { func GetEnqueuedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getListMessages(tb, r, base.QueueKey(qname)) return getListMessages(tb, r, base.QueueKey(qname))
} }
// GetInProgressMessages returns all in-progress messages in the given queue. // GetInProgressMessages returns all in-progress messages in the given queue.
func GetInProgressMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage { func GetInProgressMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getListMessages(tb, r, base.InProgressKey(qname)) return getListMessages(tb, r, base.InProgressKey(qname))
} }
// GetScheduledMessages returns all scheduled task messages in the given queue. // GetScheduledMessages returns all scheduled task messages in the given queue.
func GetScheduledMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage { func GetScheduledMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getZSetMessages(tb, r, base.ScheduledKey(qname)) return getZSetMessages(tb, r, base.ScheduledKey(qname))
} }
// GetRetryMessages returns all retry messages in the given queue. // GetRetryMessages returns all retry messages in the given queue.
func GetRetryMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage { func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getZSetMessages(tb, r, base.RetryKey(qname)) return getZSetMessages(tb, r, base.RetryKey(qname))
} }
// GetDeadMessages returns all dead messages in the given queue. // GetDeadMessages returns all dead messages in the given queue.
func GetDeadMessages(tb testing.TB, r *redis.Client, qname string) []*base.TaskMessage { func GetDeadMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getZSetMessages(tb, r, base.DeadKey(qname)) return getZSetMessages(tb, r, base.DeadKey(qname))
} }
// GetScheduledEntries returns all scheduled messages and its score in the given queue. // GetScheduledEntries returns all scheduled messages and its score in the given queue.
func GetScheduledEntries(tb testing.TB, r *redis.Client, qname string) []base.Z { func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper() tb.Helper()
return getZSetEntries(tb, r, base.ScheduledKey(qname)) return getZSetEntries(tb, r, base.ScheduledKey(qname))
} }
// GetRetryEntries returns all retry messages and its score in the given queue. // GetRetryEntries returns all retry messages and its score in the given queue.
func GetRetryEntries(tb testing.TB, r *redis.Client, qname string) []base.Z { func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper() tb.Helper()
return getZSetEntries(tb, r, base.RetryKey(qname)) return getZSetEntries(tb, r, base.RetryKey(qname))
} }
// GetDeadEntries returns all dead messages and its score in the given queue. // GetDeadEntries returns all dead messages and its score in the given queue.
func GetDeadEntries(tb testing.TB, r *redis.Client, qname string) []base.Z { func GetDeadEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper() tb.Helper()
return getZSetEntries(tb, r, base.DeadKey(qname)) return getZSetEntries(tb, r, base.DeadKey(qname))
} }
// GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue. // GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue.
func GetDeadlinesEntries(tb testing.TB, r *redis.Client, qname string) []base.Z { func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper() tb.Helper()
return getZSetEntries(tb, r, base.DeadlinesKey(qname)) return getZSetEntries(tb, r, base.DeadlinesKey(qname))
} }
func getListMessages(tb testing.TB, r *redis.Client, list string) []*base.TaskMessage { func getListMessages(tb testing.TB, r redis.UniversalClient, list string) []*base.TaskMessage {
data := r.LRange(list, 0, -1).Val() data := r.LRange(list, 0, -1).Val()
return MustUnmarshalSlice(tb, data) return MustUnmarshalSlice(tb, data)
} }
func getZSetMessages(tb testing.TB, r *redis.Client, zset string) []*base.TaskMessage { func getZSetMessages(tb testing.TB, r redis.UniversalClient, zset string) []*base.TaskMessage {
data := r.ZRange(zset, 0, -1).Val() data := r.ZRange(zset, 0, -1).Val()
return MustUnmarshalSlice(tb, data) return MustUnmarshalSlice(tb, data)
} }
func getZSetEntries(tb testing.TB, r *redis.Client, zset string) []base.Z { func getZSetEntries(tb testing.TB, r redis.UniversalClient, zset string) []base.Z {
data := r.ZRangeWithScores(zset, 0, -1).Val() data := r.ZRangeWithScores(zset, 0, -1).Val()
var entries []base.Z var entries []base.Z
for _, z := range data { for _, z := range data {