2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00

Update RedisConnOpt interface

This commit is contained in:
Ken Hibino 2021-01-29 06:37:35 -08:00
parent 8eafa03ca7
commit 97b5516183
6 changed files with 65 additions and 141 deletions

View File

@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed ### Changed
- `Inspector` related code are moved to subpackage "github.com/hibken/asynq/inspeq". - `Inspector` related code are moved to subpackage "github.com/hibken/asynq/inspeq".
- `RedisConnOpt` interface has changed slightly. If you have been passing `RedisClientOpt`, `RedisFailoverClientOpt`, or `RedisClusterClientOpt` as a pointer,
update your code to pass as a value.
- `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`. - `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`.
### Added ### Added

109
asynq.go
View File

@ -40,7 +40,11 @@ func NewTask(typename string, payload map[string]interface{}) *Task {
// - RedisClientOpt // - RedisClientOpt
// - RedisFailoverClientOpt // - RedisFailoverClientOpt
// - RedisClusterClientOpt // - RedisClusterClientOpt
type RedisConnOpt interface{} type RedisConnOpt interface {
// MakeRedisClient returns a new redis client instance.
// Return value is intentionally opaque to hide the implementation detail of redis client.
MakeRedisClient() interface{}
}
// RedisClientOpt is used to create a redis client that connects // RedisClientOpt is used to create a redis client that connects
// to a redis server directly. // to a redis server directly.
@ -73,6 +77,18 @@ type RedisClientOpt struct {
TLSConfig *tls.Config TLSConfig *tls.Config
} }
func (opt RedisClientOpt) MakeRedisClient() interface{} {
return redis.NewClient(&redis.Options{
Network: opt.Network,
Addr: opt.Addr,
Username: opt.Username,
Password: opt.Password,
DB: opt.DB,
PoolSize: opt.PoolSize,
TLSConfig: opt.TLSConfig,
})
}
// RedisFailoverClientOpt is used to creates a redis client that talks // RedisFailoverClientOpt is used to creates a redis client that talks
// to redis sentinels for service discovery and has an automatic failover // to redis sentinels for service discovery and has an automatic failover
// capability. // capability.
@ -109,6 +125,19 @@ type RedisFailoverClientOpt struct {
TLSConfig *tls.Config TLSConfig *tls.Config
} }
func (opt RedisFailoverClientOpt) MakeRedisClient() interface{} {
return redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: opt.MasterName,
SentinelAddrs: opt.SentinelAddrs,
SentinelPassword: opt.SentinelPassword,
Username: opt.Username,
Password: opt.Password,
DB: opt.DB,
PoolSize: opt.PoolSize,
TLSConfig: opt.TLSConfig,
})
}
// RedisFailoverClientOpt is used to creates a redis client that connects to // RedisFailoverClientOpt is used to creates a redis client that connects to
// redis cluster. // redis cluster.
type RedisClusterClientOpt struct { type RedisClusterClientOpt struct {
@ -133,6 +162,16 @@ type RedisClusterClientOpt struct {
TLSConfig *tls.Config TLSConfig *tls.Config
} }
func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
return redis.NewClusterClient(&redis.ClusterOptions{
Addrs: opt.Addrs,
MaxRedirects: opt.MaxRedirects,
Username: opt.Username,
Password: opt.Password,
TLSConfig: opt.TLSConfig,
})
}
// ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid. // ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid.
// It returns a non-nil error if uri cannot be parsed. // It returns a non-nil error if uri cannot be parsed.
// //
@ -205,71 +244,3 @@ func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) {
} }
return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, Password: password}, nil return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, Password: password}, nil
} }
// createRedisClient returns a redis client given a redis connection configuration.
//
// Passing an unexpected type as a RedisConnOpt argument will cause panic.
func createRedisClient(r RedisConnOpt) redis.UniversalClient {
switch r := r.(type) {
case *RedisClientOpt:
return redis.NewClient(&redis.Options{
Network: r.Network,
Addr: r.Addr,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case RedisClientOpt:
return redis.NewClient(&redis.Options{
Network: r.Network,
Addr: r.Addr,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case *RedisFailoverClientOpt:
return redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: r.MasterName,
SentinelAddrs: r.SentinelAddrs,
SentinelPassword: r.SentinelPassword,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case RedisFailoverClientOpt:
return redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: r.MasterName,
SentinelAddrs: r.SentinelAddrs,
SentinelPassword: r.SentinelPassword,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case RedisClusterClientOpt:
return redis.NewClusterClient(&redis.ClusterOptions{
Addrs: r.Addrs,
MaxRedirects: r.MaxRedirects,
Username: r.Username,
Password: r.Password,
TLSConfig: r.TLSConfig,
})
case *RedisClusterClientOpt:
return redis.NewClusterClient(&redis.ClusterOptions{
Addrs: r.Addrs,
MaxRedirects: r.MaxRedirects,
Username: r.Username,
Password: r.Password,
TLSConfig: r.TLSConfig,
})
default:
panic(fmt.Sprintf("asynq: unexpected type %T for RedisConnOpt", r))
}
}

View File

@ -11,6 +11,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/go-redis/redis/v7"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
@ -30,7 +31,11 @@ type Client struct {
// NewClient returns a new Client instance given a redis connection option. // NewClient returns a new Client instance given a redis connection option.
func NewClient(r RedisConnOpt) *Client { func NewClient(r RedisConnOpt) *Client {
rdb := rdb.NewRDB(createRedisClient(r)) c, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
rdb := rdb.NewRDB(c)
return &Client{ return &Client{
opts: make(map[string][]Option), opts: make(map[string][]Option),
rdb: rdb, rdb: rdb,

View File

@ -25,76 +25,12 @@ type Inspector struct {
// New returns a new instance of Inspector. // New returns a new instance of Inspector.
func New(r asynq.RedisConnOpt) *Inspector { func New(r asynq.RedisConnOpt) *Inspector {
return &Inspector{ c, ok := r.MakeRedisClient().(redis.UniversalClient)
rdb: rdb.NewRDB(createRedisClient(r)), if !ok {
panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r))
} }
} return &Inspector{
rdb: rdb.NewRDB(c),
// createRedisClient returns a redis client given a redis connection configuration.
//
// Passing an unexpected type as a RedisConnOpt argument will cause panic.
func createRedisClient(r asynq.RedisConnOpt) redis.UniversalClient {
switch r := r.(type) {
case *asynq.RedisClientOpt:
return redis.NewClient(&redis.Options{
Network: r.Network,
Addr: r.Addr,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case asynq.RedisClientOpt:
return redis.NewClient(&redis.Options{
Network: r.Network,
Addr: r.Addr,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case *asynq.RedisFailoverClientOpt:
return redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: r.MasterName,
SentinelAddrs: r.SentinelAddrs,
SentinelPassword: r.SentinelPassword,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case asynq.RedisFailoverClientOpt:
return redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: r.MasterName,
SentinelAddrs: r.SentinelAddrs,
SentinelPassword: r.SentinelPassword,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case asynq.RedisClusterClientOpt:
return redis.NewClusterClient(&redis.ClusterOptions{
Addrs: r.Addrs,
MaxRedirects: r.MaxRedirects,
Username: r.Username,
Password: r.Password,
TLSConfig: r.TLSConfig,
})
case *asynq.RedisClusterClientOpt:
return redis.NewClusterClient(&redis.ClusterOptions{
Addrs: r.Addrs,
MaxRedirects: r.MaxRedirects,
Username: r.Username,
Password: r.Password,
TLSConfig: r.TLSConfig,
})
default:
panic(fmt.Sprintf("inspeq: unexpected type %T for RedisConnOpt", r))
} }
} }

View File

@ -10,6 +10,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/go-redis/redis/v7"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/log"
@ -34,6 +35,10 @@ type Scheduler struct {
// NewScheduler returns a new Scheduler instance given the redis connection option. // NewScheduler returns a new Scheduler instance given the redis connection option.
// The parameter opts is optional, defaults will be used if opts is set to nil // The parameter opts is optional, defaults will be used if opts is set to nil
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
c, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
if opts == nil { if opts == nil {
opts = &SchedulerOpts{} opts = &SchedulerOpts{}
} }
@ -55,7 +60,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
status: base.NewServerStatus(base.StatusIdle), status: base.NewServerStatus(base.StatusIdle),
logger: logger, logger: logger,
client: NewClient(r), client: NewClient(r),
rdb: rdb.NewRDB(createRedisClient(r)), rdb: rdb.NewRDB(c),
cron: cron.New(cron.WithLocation(loc)), cron: cron.New(cron.WithLocation(loc)),
location: loc, location: loc,
done: make(chan struct{}), done: make(chan struct{}),

View File

@ -15,6 +15,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
@ -279,6 +280,10 @@ const (
// NewServer returns a new Server given a redis connection option // NewServer returns a new Server given a redis connection option
// and background processing configuration. // and background processing configuration.
func NewServer(r RedisConnOpt, cfg Config) *Server { func NewServer(r RedisConnOpt, cfg Config) *Server {
c, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
n := cfg.Concurrency n := cfg.Concurrency
if n < 1 { if n < 1 {
n = runtime.NumCPU() n = runtime.NumCPU()
@ -315,7 +320,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
} }
logger.SetLevel(toInternalLogLevel(loglevel)) logger.SetLevel(toInternalLogLevel(loglevel))
rdb := rdb.NewRDB(createRedisClient(r)) rdb := rdb.NewRDB(c)
starting := make(chan *workerInfo) starting := make(chan *workerInfo)
finished := make(chan *base.TaskMessage) finished := make(chan *base.TaskMessage)
syncCh := make(chan *syncRequest) syncCh := make(chan *syncRequest)