2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 15:22:18 +08:00

fix: NewScheduler wrongly creates a client whose sharedConnection value is always true

* This is affecting the PeriodicManager as well as the Scheduler
This commit is contained in:
Mohammed Sohail 2024-12-03 10:08:47 +03:00
parent 106c07adaa
commit ee17997650
No known key found for this signature in database
GPG Key ID: 7DD45520C01CD85D

View File

@ -54,12 +54,19 @@ const defaultHeartbeatInterval = 10 * time.Second
// NewScheduler returns a new Scheduler instance given the redis connection option.
// The parameter opts is optional, defaults will be used if opts is set to nil
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
scheduler := newScheduler(opts)
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
rdb := rdb.NewRDB(redisClient)
scheduler.rdb = rdb
scheduler.client = &Client{broker: rdb, sharedConnection: false}
scheduler.sharedConnection = false
return scheduler
}
@ -67,6 +74,16 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
// The parameter opts is optional, defaults will be used if opts is set to nil.
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
scheduler := newScheduler(opts)
scheduler.rdb = rdb.NewRDB(c)
scheduler.client = NewClientFromRedisClient(c)
scheduler.sharedConnection = true
return scheduler
}
func newScheduler(opts *SchedulerOpts) *Scheduler {
if opts == nil {
opts = &SchedulerOpts{}
}
@ -93,8 +110,6 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
state: &serverState{value: srvStateNew},
heartbeatInterval: heartbeatInterval,
logger: logger,
client: NewClientFromRedisClient(c),
rdb: rdb.NewRDB(c),
cron: cron.New(cron.WithLocation(loc)),
location: loc,
done: make(chan struct{}),