From 96b231830040783da57f8e769e77e56cb6e2c1fd Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 11 Oct 2020 10:06:28 -0700 Subject: [PATCH] Add EnqueueErrorHandler option to SchedulerOpts --- internal/rdb/inspect_test.go | 4 +- scheduler.go | 77 ++++++++++++++++++++---------------- scheduler_test.go | 40 +++++++++++++++++++ 3 files changed, 85 insertions(+), 36 deletions(-) diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 1f6ba09..80d3b0e 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2993,7 +2993,7 @@ func TestWriteListClearSchedulerEntries(t *testing.T) { Spec: "* * * * *", Type: "foo", Payload: nil, - Opts: "", + Opts: nil, Next: now.Add(5 * time.Hour), Prev: now.Add(-2 * time.Hour), }, @@ -3001,7 +3001,7 @@ func TestWriteListClearSchedulerEntries(t *testing.T) { Spec: "@every 20m", Type: "bar", Payload: map[string]interface{}{"fiz": "baz"}, - Opts: "", + Opts: nil, Next: now.Add(1 * time.Minute), Prev: now.Add(-19 * time.Minute), }, diff --git a/scheduler.go b/scheduler.go index 55b2c0a..0cd5dea 100644 --- a/scheduler.go +++ b/scheduler.go @@ -19,15 +19,16 @@ import ( // A Scheduler kicks off tasks at regular intervals based on the user defined schedule. type Scheduler struct { - id string - status *base.ServerStatus - logger *log.Logger - client *Client - rdb *rdb.RDB - cron *cron.Cron - location *time.Location - done chan struct{} - wg sync.WaitGroup + id string + status *base.ServerStatus + logger *log.Logger + client *Client + rdb *rdb.RDB + cron *cron.Cron + location *time.Location + done chan struct{} + wg sync.WaitGroup + errHandler func(task *Task, opts []Option, err error) } // NewScheduler returns a new Scheduler instance given the redis connection option. @@ -50,14 +51,15 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { } return &Scheduler{ - id: generateSchedulerID(), - status: base.NewServerStatus(base.StatusIdle), - logger: logger, - client: NewClient(r), - rdb: rdb.NewRDB(createRedisClient(r)), - cron: cron.New(cron.WithLocation(loc)), - location: loc, - done: make(chan struct{}), + id: generateSchedulerID(), + status: base.NewServerStatus(base.StatusIdle), + logger: logger, + client: NewClient(r), + rdb: rdb.NewRDB(createRedisClient(r)), + cron: cron.New(cron.WithLocation(loc)), + location: loc, + done: make(chan struct{}), + errHandler: opts.EnqueueErrorHandler, } } @@ -86,25 +88,31 @@ type SchedulerOpts struct { // If unset, the UTC time zone (time.UTC) is used. Location *time.Location - // TODO: Add ErrorHandler + // EnqueueErrorHandler gets called when scheduler cannot enqueue a registered task + // due to an error. + EnqueueErrorHandler func(task *Task, opts []Option, err error) } // enqueueJob encapsulates the job of enqueing a task and recording the event. type enqueueJob struct { - id uuid.UUID - cronspec string - task *Task - opts []Option - location *time.Location - logger *log.Logger - client *Client - rdb *rdb.RDB + id uuid.UUID + cronspec string + task *Task + opts []Option + location *time.Location + logger *log.Logger + client *Client + rdb *rdb.RDB + errHandler func(task *Task, opts []Option, err error) } func (j *enqueueJob) Run() { res, err := j.client.Enqueue(j.task, j.opts...) if err != nil { j.logger.Errorf("scheduler could not enqueue a task %+v: %v", j.task, err) + if j.errHandler != nil { + j.errHandler(j.task, j.opts, err) + } return } j.logger.Infof("scheduler enqueued a task: %+v", res) @@ -122,14 +130,15 @@ func (j *enqueueJob) Run() { // It returns an ID of the newly registered entry. func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) { job := &enqueueJob{ - id: uuid.New(), - cronspec: cronspec, - task: task, - opts: opts, - location: s.location, - client: s.client, - rdb: s.rdb, - logger: s.logger, + id: uuid.New(), + cronspec: cronspec, + task: task, + opts: opts, + location: s.location, + client: s.client, + rdb: s.rdb, + logger: s.logger, + errHandler: s.errHandler, } if _, err = s.cron.AddJob(cronspec, job); err != nil { return "", err diff --git a/scheduler_test.go b/scheduler_test.go index 4fe4c80..47905af 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -5,6 +5,7 @@ package asynq import ( + "sync" "testing" "time" @@ -76,3 +77,42 @@ func TestScheduler(t *testing.T) { } } } + +func TestSchedulerWhenRedisDown(t *testing.T) { + var ( + mu sync.Mutex + counter int + ) + errorHandler := func(task *Task, opts []Option, err error) { + mu.Lock() + counter++ + mu.Unlock() + } + + // Connect to non-existent redis instance to simulate a redis server being down. + scheduler := NewScheduler( + RedisClientOpt{Addr: ":9876"}, + &SchedulerOpts{EnqueueErrorHandler: errorHandler}, + ) + + task := NewTask("test", nil) + + if _, err := scheduler.Register("@every 3s", task); err != nil { + t.Fatal(err) + } + + if err := scheduler.Start(); err != nil { + t.Fatal(err) + } + // Scheduler should attempt to enqueue the task three times (every 3s). + time.Sleep(10 * time.Second) + if err := scheduler.Stop(); err != nil { + t.Fatal(err) + } + + mu.Lock() + if counter != 3 { + t.Errorf("EnqueueErrorHandler was called %d times, want 3", counter) + } + mu.Unlock() +}