mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-13 04:46:39 +08:00
Add the scheduler option HeartbeatInterval (#956)
* Add the scheduler option HeartbeatInterval * Fix possible premature expiration of scheduler entries
This commit is contained in:
parent
580d69e88f
commit
4f00f52c1d
@ -1437,7 +1437,7 @@ func (r *RDB) ClearServerState(host string, pid int, serverID string) error {
|
||||
|
||||
// KEYS[1] -> asynq:schedulers:{<schedulerID>}
|
||||
// ARGV[1] -> TTL in seconds
|
||||
// ARGV[2:] -> schedler entries
|
||||
// ARGV[2:] -> scheduler entries
|
||||
var writeSchedulerEntriesCmd = redis.NewScript(`
|
||||
redis.call("DEL", KEYS[1])
|
||||
for i = 2, #ARGV do
|
||||
@ -1468,10 +1468,10 @@ func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.Schedule
|
||||
}
|
||||
|
||||
// ClearSchedulerEntries deletes scheduler entries data from redis.
|
||||
func (r *RDB) ClearSchedulerEntries(scheduelrID string) error {
|
||||
func (r *RDB) ClearSchedulerEntries(schedulerID string) error {
|
||||
var op errors.Op = "rdb.ClearSchedulerEntries"
|
||||
ctx := context.Background()
|
||||
key := base.SchedulerEntriesKey(scheduelrID)
|
||||
key := base.SchedulerEntriesKey(schedulerID)
|
||||
if err := r.client.ZRem(ctx, base.AllSchedulers, key).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zrem", Err: err})
|
||||
}
|
||||
|
66
scheduler.go
66
scheduler.go
@ -26,16 +26,17 @@ type Scheduler struct {
|
||||
|
||||
state *serverState
|
||||
|
||||
logger *log.Logger
|
||||
client *Client
|
||||
rdb *rdb.RDB
|
||||
cron *cron.Cron
|
||||
location *time.Location
|
||||
done chan struct{}
|
||||
wg sync.WaitGroup
|
||||
preEnqueueFunc func(task *Task, opts []Option)
|
||||
postEnqueueFunc func(info *TaskInfo, err error)
|
||||
errHandler func(task *Task, opts []Option, err error)
|
||||
heartbeatInterval time.Duration
|
||||
logger *log.Logger
|
||||
client *Client
|
||||
rdb *rdb.RDB
|
||||
cron *cron.Cron
|
||||
location *time.Location
|
||||
done chan struct{}
|
||||
wg sync.WaitGroup
|
||||
preEnqueueFunc func(task *Task, opts []Option)
|
||||
postEnqueueFunc func(info *TaskInfo, err error)
|
||||
errHandler func(task *Task, opts []Option, err error)
|
||||
|
||||
// guards idmap
|
||||
mu sync.Mutex
|
||||
@ -48,6 +49,8 @@ type Scheduler struct {
|
||||
sharedConnection bool
|
||||
}
|
||||
|
||||
const defaultHeartbeatInterval = 10 * time.Second
|
||||
|
||||
// NewScheduler returns a new Scheduler instance given the redis connection option.
|
||||
// The parameter opts is optional, defaults will be used if opts is set to nil
|
||||
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
||||
@ -68,6 +71,11 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
|
||||
opts = &SchedulerOpts{}
|
||||
}
|
||||
|
||||
heartbeatInterval := opts.HeartbeatInterval
|
||||
if heartbeatInterval <= 0 {
|
||||
heartbeatInterval = defaultHeartbeatInterval
|
||||
}
|
||||
|
||||
logger := log.NewLogger(opts.Logger)
|
||||
loglevel := opts.LogLevel
|
||||
if loglevel == level_unspecified {
|
||||
@ -81,18 +89,19 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
|
||||
}
|
||||
|
||||
return &Scheduler{
|
||||
id: generateSchedulerID(),
|
||||
state: &serverState{value: srvStateNew},
|
||||
logger: logger,
|
||||
client: NewClientFromRedisClient(c),
|
||||
rdb: rdb.NewRDB(c),
|
||||
cron: cron.New(cron.WithLocation(loc)),
|
||||
location: loc,
|
||||
done: make(chan struct{}),
|
||||
preEnqueueFunc: opts.PreEnqueueFunc,
|
||||
postEnqueueFunc: opts.PostEnqueueFunc,
|
||||
errHandler: opts.EnqueueErrorHandler,
|
||||
idmap: make(map[string]cron.EntryID),
|
||||
id: generateSchedulerID(),
|
||||
state: &serverState{value: srvStateNew},
|
||||
heartbeatInterval: heartbeatInterval,
|
||||
logger: logger,
|
||||
client: NewClientFromRedisClient(c),
|
||||
rdb: rdb.NewRDB(c),
|
||||
cron: cron.New(cron.WithLocation(loc)),
|
||||
location: loc,
|
||||
done: make(chan struct{}),
|
||||
preEnqueueFunc: opts.PreEnqueueFunc,
|
||||
postEnqueueFunc: opts.PostEnqueueFunc,
|
||||
errHandler: opts.EnqueueErrorHandler,
|
||||
idmap: make(map[string]cron.EntryID),
|
||||
}
|
||||
}
|
||||
|
||||
@ -106,6 +115,15 @@ func generateSchedulerID() string {
|
||||
|
||||
// SchedulerOpts specifies scheduler options.
|
||||
type SchedulerOpts struct {
|
||||
// HeartbeatInterval specifies the interval between scheduler heartbeats.
|
||||
//
|
||||
// If unset, zero or a negative value, the interval is set to 10 second.
|
||||
//
|
||||
// Note: Setting this value too low may add significant load to redis.
|
||||
//
|
||||
// By default, HeartbeatInterval is set to 10 seconds.
|
||||
HeartbeatInterval time.Duration
|
||||
|
||||
// Logger specifies the logger used by the scheduler instance.
|
||||
//
|
||||
// If unset, the default logger is used.
|
||||
@ -284,7 +302,7 @@ func (s *Scheduler) Shutdown() {
|
||||
|
||||
func (s *Scheduler) runHeartbeater() {
|
||||
defer s.wg.Done()
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
ticker := time.NewTicker(s.heartbeatInterval)
|
||||
for {
|
||||
select {
|
||||
case <-s.done:
|
||||
@ -317,7 +335,7 @@ func (s *Scheduler) beat() {
|
||||
entries = append(entries, e)
|
||||
}
|
||||
s.logger.Debugf("Writing entries %v", entries)
|
||||
if err := s.rdb.WriteSchedulerEntries(s.id, entries, 5*time.Second); err != nil {
|
||||
if err := s.rdb.WriteSchedulerEntries(s.id, entries, s.heartbeatInterval*2); err != nil {
|
||||
s.logger.Warnf("Scheduler could not write heartbeat data: %v", err)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user