2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

Add EnqueueErrorHandler option to SchedulerOpts

This commit is contained in:
Ken Hibino 2020-10-11 10:06:28 -07:00
parent 8312515e64
commit 96b2318300
3 changed files with 85 additions and 36 deletions

View File

@ -2993,7 +2993,7 @@ func TestWriteListClearSchedulerEntries(t *testing.T) {
Spec: "* * * * *",
Type: "foo",
Payload: nil,
Opts: "",
Opts: nil,
Next: now.Add(5 * time.Hour),
Prev: now.Add(-2 * time.Hour),
},
@ -3001,7 +3001,7 @@ func TestWriteListClearSchedulerEntries(t *testing.T) {
Spec: "@every 20m",
Type: "bar",
Payload: map[string]interface{}{"fiz": "baz"},
Opts: "",
Opts: nil,
Next: now.Add(1 * time.Minute),
Prev: now.Add(-19 * time.Minute),
},

View File

@ -28,6 +28,7 @@ type Scheduler struct {
location *time.Location
done chan struct{}
wg sync.WaitGroup
errHandler func(task *Task, opts []Option, err error)
}
// NewScheduler returns a new Scheduler instance given the redis connection option.
@ -58,6 +59,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
cron: cron.New(cron.WithLocation(loc)),
location: loc,
done: make(chan struct{}),
errHandler: opts.EnqueueErrorHandler,
}
}
@ -86,7 +88,9 @@ type SchedulerOpts struct {
// If unset, the UTC time zone (time.UTC) is used.
Location *time.Location
// TODO: Add ErrorHandler
// EnqueueErrorHandler gets called when scheduler cannot enqueue a registered task
// due to an error.
EnqueueErrorHandler func(task *Task, opts []Option, err error)
}
// enqueueJob encapsulates the job of enqueing a task and recording the event.
@ -99,12 +103,16 @@ type enqueueJob struct {
logger *log.Logger
client *Client
rdb *rdb.RDB
errHandler func(task *Task, opts []Option, err error)
}
func (j *enqueueJob) Run() {
res, err := j.client.Enqueue(j.task, j.opts...)
if err != nil {
j.logger.Errorf("scheduler could not enqueue a task %+v: %v", j.task, err)
if j.errHandler != nil {
j.errHandler(j.task, j.opts, err)
}
return
}
j.logger.Infof("scheduler enqueued a task: %+v", res)
@ -130,6 +138,7 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry
client: s.client,
rdb: s.rdb,
logger: s.logger,
errHandler: s.errHandler,
}
if _, err = s.cron.AddJob(cronspec, job); err != nil {
return "", err

View File

@ -5,6 +5,7 @@
package asynq
import (
"sync"
"testing"
"time"
@ -76,3 +77,42 @@ func TestScheduler(t *testing.T) {
}
}
}
func TestSchedulerWhenRedisDown(t *testing.T) {
var (
mu sync.Mutex
counter int
)
errorHandler := func(task *Task, opts []Option, err error) {
mu.Lock()
counter++
mu.Unlock()
}
// Connect to non-existent redis instance to simulate a redis server being down.
scheduler := NewScheduler(
RedisClientOpt{Addr: ":9876"},
&SchedulerOpts{EnqueueErrorHandler: errorHandler},
)
task := NewTask("test", nil)
if _, err := scheduler.Register("@every 3s", task); err != nil {
t.Fatal(err)
}
if err := scheduler.Start(); err != nil {
t.Fatal(err)
}
// Scheduler should attempt to enqueue the task three times (every 3s).
time.Sleep(10 * time.Second)
if err := scheduler.Stop(); err != nil {
t.Fatal(err)
}
mu.Lock()
if counter != 3 {
t.Errorf("EnqueueErrorHandler was called %d times, want 3", counter)
}
mu.Unlock()
}