2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-24 23:02:18 +08:00

Change Handler interface to take context.Context

This commit is contained in:
Ken Hibino 2020-02-11 07:06:52 -08:00
parent 79a04e52a3
commit 1b41c721b4
5 changed files with 27 additions and 20 deletions

View File

@ -5,6 +5,7 @@
package asynq package asynq
import ( import (
"context"
"fmt" "fmt"
"math" "math"
"math/rand" "math/rand"
@ -141,18 +142,18 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
// If ProcessTask return a non-nil error or panics, the task // If ProcessTask return a non-nil error or panics, the task
// will be retried after delay. // will be retried after delay.
type Handler interface { type Handler interface {
ProcessTask(*Task) error ProcessTask(context.Context, *Task) error
} }
// The HandlerFunc type is an adapter to allow the use of // The HandlerFunc type is an adapter to allow the use of
// ordinary functions as a Handler. If f is a function // ordinary functions as a Handler. If f is a function
// with the appropriate signature, HandlerFunc(f) is a // with the appropriate signature, HandlerFunc(f) is a
// Handler that calls f. // Handler that calls f.
type HandlerFunc func(*Task) error type HandlerFunc func(context.Context, *Task) error
// ProcessTask calls fn(task) // ProcessTask calls fn(ctx, task)
func (fn HandlerFunc) ProcessTask(task *Task) error { func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
return fn(task) return fn(ctx, task)
} }
// Run starts the background-task processing and blocks until // Run starts the background-task processing and blocks until

View File

@ -5,6 +5,7 @@
package asynq package asynq
import ( import (
"context"
"testing" "testing"
"time" "time"
@ -27,7 +28,7 @@ func TestBackground(t *testing.T) {
}) })
// no-op handler // no-op handler
h := func(task *Task) error { h := func(ctx context.Context, task *Task) error {
return nil return nil
} }

View File

@ -5,6 +5,7 @@
package asynq package asynq
import ( import (
"context"
"fmt" "fmt"
"math/rand" "math/rand"
"sync" "sync"
@ -37,7 +38,7 @@ func BenchmarkEndToEndSimple(b *testing.B) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(count) wg.Add(count)
handler := func(t *Task) error { handler := func(ctx context.Context, t *Task) error {
wg.Done() wg.Done()
return nil return nil
} }
@ -82,7 +83,7 @@ func BenchmarkEndToEnd(b *testing.B) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(count * 2) wg.Add(count * 2)
handler := func(t *Task) error { handler := func(ctx context.Context, t *Task) error {
// randomly fail 1% of tasks // randomly fail 1% of tasks
if rand.Intn(100) == 1 { if rand.Intn(100) == 1 {
return fmt.Errorf(":(") return fmt.Errorf(":(")
@ -141,7 +142,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(highCount + defaultCount + lowCount) wg.Add(highCount + defaultCount + lowCount)
handler := func(t *Task) error { handler := func(ctx context.Context, t *Task) error {
wg.Done() wg.Done()
return nil return nil
} }

View File

@ -5,6 +5,7 @@
package asynq package asynq
import ( import (
"context"
"fmt" "fmt"
"math/rand" "math/rand"
"sort" "sort"
@ -73,7 +74,7 @@ func newProcessor(r *rdb.RDB, pinfo *base.ProcessInfo, fn retryDelayFunc, syncRe
done: make(chan struct{}), done: make(chan struct{}),
abort: make(chan struct{}), abort: make(chan struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
handler: HandlerFunc(func(t *Task) error { return fmt.Errorf("handler not set") }), handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
} }
} }
@ -160,8 +161,10 @@ func (p *processor) exec() {
resCh := make(chan error, 1) resCh := make(chan error, 1)
task := NewTask(msg.Type, msg.Payload) task := NewTask(msg.Type, msg.Payload)
// TODO: Set timeout if provided
ctx := context.Background()
go func() { go func() {
resCh <- perform(p.handler, task) resCh <- perform(ctx, task, p.handler)
}() }()
select { select {
@ -282,13 +285,13 @@ func (p *processor) queues() []string {
// perform calls the handler with the given task. // perform calls the handler with the given task.
// If the call returns without panic, it simply returns the value, // If the call returns without panic, it simply returns the value,
// otherwise, it recovers from panic and returns an error. // otherwise, it recovers from panic and returns an error.
func perform(h Handler, task *Task) (err error) { func perform(ctx context.Context, task *Task, h Handler) (err error) {
defer func() { defer func() {
if x := recover(); x != nil { if x := recover(); x != nil {
err = fmt.Errorf("panic: %v", x) err = fmt.Errorf("panic: %v", x)
} }
}() }()
return h.ProcessTask(task) return h.ProcessTask(ctx, task)
} }
// uniq dedupes elements and returns a slice of unique names of length l. // uniq dedupes elements and returns a slice of unique names of length l.

View File

@ -5,6 +5,7 @@
package asynq package asynq
import ( import (
"context"
"fmt" "fmt"
"sort" "sort"
"sync" "sync"
@ -59,7 +60,7 @@ func TestProcessorSuccess(t *testing.T) {
// instantiate a new processor // instantiate a new processor
var mu sync.Mutex var mu sync.Mutex
var processed []*Task var processed []*Task
handler := func(task *Task) error { handler := func(ctx context.Context, task *Task) error {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
processed = append(processed, task) processed = append(processed, task)
@ -146,7 +147,7 @@ func TestProcessorRetry(t *testing.T) {
delayFunc := func(n int, e error, t *Task) time.Duration { delayFunc := func(n int, e error, t *Task) time.Duration {
return tc.delay return tc.delay
} }
handler := func(task *Task) error { handler := func(ctx context.Context, task *Task) error {
return fmt.Errorf(errMsg) return fmt.Errorf(errMsg)
} }
pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false) pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false)
@ -264,7 +265,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
// instantiate a new processor // instantiate a new processor
var mu sync.Mutex var mu sync.Mutex
var processed []*Task var processed []*Task
handler := func(task *Task) error { handler := func(ctx context.Context, task *Task) error {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
processed = append(processed, task) processed = append(processed, task)
@ -303,7 +304,7 @@ func TestPerform(t *testing.T) {
}{ }{
{ {
desc: "handler returns nil", desc: "handler returns nil",
handler: func(t *Task) error { handler: func(ctx context.Context, t *Task) error {
return nil return nil
}, },
task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}),
@ -311,7 +312,7 @@ func TestPerform(t *testing.T) {
}, },
{ {
desc: "handler returns error", desc: "handler returns error",
handler: func(t *Task) error { handler: func(ctx context.Context, t *Task) error {
return fmt.Errorf("something went wrong") return fmt.Errorf("something went wrong")
}, },
task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}),
@ -319,7 +320,7 @@ func TestPerform(t *testing.T) {
}, },
{ {
desc: "handler panics", desc: "handler panics",
handler: func(t *Task) error { handler: func(ctx context.Context, t *Task) error {
panic("something went terribly wrong") panic("something went terribly wrong")
}, },
task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}),
@ -328,7 +329,7 @@ func TestPerform(t *testing.T) {
} }
for _, tc := range tests { for _, tc := range tests {
got := perform(tc.handler, tc.task) got := perform(context.Background(), tc.task, tc.handler)
if !tc.wantErr && got != nil { if !tc.wantErr && got != nil {
t.Errorf("%s: perform() = %v, want nil", tc.desc, got) t.Errorf("%s: perform() = %v, want nil", tc.desc, got)
continue continue