From 97b5516183ced6c7fe83c49000708ad298f48809 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 29 Jan 2021 06:37:35 -0800 Subject: [PATCH] Update RedisConnOpt interface --- CHANGELOG.md | 2 + asynq.go | 109 ++++++++++++++++---------------------------- client.go | 7 ++- inspeq/inspector.go | 74 ++---------------------------- scheduler.go | 7 ++- server.go | 7 ++- 6 files changed, 65 insertions(+), 141 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a488762..f7989bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - `Inspector` related code are moved to subpackage "github.com/hibken/asynq/inspeq". +- `RedisConnOpt` interface has changed slightly. If you have been passing `RedisClientOpt`, `RedisFailoverClientOpt`, or `RedisClusterClientOpt` as a pointer, + update your code to pass as a value. - `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`. ### Added diff --git a/asynq.go b/asynq.go index 963a5b7..d1abfa0 100644 --- a/asynq.go +++ b/asynq.go @@ -40,7 +40,11 @@ func NewTask(typename string, payload map[string]interface{}) *Task { // - RedisClientOpt // - RedisFailoverClientOpt // - RedisClusterClientOpt -type RedisConnOpt interface{} +type RedisConnOpt interface { + // MakeRedisClient returns a new redis client instance. + // Return value is intentionally opaque to hide the implementation detail of redis client. + MakeRedisClient() interface{} +} // RedisClientOpt is used to create a redis client that connects // to a redis server directly. @@ -73,6 +77,18 @@ type RedisClientOpt struct { TLSConfig *tls.Config } +func (opt RedisClientOpt) MakeRedisClient() interface{} { + return redis.NewClient(&redis.Options{ + Network: opt.Network, + Addr: opt.Addr, + Username: opt.Username, + Password: opt.Password, + DB: opt.DB, + PoolSize: opt.PoolSize, + TLSConfig: opt.TLSConfig, + }) +} + // RedisFailoverClientOpt is used to creates a redis client that talks // to redis sentinels for service discovery and has an automatic failover // capability. @@ -109,6 +125,19 @@ type RedisFailoverClientOpt struct { TLSConfig *tls.Config } +func (opt RedisFailoverClientOpt) MakeRedisClient() interface{} { + return redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: opt.MasterName, + SentinelAddrs: opt.SentinelAddrs, + SentinelPassword: opt.SentinelPassword, + Username: opt.Username, + Password: opt.Password, + DB: opt.DB, + PoolSize: opt.PoolSize, + TLSConfig: opt.TLSConfig, + }) +} + // RedisFailoverClientOpt is used to creates a redis client that connects to // redis cluster. type RedisClusterClientOpt struct { @@ -133,6 +162,16 @@ type RedisClusterClientOpt struct { TLSConfig *tls.Config } +func (opt RedisClusterClientOpt) MakeRedisClient() interface{} { + return redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: opt.Addrs, + MaxRedirects: opt.MaxRedirects, + Username: opt.Username, + Password: opt.Password, + TLSConfig: opt.TLSConfig, + }) +} + // ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid. // It returns a non-nil error if uri cannot be parsed. // @@ -205,71 +244,3 @@ func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) { } return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, Password: password}, nil } - -// createRedisClient returns a redis client given a redis connection configuration. -// -// Passing an unexpected type as a RedisConnOpt argument will cause panic. -func createRedisClient(r RedisConnOpt) redis.UniversalClient { - switch r := r.(type) { - case *RedisClientOpt: - return redis.NewClient(&redis.Options{ - Network: r.Network, - Addr: r.Addr, - Username: r.Username, - Password: r.Password, - DB: r.DB, - PoolSize: r.PoolSize, - TLSConfig: r.TLSConfig, - }) - case RedisClientOpt: - return redis.NewClient(&redis.Options{ - Network: r.Network, - Addr: r.Addr, - Username: r.Username, - Password: r.Password, - DB: r.DB, - PoolSize: r.PoolSize, - TLSConfig: r.TLSConfig, - }) - case *RedisFailoverClientOpt: - return redis.NewFailoverClient(&redis.FailoverOptions{ - MasterName: r.MasterName, - SentinelAddrs: r.SentinelAddrs, - SentinelPassword: r.SentinelPassword, - Username: r.Username, - Password: r.Password, - DB: r.DB, - PoolSize: r.PoolSize, - TLSConfig: r.TLSConfig, - }) - case RedisFailoverClientOpt: - return redis.NewFailoverClient(&redis.FailoverOptions{ - MasterName: r.MasterName, - SentinelAddrs: r.SentinelAddrs, - SentinelPassword: r.SentinelPassword, - Username: r.Username, - Password: r.Password, - DB: r.DB, - PoolSize: r.PoolSize, - TLSConfig: r.TLSConfig, - }) - case RedisClusterClientOpt: - return redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: r.Addrs, - MaxRedirects: r.MaxRedirects, - Username: r.Username, - Password: r.Password, - TLSConfig: r.TLSConfig, - }) - case *RedisClusterClientOpt: - return redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: r.Addrs, - MaxRedirects: r.MaxRedirects, - Username: r.Username, - Password: r.Password, - TLSConfig: r.TLSConfig, - }) - default: - panic(fmt.Sprintf("asynq: unexpected type %T for RedisConnOpt", r)) - } -} diff --git a/client.go b/client.go index 010d4b7..9402099 100644 --- a/client.go +++ b/client.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/go-redis/redis/v7" "github.com/google/uuid" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" @@ -30,7 +31,11 @@ type Client struct { // NewClient returns a new Client instance given a redis connection option. func NewClient(r RedisConnOpt) *Client { - rdb := rdb.NewRDB(createRedisClient(r)) + c, ok := r.MakeRedisClient().(redis.UniversalClient) + if !ok { + panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) + } + rdb := rdb.NewRDB(c) return &Client{ opts: make(map[string][]Option), rdb: rdb, diff --git a/inspeq/inspector.go b/inspeq/inspector.go index 348e8f7..fdb1cb7 100644 --- a/inspeq/inspector.go +++ b/inspeq/inspector.go @@ -25,76 +25,12 @@ type Inspector struct { // New returns a new instance of Inspector. func New(r asynq.RedisConnOpt) *Inspector { - return &Inspector{ - rdb: rdb.NewRDB(createRedisClient(r)), + c, ok := r.MakeRedisClient().(redis.UniversalClient) + if !ok { + panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r)) } -} - -// createRedisClient returns a redis client given a redis connection configuration. -// -// Passing an unexpected type as a RedisConnOpt argument will cause panic. -func createRedisClient(r asynq.RedisConnOpt) redis.UniversalClient { - switch r := r.(type) { - case *asynq.RedisClientOpt: - return redis.NewClient(&redis.Options{ - Network: r.Network, - Addr: r.Addr, - Username: r.Username, - Password: r.Password, - DB: r.DB, - PoolSize: r.PoolSize, - TLSConfig: r.TLSConfig, - }) - case asynq.RedisClientOpt: - return redis.NewClient(&redis.Options{ - Network: r.Network, - Addr: r.Addr, - Username: r.Username, - Password: r.Password, - DB: r.DB, - PoolSize: r.PoolSize, - TLSConfig: r.TLSConfig, - }) - case *asynq.RedisFailoverClientOpt: - return redis.NewFailoverClient(&redis.FailoverOptions{ - MasterName: r.MasterName, - SentinelAddrs: r.SentinelAddrs, - SentinelPassword: r.SentinelPassword, - Username: r.Username, - Password: r.Password, - DB: r.DB, - PoolSize: r.PoolSize, - TLSConfig: r.TLSConfig, - }) - case asynq.RedisFailoverClientOpt: - return redis.NewFailoverClient(&redis.FailoverOptions{ - MasterName: r.MasterName, - SentinelAddrs: r.SentinelAddrs, - SentinelPassword: r.SentinelPassword, - Username: r.Username, - Password: r.Password, - DB: r.DB, - PoolSize: r.PoolSize, - TLSConfig: r.TLSConfig, - }) - case asynq.RedisClusterClientOpt: - return redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: r.Addrs, - MaxRedirects: r.MaxRedirects, - Username: r.Username, - Password: r.Password, - TLSConfig: r.TLSConfig, - }) - case *asynq.RedisClusterClientOpt: - return redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: r.Addrs, - MaxRedirects: r.MaxRedirects, - Username: r.Username, - Password: r.Password, - TLSConfig: r.TLSConfig, - }) - default: - panic(fmt.Sprintf("inspeq: unexpected type %T for RedisConnOpt", r)) + return &Inspector{ + rdb: rdb.NewRDB(c), } } diff --git a/scheduler.go b/scheduler.go index aa23586..5c4dbf6 100644 --- a/scheduler.go +++ b/scheduler.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/go-redis/redis/v7" "github.com/google/uuid" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/log" @@ -34,6 +35,10 @@ type Scheduler struct { // NewScheduler returns a new Scheduler instance given the redis connection option. // The parameter opts is optional, defaults will be used if opts is set to nil func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { + c, ok := r.MakeRedisClient().(redis.UniversalClient) + if !ok { + panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) + } if opts == nil { opts = &SchedulerOpts{} } @@ -55,7 +60,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { status: base.NewServerStatus(base.StatusIdle), logger: logger, client: NewClient(r), - rdb: rdb.NewRDB(createRedisClient(r)), + rdb: rdb.NewRDB(c), cron: cron.New(cron.WithLocation(loc)), location: loc, done: make(chan struct{}), diff --git a/server.go b/server.go index 859fe5d..68ff381 100644 --- a/server.go +++ b/server.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/go-redis/redis/v7" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" @@ -279,6 +280,10 @@ const ( // NewServer returns a new Server given a redis connection option // and background processing configuration. func NewServer(r RedisConnOpt, cfg Config) *Server { + c, ok := r.MakeRedisClient().(redis.UniversalClient) + if !ok { + panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) + } n := cfg.Concurrency if n < 1 { n = runtime.NumCPU() @@ -315,7 +320,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { } logger.SetLevel(toInternalLogLevel(loglevel)) - rdb := rdb.NewRDB(createRedisClient(r)) + rdb := rdb.NewRDB(c) starting := make(chan *workerInfo) finished := make(chan *base.TaskMessage) syncCh := make(chan *syncRequest)