2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-24 06:42:16 +08:00

Change Background API to take Handler interface

This commit is contained in:
Ken Hibino 2019-12-02 20:42:21 -08:00
parent 1a996e0d40
commit b0a54cd2b2
5 changed files with 34 additions and 13 deletions

View File

@ -59,7 +59,7 @@ func main() {
Addr: "localhost:6379",
})
bg.Run(handler)
bg.Run(asynq.HandlerFunc(handler))
}
// if handler returns an error or panics, the task will be retried after some delay.

View File

@ -31,14 +31,33 @@ func NewBackground(numWorkers int, opt *RedisOpt) *Background {
}
}
// TaskHandler handles a given task and reports any error.
type TaskHandler func(*Task) error
// A Handler processes a task.
//
// ProcessTask should return nil if the processing of a task
// is successful.
//
// If ProcessTask return a non-nil error or panics, the task
// will be retried after delay.
type Handler interface {
ProcessTask(*Task) error
}
// The HandlerFunc type is an adapter to allow the use of
// ordinary functions as a Handler. If f is a function
// with the appropriate signature, HandlerFunc(f) is a
// Handler that calls f.
type HandlerFunc func(*Task) error
// ProcessTask calls fn(task)
func (fn HandlerFunc) ProcessTask(task *Task) error {
return fn(task)
}
// Run starts the background-task processing and blocks until
// an os signal to exit the program is received. Once it receives
// a signal, it gracefully shuts down all pending workers and other
// goroutines to process the tasks.
func (bg *Background) Run(handler TaskHandler) {
func (bg *Background) Run(handler Handler) {
bg.start(handler)
defer bg.stop()
@ -51,7 +70,7 @@ func (bg *Background) Run(handler TaskHandler) {
}
// starts the background-task processing.
func (bg *Background) start(handler TaskHandler) {
func (bg *Background) start(handler Handler) {
bg.mu.Lock()
defer bg.mu.Unlock()
if bg.running {

View File

@ -27,7 +27,7 @@ func TestBackground(t *testing.T) {
return nil
}
bg.start(h)
bg.start(HandlerFunc(h))
client.Process(&Task{
Type: "send_email",

View File

@ -9,7 +9,7 @@ import (
type processor struct {
rdb *rdb
handler TaskHandler
handler Handler
// timeout for blocking dequeue operation.
// dequeue needs to timeout to avoid blocking forever
@ -24,7 +24,7 @@ type processor struct {
done chan struct{}
}
func newProcessor(rdb *rdb, numWorkers int, handler TaskHandler) *processor {
func newProcessor(rdb *rdb, numWorkers int, handler Handler) *processor {
return &processor{
rdb: rdb,
handler: handler,
@ -108,11 +108,11 @@ func (p *processor) restore() {
// perform calls the handler with the given task.
// If the call returns without panic, it simply returns the value,
// otherwise, it recovers from panic and returns an error.
func perform(handler TaskHandler, task *Task) (err error) {
func perform(h Handler, task *Task) (err error) {
defer func() {
if x := recover(); x != nil {
err = fmt.Errorf("panic: %v", x)
}
}()
return handler(task)
return h.ProcessTask(task)
}

View File

@ -50,7 +50,8 @@ func TestProcessorSuccess(t *testing.T) {
// instantiate a new processor
var mu sync.Mutex
var processed []*Task
h := func(task *Task) error {
var h HandlerFunc
h = func(task *Task) error {
mu.Lock()
defer mu.Unlock()
processed = append(processed, task)
@ -133,7 +134,8 @@ func TestProcessorRetry(t *testing.T) {
t.Fatal(err)
}
// instantiate a new processor
h := func(task *Task) error {
var h HandlerFunc
h = func(task *Task) error {
return fmt.Errorf(errMsg)
}
p := newProcessor(r, 10, h)
@ -178,7 +180,7 @@ func TestProcessorRetry(t *testing.T) {
func TestPerform(t *testing.T) {
tests := []struct {
desc string
handler TaskHandler
handler HandlerFunc
task *Task
wantErr bool
}{