mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-17 20:30:06 +08:00
Compare commits
2 Commits
v0.25.1
...
sohail/pm-
Author | SHA1 | Date | |
---|---|---|---|
![]() |
f1e7dc4056 | ||
![]() |
ee17997650 |
27
scheduler.go
27
scheduler.go
@@ -44,9 +44,6 @@ type Scheduler struct {
|
|||||||
// to avoid using cron.EntryID as the public API of
|
// to avoid using cron.EntryID as the public API of
|
||||||
// the Scheduler.
|
// the Scheduler.
|
||||||
idmap map[string]cron.EntryID
|
idmap map[string]cron.EntryID
|
||||||
// When a Scheduler has been created with an existing Redis connection, we do
|
|
||||||
// not want to close it.
|
|
||||||
sharedConnection bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const defaultHeartbeatInterval = 10 * time.Second
|
const defaultHeartbeatInterval = 10 * time.Second
|
||||||
@@ -54,12 +51,18 @@ const defaultHeartbeatInterval = 10 * time.Second
|
|||||||
// NewScheduler returns a new Scheduler instance given the redis connection option.
|
// NewScheduler returns a new Scheduler instance given the redis connection option.
|
||||||
// The parameter opts is optional, defaults will be used if opts is set to nil
|
// The parameter opts is optional, defaults will be used if opts is set to nil
|
||||||
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
||||||
|
scheduler := newScheduler(opts)
|
||||||
|
|
||||||
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
|
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
|
||||||
if !ok {
|
if !ok {
|
||||||
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
||||||
}
|
}
|
||||||
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
|
|
||||||
scheduler.sharedConnection = false
|
rdb := rdb.NewRDB(redisClient)
|
||||||
|
|
||||||
|
scheduler.rdb = rdb
|
||||||
|
scheduler.client = &Client{broker: rdb, sharedConnection: false}
|
||||||
|
|
||||||
return scheduler
|
return scheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -67,6 +70,15 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
|||||||
// The parameter opts is optional, defaults will be used if opts is set to nil.
|
// The parameter opts is optional, defaults will be used if opts is set to nil.
|
||||||
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
|
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
|
||||||
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
|
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
|
||||||
|
scheduler := newScheduler(opts)
|
||||||
|
|
||||||
|
scheduler.rdb = rdb.NewRDB(c)
|
||||||
|
scheduler.client = NewClientFromRedisClient(c)
|
||||||
|
|
||||||
|
return scheduler
|
||||||
|
}
|
||||||
|
|
||||||
|
func newScheduler(opts *SchedulerOpts) *Scheduler {
|
||||||
if opts == nil {
|
if opts == nil {
|
||||||
opts = &SchedulerOpts{}
|
opts = &SchedulerOpts{}
|
||||||
}
|
}
|
||||||
@@ -93,8 +105,6 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
|
|||||||
state: &serverState{value: srvStateNew},
|
state: &serverState{value: srvStateNew},
|
||||||
heartbeatInterval: heartbeatInterval,
|
heartbeatInterval: heartbeatInterval,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
client: NewClientFromRedisClient(c),
|
|
||||||
rdb: rdb.NewRDB(c),
|
|
||||||
cron: cron.New(cron.WithLocation(loc)),
|
cron: cron.New(cron.WithLocation(loc)),
|
||||||
location: loc,
|
location: loc,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
@@ -294,9 +304,6 @@ func (s *Scheduler) Shutdown() {
|
|||||||
if err := s.client.Close(); err != nil {
|
if err := s.client.Close(); err != nil {
|
||||||
s.logger.Errorf("Failed to close redis client connection: %v", err)
|
s.logger.Errorf("Failed to close redis client connection: %v", err)
|
||||||
}
|
}
|
||||||
if !s.sharedConnection {
|
|
||||||
s.rdb.Close()
|
|
||||||
}
|
|
||||||
s.logger.Info("Scheduler stopped")
|
s.logger.Info("Scheduler stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user