diff --git a/scheduler.go b/scheduler.go index 0026185..9dbfa1b 100644 --- a/scheduler.go +++ b/scheduler.go @@ -44,9 +44,6 @@ type Scheduler struct { // to avoid using cron.EntryID as the public API of // the Scheduler. idmap map[string]cron.EntryID - // When a Scheduler has been created with an existing Redis connection, we do - // not want to close it. - sharedConnection bool } const defaultHeartbeatInterval = 10 * time.Second @@ -54,12 +51,18 @@ const defaultHeartbeatInterval = 10 * time.Second // NewScheduler returns a new Scheduler instance given the redis connection option. // The parameter opts is optional, defaults will be used if opts is set to nil func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { + scheduler := newScheduler(opts) + redisClient, ok := r.MakeRedisClient().(redis.UniversalClient) if !ok { panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) } - scheduler := NewSchedulerFromRedisClient(redisClient, opts) - scheduler.sharedConnection = false + + rdb := rdb.NewRDB(redisClient) + + scheduler.rdb = rdb + scheduler.client = &Client{broker: rdb, sharedConnection: false} + return scheduler } @@ -67,6 +70,15 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { // The parameter opts is optional, defaults will be used if opts is set to nil. // Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it. func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler { + scheduler := newScheduler(opts) + + scheduler.rdb = rdb.NewRDB(c) + scheduler.client = NewClientFromRedisClient(c) + + return scheduler +} + +func newScheduler(opts *SchedulerOpts) *Scheduler { if opts == nil { opts = &SchedulerOpts{} } @@ -93,8 +105,6 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) * state: &serverState{value: srvStateNew}, heartbeatInterval: heartbeatInterval, logger: logger, - client: NewClientFromRedisClient(c), - rdb: rdb.NewRDB(c), cron: cron.New(cron.WithLocation(loc)), location: loc, done: make(chan struct{}), @@ -294,9 +304,6 @@ func (s *Scheduler) Shutdown() { if err := s.client.Close(); err != nil { s.logger.Errorf("Failed to close redis client connection: %v", err) } - if !s.sharedConnection { - s.rdb.Close() - } s.logger.Info("Scheduler stopped") }