2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00

Merge pull request #5 from hibiken/api/handler

Change Background to take Handler interface
This commit is contained in:
Ken Hibino 2019-12-02 20:49:43 -08:00 committed by GitHub
commit 4afb3a2401
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 34 additions and 13 deletions

View File

@ -59,7 +59,7 @@ func main() {
Addr: "localhost:6379", Addr: "localhost:6379",
}) })
bg.Run(handler) bg.Run(asynq.HandlerFunc(handler))
} }
// if handler returns an error or panics, the task will be retried after some delay. // if handler returns an error or panics, the task will be retried after some delay.

View File

@ -31,14 +31,33 @@ func NewBackground(numWorkers int, opt *RedisOpt) *Background {
} }
} }
// TaskHandler handles a given task and reports any error. // A Handler processes a task.
type TaskHandler func(*Task) error //
// ProcessTask should return nil if the processing of a task
// is successful.
//
// If ProcessTask return a non-nil error or panics, the task
// will be retried after delay.
type Handler interface {
ProcessTask(*Task) error
}
// The HandlerFunc type is an adapter to allow the use of
// ordinary functions as a Handler. If f is a function
// with the appropriate signature, HandlerFunc(f) is a
// Handler that calls f.
type HandlerFunc func(*Task) error
// ProcessTask calls fn(task)
func (fn HandlerFunc) ProcessTask(task *Task) error {
return fn(task)
}
// Run starts the background-task processing and blocks until // Run starts the background-task processing and blocks until
// an os signal to exit the program is received. Once it receives // an os signal to exit the program is received. Once it receives
// a signal, it gracefully shuts down all pending workers and other // a signal, it gracefully shuts down all pending workers and other
// goroutines to process the tasks. // goroutines to process the tasks.
func (bg *Background) Run(handler TaskHandler) { func (bg *Background) Run(handler Handler) {
bg.start(handler) bg.start(handler)
defer bg.stop() defer bg.stop()
@ -51,7 +70,7 @@ func (bg *Background) Run(handler TaskHandler) {
} }
// starts the background-task processing. // starts the background-task processing.
func (bg *Background) start(handler TaskHandler) { func (bg *Background) start(handler Handler) {
bg.mu.Lock() bg.mu.Lock()
defer bg.mu.Unlock() defer bg.mu.Unlock()
if bg.running { if bg.running {

View File

@ -27,7 +27,7 @@ func TestBackground(t *testing.T) {
return nil return nil
} }
bg.start(h) bg.start(HandlerFunc(h))
client.Process(&Task{ client.Process(&Task{
Type: "send_email", Type: "send_email",

View File

@ -9,7 +9,7 @@ import (
type processor struct { type processor struct {
rdb *rdb rdb *rdb
handler TaskHandler handler Handler
// timeout for blocking dequeue operation. // timeout for blocking dequeue operation.
// dequeue needs to timeout to avoid blocking forever // dequeue needs to timeout to avoid blocking forever
@ -24,7 +24,7 @@ type processor struct {
done chan struct{} done chan struct{}
} }
func newProcessor(rdb *rdb, numWorkers int, handler TaskHandler) *processor { func newProcessor(rdb *rdb, numWorkers int, handler Handler) *processor {
return &processor{ return &processor{
rdb: rdb, rdb: rdb,
handler: handler, handler: handler,
@ -108,11 +108,11 @@ func (p *processor) restore() {
// perform calls the handler with the given task. // perform calls the handler with the given task.
// If the call returns without panic, it simply returns the value, // If the call returns without panic, it simply returns the value,
// otherwise, it recovers from panic and returns an error. // otherwise, it recovers from panic and returns an error.
func perform(handler TaskHandler, task *Task) (err error) { func perform(h Handler, task *Task) (err error) {
defer func() { defer func() {
if x := recover(); x != nil { if x := recover(); x != nil {
err = fmt.Errorf("panic: %v", x) err = fmt.Errorf("panic: %v", x)
} }
}() }()
return handler(task) return h.ProcessTask(task)
} }

View File

@ -50,7 +50,8 @@ func TestProcessorSuccess(t *testing.T) {
// instantiate a new processor // instantiate a new processor
var mu sync.Mutex var mu sync.Mutex
var processed []*Task var processed []*Task
h := func(task *Task) error { var h HandlerFunc
h = func(task *Task) error {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
processed = append(processed, task) processed = append(processed, task)
@ -133,7 +134,8 @@ func TestProcessorRetry(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
// instantiate a new processor // instantiate a new processor
h := func(task *Task) error { var h HandlerFunc
h = func(task *Task) error {
return fmt.Errorf(errMsg) return fmt.Errorf(errMsg)
} }
p := newProcessor(r, 10, h) p := newProcessor(r, 10, h)
@ -178,7 +180,7 @@ func TestProcessorRetry(t *testing.T) {
func TestPerform(t *testing.T) { func TestPerform(t *testing.T) {
tests := []struct { tests := []struct {
desc string desc string
handler TaskHandler handler HandlerFunc
task *Task task *Task
wantErr bool wantErr bool
}{ }{