2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00
asynq/scheduler.go

369 lines
10 KiB
Go
Raw Permalink Normal View History

2020-01-03 10:13:16 +08:00
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
2019-12-29 05:33:24 +08:00
package asynq
import (
"fmt"
"os"
2020-02-16 15:14:30 +08:00
"sync"
2019-12-29 05:33:24 +08:00
"time"
2020-04-18 22:55:10 +08:00
"github.com/google/uuid"
2020-04-18 22:55:10 +08:00
"github.com/hibiken/asynq/internal/base"
2020-05-06 13:10:11 +08:00
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
2023-09-19 16:16:51 +08:00
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"
2019-12-29 05:33:24 +08:00
)
// A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
2021-08-17 11:22:43 +08:00
//
// Schedulers are safe for concurrent use by multiple goroutines.
type Scheduler struct {
id string
state *serverState
heartbeatInterval time.Duration
logger *log.Logger
client *Client
rdb *rdb.RDB
cron *cron.Cron
location *time.Location
done chan struct{}
wg sync.WaitGroup
preEnqueueFunc func(task *Task, opts []Option)
postEnqueueFunc func(info *TaskInfo, err error)
errHandler func(task *Task, opts []Option, err error)
2021-08-17 11:22:43 +08:00
// guards idmap
mu sync.Mutex
2020-11-08 02:41:27 +08:00
// idmap maps Scheduler's entry ID to cron.EntryID
// to avoid using cron.EntryID as the public API of
// the Scheduler.
idmap map[string]cron.EntryID
2023-09-19 16:16:51 +08:00
// When a Scheduler has been created with an existing Redis connection, we do
// not want to close it.
sharedConnection bool
}
const defaultHeartbeatInterval = 10 * time.Second
// NewScheduler returns a new Scheduler instance given the redis connection option.
// The parameter opts is optional, defaults will be used if opts is set to nil
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
2023-09-19 16:16:51 +08:00
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
2021-01-29 22:37:35 +08:00
if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
}
2023-09-19 16:16:51 +08:00
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
scheduler.sharedConnection = false
return scheduler
}
// NewSchedulerFromRedisClient returns a new instance of Scheduler given a redis.UniversalClient
2023-09-19 16:16:51 +08:00
// The parameter opts is optional, defaults will be used if opts is set to nil.
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
2023-09-19 16:16:51 +08:00
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
if opts == nil {
opts = &SchedulerOpts{}
}
heartbeatInterval := opts.HeartbeatInterval
if heartbeatInterval <= 0 {
heartbeatInterval = defaultHeartbeatInterval
}
logger := log.NewLogger(opts.Logger)
loglevel := opts.LogLevel
if loglevel == level_unspecified {
loglevel = InfoLevel
}
logger.SetLevel(toInternalLogLevel(loglevel))
loc := opts.Location
if loc == nil {
loc = time.UTC
}
return &Scheduler{
id: generateSchedulerID(),
state: &serverState{value: srvStateNew},
heartbeatInterval: heartbeatInterval,
logger: logger,
client: NewClientFromRedisClient(c),
rdb: rdb.NewRDB(c),
cron: cron.New(cron.WithLocation(loc)),
location: loc,
done: make(chan struct{}),
preEnqueueFunc: opts.PreEnqueueFunc,
postEnqueueFunc: opts.PostEnqueueFunc,
errHandler: opts.EnqueueErrorHandler,
idmap: make(map[string]cron.EntryID),
}
}
func generateSchedulerID() string {
host, err := os.Hostname()
if err != nil {
host = "unknown-host"
}
return fmt.Sprintf("%s:%d:%v", host, os.Getpid(), uuid.New())
}
2019-12-29 05:33:24 +08:00
// SchedulerOpts specifies scheduler options.
type SchedulerOpts struct {
// HeartbeatInterval specifies the interval between scheduler heartbeats.
//
// If unset, zero or a negative value, the interval is set to 10 second.
//
// Note: Setting this value too low may add significant load to redis.
//
// By default, HeartbeatInterval is set to 10 seconds.
HeartbeatInterval time.Duration
// Logger specifies the logger used by the scheduler instance.
//
// If unset, the default logger is used.
Logger Logger
2019-12-29 05:33:24 +08:00
// LogLevel specifies the minimum log level to enable.
//
// If unset, InfoLevel is used by default.
LogLevel LogLevel
// Location specifies the time zone location.
//
// If unset, the UTC time zone (time.UTC) is used.
Location *time.Location
// PreEnqueueFunc, if provided, is called before a task gets enqueued by Scheduler.
// The callback function should return quickly to not block the current thread.
PreEnqueueFunc func(task *Task, opts []Option)
// PostEnqueueFunc, if provided, is called after a task gets enqueued by Scheduler.
// The callback function should return quickly to not block the current thread.
PostEnqueueFunc func(info *TaskInfo, err error)
// Deprecated: Use PostEnqueueFunc instead
// EnqueueErrorHandler gets called when scheduler cannot enqueue a registered task
// due to an error.
EnqueueErrorHandler func(task *Task, opts []Option, err error)
2019-12-29 05:33:24 +08:00
}
2022-05-17 12:14:15 +08:00
// enqueueJob encapsulates the job of enqueuing a task and recording the event.
type enqueueJob struct {
id uuid.UUID
cronspec string
task *Task
opts []Option
location *time.Location
logger *log.Logger
client *Client
rdb *rdb.RDB
preEnqueueFunc func(task *Task, opts []Option)
postEnqueueFunc func(info *TaskInfo, err error)
errHandler func(task *Task, opts []Option, err error)
}
func (j *enqueueJob) Run() {
if j.preEnqueueFunc != nil {
j.preEnqueueFunc(j.task, j.opts)
}
info, err := j.client.Enqueue(j.task, j.opts...)
if j.postEnqueueFunc != nil {
j.postEnqueueFunc(info, err)
}
if err != nil {
if j.errHandler != nil {
j.errHandler(j.task, j.opts, err)
}
return
}
j.logger.Debugf("scheduler enqueued a task: %+v", info)
event := &base.SchedulerEnqueueEvent{
TaskID: info.ID,
EnqueuedAt: time.Now().In(j.location),
}
err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event)
if err != nil {
2022-06-05 03:48:56 +08:00
j.logger.Warnf("scheduler could not record enqueue event of enqueued task %s: %v", info.ID, err)
}
}
2020-10-12 21:47:43 +08:00
// Register registers a task to be enqueued on the given schedule specified by the cronspec.
// It returns an ID of the newly registered entry.
func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) {
job := &enqueueJob{
id: uuid.New(),
cronspec: cronspec,
task: task,
opts: opts,
location: s.location,
client: s.client,
rdb: s.rdb,
logger: s.logger,
preEnqueueFunc: s.preEnqueueFunc,
postEnqueueFunc: s.postEnqueueFunc,
errHandler: s.errHandler,
}
2020-11-08 02:41:27 +08:00
cronID, err := s.cron.AddJob(cronspec, job)
if err != nil {
return "", err
}
2021-08-17 11:22:43 +08:00
s.mu.Lock()
2020-11-08 02:41:27 +08:00
s.idmap[job.id.String()] = cronID
2021-08-17 11:22:43 +08:00
s.mu.Unlock()
return job.id.String(), nil
}
2020-11-08 02:41:27 +08:00
// Unregister removes a registered entry by entry ID.
// Unregister returns a non-nil error if no entries were found for the given entryID.
func (s *Scheduler) Unregister(entryID string) error {
2021-08-17 11:22:43 +08:00
s.mu.Lock()
defer s.mu.Unlock()
2020-11-08 02:41:27 +08:00
cronID, ok := s.idmap[entryID]
if !ok {
return fmt.Errorf("asynq: no scheduler entry found")
}
2021-08-17 11:22:43 +08:00
delete(s.idmap, entryID)
2020-11-08 02:41:27 +08:00
s.cron.Remove(cronID)
return nil
}
// Run starts the scheduler until an os signal to exit the program is received.
// It returns an error if scheduler is already running or has been shutdown.
func (s *Scheduler) Run() error {
if err := s.Start(); err != nil {
return err
}
s.waitForSignals()
s.Shutdown()
return nil
}
// Start starts the scheduler.
// It returns an error if the scheduler is already running or has been shutdown.
func (s *Scheduler) Start() error {
if err := s.start(); err != nil {
return err
2019-12-29 05:33:24 +08:00
}
s.logger.Info("Scheduler starting")
s.logger.Infof("Scheduler timezone is set to %v", s.location)
s.cron.Start()
s.wg.Add(1)
go s.runHeartbeater()
return nil
}
// Checks server state and returns an error if pre-condition is not met.
// Otherwise it sets the server state to active.
func (s *Scheduler) start() error {
s.state.mu.Lock()
defer s.state.mu.Unlock()
switch s.state.value {
case srvStateActive:
return fmt.Errorf("asynq: the scheduler is already running")
case srvStateClosed:
return fmt.Errorf("asynq: the scheduler has already been stopped")
}
s.state.value = srvStateActive
return nil
2019-12-29 05:33:24 +08:00
}
// Shutdown stops and shuts down the scheduler.
func (s *Scheduler) Shutdown() {
s.state.mu.Lock()
if s.state.value == srvStateNew || s.state.value == srvStateClosed {
// scheduler is not running, do nothing and return.
s.state.mu.Unlock()
return
}
s.state.value = srvStateClosed
s.state.mu.Unlock()
s.logger.Info("Scheduler shutting down")
close(s.done) // signal heartbeater to stop
ctx := s.cron.Stop()
<-ctx.Done()
s.wg.Wait()
s.clearHistory()
if err := s.client.Close(); err != nil {
s.logger.Errorf("Failed to close redis client connection: %v", err)
}
2023-09-19 16:16:51 +08:00
if !s.sharedConnection {
s.rdb.Close()
}
s.logger.Info("Scheduler stopped")
2019-12-29 05:33:24 +08:00
}
func (s *Scheduler) runHeartbeater() {
defer s.wg.Done()
ticker := time.NewTicker(s.heartbeatInterval)
for {
select {
case <-s.done:
s.logger.Debugf("Scheduler heatbeater shutting down")
if err := s.rdb.ClearSchedulerEntries(s.id); err != nil {
s.logger.Errorf("Failed to clear the scheduler entries: %v", err)
}
2022-06-27 04:10:06 +08:00
ticker.Stop()
return
case <-ticker.C:
s.beat()
2019-12-29 05:33:24 +08:00
}
}
2019-12-29 05:33:24 +08:00
}
// beat writes a snapshot of entries to redis.
func (s *Scheduler) beat() {
var entries []*base.SchedulerEntry
for _, entry := range s.cron.Entries() {
job := entry.Job.(*enqueueJob)
e := &base.SchedulerEntry{
ID: job.id.String(),
Spec: job.cronspec,
2021-03-21 04:42:13 +08:00
Type: job.task.Type(),
Payload: job.task.Payload(),
Opts: stringifyOptions(job.opts),
Next: entry.Next,
Prev: entry.Prev,
}
entries = append(entries, e)
}
if err := s.rdb.WriteSchedulerEntries(s.id, entries, s.heartbeatInterval*2); err != nil {
s.logger.Warnf("Scheduler could not write heartbeat data: %v", err)
}
}
func stringifyOptions(opts []Option) []string {
var res []string
for _, opt := range opts {
res = append(res, opt.String())
2019-12-29 05:33:24 +08:00
}
return res
2019-12-29 05:33:24 +08:00
}
func (s *Scheduler) clearHistory() {
for _, entry := range s.cron.Entries() {
job := entry.Job.(*enqueueJob)
if err := s.rdb.ClearSchedulerHistory(job.id.String()); err != nil {
s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err)
}
}
}
// Ping performs a ping against the redis connection.
func (s *Scheduler) Ping() error {
s.state.mu.Lock()
defer s.state.mu.Unlock()
if s.state.value == srvStateClosed {
return nil
}
return s.rdb.Ping()
}