From b0a54cd2b2962c4be0d5cd677ce61e5a4256b097 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 2 Dec 2019 20:42:21 -0800 Subject: [PATCH] Change Background API to take Handler interface --- README.md | 2 +- background.go | 27 +++++++++++++++++++++++---- background_test.go | 2 +- processor.go | 8 ++++---- processor_test.go | 8 +++++--- 5 files changed, 34 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 7ffa578..dee942f 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,7 @@ func main() { Addr: "localhost:6379", }) - bg.Run(handler) + bg.Run(asynq.HandlerFunc(handler)) } // if handler returns an error or panics, the task will be retried after some delay. diff --git a/background.go b/background.go index bddc2e8..3a0d5d8 100644 --- a/background.go +++ b/background.go @@ -31,14 +31,33 @@ func NewBackground(numWorkers int, opt *RedisOpt) *Background { } } -// TaskHandler handles a given task and reports any error. -type TaskHandler func(*Task) error +// A Handler processes a task. +// +// ProcessTask should return nil if the processing of a task +// is successful. +// +// If ProcessTask return a non-nil error or panics, the task +// will be retried after delay. +type Handler interface { + ProcessTask(*Task) error +} + +// The HandlerFunc type is an adapter to allow the use of +// ordinary functions as a Handler. If f is a function +// with the appropriate signature, HandlerFunc(f) is a +// Handler that calls f. +type HandlerFunc func(*Task) error + +// ProcessTask calls fn(task) +func (fn HandlerFunc) ProcessTask(task *Task) error { + return fn(task) +} // Run starts the background-task processing and blocks until // an os signal to exit the program is received. Once it receives // a signal, it gracefully shuts down all pending workers and other // goroutines to process the tasks. -func (bg *Background) Run(handler TaskHandler) { +func (bg *Background) Run(handler Handler) { bg.start(handler) defer bg.stop() @@ -51,7 +70,7 @@ func (bg *Background) Run(handler TaskHandler) { } // starts the background-task processing. -func (bg *Background) start(handler TaskHandler) { +func (bg *Background) start(handler Handler) { bg.mu.Lock() defer bg.mu.Unlock() if bg.running { diff --git a/background_test.go b/background_test.go index e59611c..1dec050 100644 --- a/background_test.go +++ b/background_test.go @@ -27,7 +27,7 @@ func TestBackground(t *testing.T) { return nil } - bg.start(h) + bg.start(HandlerFunc(h)) client.Process(&Task{ Type: "send_email", diff --git a/processor.go b/processor.go index f09bb62..ab12bd6 100644 --- a/processor.go +++ b/processor.go @@ -9,7 +9,7 @@ import ( type processor struct { rdb *rdb - handler TaskHandler + handler Handler // timeout for blocking dequeue operation. // dequeue needs to timeout to avoid blocking forever @@ -24,7 +24,7 @@ type processor struct { done chan struct{} } -func newProcessor(rdb *rdb, numWorkers int, handler TaskHandler) *processor { +func newProcessor(rdb *rdb, numWorkers int, handler Handler) *processor { return &processor{ rdb: rdb, handler: handler, @@ -108,11 +108,11 @@ func (p *processor) restore() { // perform calls the handler with the given task. // If the call returns without panic, it simply returns the value, // otherwise, it recovers from panic and returns an error. -func perform(handler TaskHandler, task *Task) (err error) { +func perform(h Handler, task *Task) (err error) { defer func() { if x := recover(); x != nil { err = fmt.Errorf("panic: %v", x) } }() - return handler(task) + return h.ProcessTask(task) } diff --git a/processor_test.go b/processor_test.go index 6d9a14d..cfe9d5d 100644 --- a/processor_test.go +++ b/processor_test.go @@ -50,7 +50,8 @@ func TestProcessorSuccess(t *testing.T) { // instantiate a new processor var mu sync.Mutex var processed []*Task - h := func(task *Task) error { + var h HandlerFunc + h = func(task *Task) error { mu.Lock() defer mu.Unlock() processed = append(processed, task) @@ -133,7 +134,8 @@ func TestProcessorRetry(t *testing.T) { t.Fatal(err) } // instantiate a new processor - h := func(task *Task) error { + var h HandlerFunc + h = func(task *Task) error { return fmt.Errorf(errMsg) } p := newProcessor(r, 10, h) @@ -178,7 +180,7 @@ func TestProcessorRetry(t *testing.T) { func TestPerform(t *testing.T) { tests := []struct { desc string - handler TaskHandler + handler HandlerFunc task *Task wantErr bool }{