diff --git a/scheduler.go b/scheduler.go index 0026185..5226a3b 100644 --- a/scheduler.go +++ b/scheduler.go @@ -54,12 +54,19 @@ const defaultHeartbeatInterval = 10 * time.Second // NewScheduler returns a new Scheduler instance given the redis connection option. // The parameter opts is optional, defaults will be used if opts is set to nil func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { + scheduler := newScheduler(opts) + redisClient, ok := r.MakeRedisClient().(redis.UniversalClient) if !ok { panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) } - scheduler := NewSchedulerFromRedisClient(redisClient, opts) + + rdb := rdb.NewRDB(redisClient) + + scheduler.rdb = rdb + scheduler.client = &Client{broker: rdb, sharedConnection: false} scheduler.sharedConnection = false + return scheduler } @@ -67,6 +74,16 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { // The parameter opts is optional, defaults will be used if opts is set to nil. // Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it. func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler { + scheduler := newScheduler(opts) + + scheduler.rdb = rdb.NewRDB(c) + scheduler.client = NewClientFromRedisClient(c) + scheduler.sharedConnection = true + + return scheduler +} + +func newScheduler(opts *SchedulerOpts) *Scheduler { if opts == nil { opts = &SchedulerOpts{} } @@ -93,8 +110,6 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) * state: &serverState{value: srvStateNew}, heartbeatInterval: heartbeatInterval, logger: logger, - client: NewClientFromRedisClient(c), - rdb: rdb.NewRDB(c), cron: cron.New(cron.WithLocation(loc)), location: loc, done: make(chan struct{}),