From 1b41c721b4ce6a7a294b51bc4d4db0660f1a62c4 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 11 Feb 2020 07:06:52 -0800 Subject: [PATCH] Change Handler interface to take context.Context --- background.go | 11 ++++++----- background_test.go | 3 ++- benchmark_test.go | 7 ++++--- processor.go | 11 +++++++---- processor_test.go | 15 ++++++++------- 5 files changed, 27 insertions(+), 20 deletions(-) diff --git a/background.go b/background.go index 273be7d..48696b7 100644 --- a/background.go +++ b/background.go @@ -5,6 +5,7 @@ package asynq import ( + "context" "fmt" "math" "math/rand" @@ -141,18 +142,18 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { // If ProcessTask return a non-nil error or panics, the task // will be retried after delay. type Handler interface { - ProcessTask(*Task) error + ProcessTask(context.Context, *Task) error } // The HandlerFunc type is an adapter to allow the use of // ordinary functions as a Handler. If f is a function // with the appropriate signature, HandlerFunc(f) is a // Handler that calls f. -type HandlerFunc func(*Task) error +type HandlerFunc func(context.Context, *Task) error -// ProcessTask calls fn(task) -func (fn HandlerFunc) ProcessTask(task *Task) error { - return fn(task) +// ProcessTask calls fn(ctx, task) +func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error { + return fn(ctx, task) } // Run starts the background-task processing and blocks until diff --git a/background_test.go b/background_test.go index 2acebbc..0b6f5ec 100644 --- a/background_test.go +++ b/background_test.go @@ -5,6 +5,7 @@ package asynq import ( + "context" "testing" "time" @@ -27,7 +28,7 @@ func TestBackground(t *testing.T) { }) // no-op handler - h := func(task *Task) error { + h := func(ctx context.Context, task *Task) error { return nil } diff --git a/benchmark_test.go b/benchmark_test.go index 580bab7..eb5fa22 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -5,6 +5,7 @@ package asynq import ( + "context" "fmt" "math/rand" "sync" @@ -37,7 +38,7 @@ func BenchmarkEndToEndSimple(b *testing.B) { var wg sync.WaitGroup wg.Add(count) - handler := func(t *Task) error { + handler := func(ctx context.Context, t *Task) error { wg.Done() return nil } @@ -82,7 +83,7 @@ func BenchmarkEndToEnd(b *testing.B) { var wg sync.WaitGroup wg.Add(count * 2) - handler := func(t *Task) error { + handler := func(ctx context.Context, t *Task) error { // randomly fail 1% of tasks if rand.Intn(100) == 1 { return fmt.Errorf(":(") @@ -141,7 +142,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { var wg sync.WaitGroup wg.Add(highCount + defaultCount + lowCount) - handler := func(t *Task) error { + handler := func(ctx context.Context, t *Task) error { wg.Done() return nil } diff --git a/processor.go b/processor.go index 738128c..bf6ece4 100644 --- a/processor.go +++ b/processor.go @@ -5,6 +5,7 @@ package asynq import ( + "context" "fmt" "math/rand" "sort" @@ -73,7 +74,7 @@ func newProcessor(r *rdb.RDB, pinfo *base.ProcessInfo, fn retryDelayFunc, syncRe done: make(chan struct{}), abort: make(chan struct{}), quit: make(chan struct{}), - handler: HandlerFunc(func(t *Task) error { return fmt.Errorf("handler not set") }), + handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), } } @@ -160,8 +161,10 @@ func (p *processor) exec() { resCh := make(chan error, 1) task := NewTask(msg.Type, msg.Payload) + // TODO: Set timeout if provided + ctx := context.Background() go func() { - resCh <- perform(p.handler, task) + resCh <- perform(ctx, task, p.handler) }() select { @@ -282,13 +285,13 @@ func (p *processor) queues() []string { // perform calls the handler with the given task. // If the call returns without panic, it simply returns the value, // otherwise, it recovers from panic and returns an error. -func perform(h Handler, task *Task) (err error) { +func perform(ctx context.Context, task *Task, h Handler) (err error) { defer func() { if x := recover(); x != nil { err = fmt.Errorf("panic: %v", x) } }() - return h.ProcessTask(task) + return h.ProcessTask(ctx, task) } // uniq dedupes elements and returns a slice of unique names of length l. diff --git a/processor_test.go b/processor_test.go index 893a9f3..25b0abe 100644 --- a/processor_test.go +++ b/processor_test.go @@ -5,6 +5,7 @@ package asynq import ( + "context" "fmt" "sort" "sync" @@ -59,7 +60,7 @@ func TestProcessorSuccess(t *testing.T) { // instantiate a new processor var mu sync.Mutex var processed []*Task - handler := func(task *Task) error { + handler := func(ctx context.Context, task *Task) error { mu.Lock() defer mu.Unlock() processed = append(processed, task) @@ -146,7 +147,7 @@ func TestProcessorRetry(t *testing.T) { delayFunc := func(n int, e error, t *Task) time.Duration { return tc.delay } - handler := func(task *Task) error { + handler := func(ctx context.Context, task *Task) error { return fmt.Errorf(errMsg) } pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false) @@ -264,7 +265,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { // instantiate a new processor var mu sync.Mutex var processed []*Task - handler := func(task *Task) error { + handler := func(ctx context.Context, task *Task) error { mu.Lock() defer mu.Unlock() processed = append(processed, task) @@ -303,7 +304,7 @@ func TestPerform(t *testing.T) { }{ { desc: "handler returns nil", - handler: func(t *Task) error { + handler: func(ctx context.Context, t *Task) error { return nil }, task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), @@ -311,7 +312,7 @@ func TestPerform(t *testing.T) { }, { desc: "handler returns error", - handler: func(t *Task) error { + handler: func(ctx context.Context, t *Task) error { return fmt.Errorf("something went wrong") }, task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), @@ -319,7 +320,7 @@ func TestPerform(t *testing.T) { }, { desc: "handler panics", - handler: func(t *Task) error { + handler: func(ctx context.Context, t *Task) error { panic("something went terribly wrong") }, task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), @@ -328,7 +329,7 @@ func TestPerform(t *testing.T) { } for _, tc := range tests { - got := perform(tc.handler, tc.task) + got := perform(context.Background(), tc.task, tc.handler) if !tc.wantErr && got != nil { t.Errorf("%s: perform() = %v, want nil", tc.desc, got) continue