2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00

Call error handler when task was not processed successfully

This commit is contained in:
Ken Hibino 2020-02-29 16:27:59 -08:00
parent 95b7dcaad4
commit a4e4c0b1d5
3 changed files with 40 additions and 17 deletions

View File

@ -165,7 +165,7 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
syncer := newSyncer(syncCh, 5*time.Second) syncer := newSyncer(syncCh, 5*time.Second)
heartbeater := newHeartbeater(rdb, ps, 5*time.Second) heartbeater := newHeartbeater(rdb, ps, 5*time.Second)
scheduler := newScheduler(rdb, 5*time.Second, queues) scheduler := newScheduler(rdb, 5*time.Second, queues)
processor := newProcessor(rdb, ps, delayFunc, syncCh, cancels) processor := newProcessor(rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler)
subscriber := newSubscriber(rdb, cancels) subscriber := newSubscriber(rdb, cancels)
return &Background{ return &Background{
rdb: rdb, rdb: rdb,

View File

@ -31,6 +31,8 @@ type processor struct {
retryDelayFunc retryDelayFunc retryDelayFunc retryDelayFunc
errHandler ErrorHandler
// channel via which to send sync requests to syncer. // channel via which to send sync requests to syncer.
syncRequestCh chan<- *syncRequest syncRequestCh chan<- *syncRequest
@ -59,7 +61,8 @@ type processor struct {
type retryDelayFunc func(n int, err error, task *Task) time.Duration type retryDelayFunc func(n int, err error, task *Task) time.Duration
// newProcessor constructs a new processor. // newProcessor constructs a new processor.
func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, syncCh chan<- *syncRequest, c *base.Cancelations) *processor { func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc,
syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor {
info := ps.Get() info := ps.Get()
qcfg := normalizeQueueCfg(info.Queues) qcfg := normalizeQueueCfg(info.Queues)
orderedQueues := []string(nil) orderedQueues := []string(nil)
@ -79,6 +82,7 @@ func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, syncCh c
done: make(chan struct{}), done: make(chan struct{}),
abort: make(chan struct{}), abort: make(chan struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
errHandler: errHandler,
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
} }
} }
@ -192,6 +196,9 @@ func (p *processor) exec() {
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry // 2) Retry -> Removes the message from InProgress & Adds the message to Retry
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead // 3) Kill -> Removes the message from InProgress & Adds the message to Dead
if resErr != nil { if resErr != nil {
if p.errHandler != nil {
p.errHandler.HandleError(task, resErr, msg.Retried, msg.Retry)
}
if msg.Retried >= msg.Retry { if msg.Retried >= msg.Retry {
p.kill(msg, resErr) p.kill(msg, resErr)
} else { } else {

View File

@ -68,7 +68,7 @@ func TestProcessorSuccess(t *testing.T) {
} }
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false) ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations) p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)
var wg sync.WaitGroup var wg sync.WaitGroup
@ -123,14 +123,19 @@ func TestProcessorRetry(t *testing.T) {
enqueued []*base.TaskMessage // initial default queue state enqueued []*base.TaskMessage // initial default queue state
incoming []*base.TaskMessage // tasks to be enqueued during run incoming []*base.TaskMessage // tasks to be enqueued during run
delay time.Duration // retry delay duration delay time.Duration // retry delay duration
handler Handler // task handler
wait time.Duration // wait duration between starting and stopping processor for this test case wait time.Duration // wait duration between starting and stopping processor for this test case
wantRetry []h.ZSetEntry // tasks in retry queue at the end wantRetry []h.ZSetEntry // tasks in retry queue at the end
wantDead []*base.TaskMessage // tasks in dead queue at the end wantDead []*base.TaskMessage // tasks in dead queue at the end
wantErrCount int // number of times error handler should be called
}{ }{
{ {
enqueued: []*base.TaskMessage{m1, m2}, enqueued: []*base.TaskMessage{m1, m2},
incoming: []*base.TaskMessage{m3, m4}, incoming: []*base.TaskMessage{m3, m4},
delay: time.Minute, delay: time.Minute,
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
return fmt.Errorf(errMsg)
}),
wait: time.Second, wait: time.Second,
wantRetry: []h.ZSetEntry{ wantRetry: []h.ZSetEntry{
{Msg: &r2, Score: float64(now.Add(time.Minute).Unix())}, {Msg: &r2, Score: float64(now.Add(time.Minute).Unix())},
@ -138,6 +143,7 @@ func TestProcessorRetry(t *testing.T) {
{Msg: &r4, Score: float64(now.Add(time.Minute).Unix())}, {Msg: &r4, Score: float64(now.Add(time.Minute).Unix())},
}, },
wantDead: []*base.TaskMessage{&r1}, wantDead: []*base.TaskMessage{&r1},
wantErrCount: 4,
}, },
} }
@ -149,13 +155,19 @@ func TestProcessorRetry(t *testing.T) {
delayFunc := func(n int, e error, t *Task) time.Duration { delayFunc := func(n int, e error, t *Task) time.Duration {
return tc.delay return tc.delay
} }
handler := func(ctx context.Context, task *Task) error { var (
return fmt.Errorf(errMsg) mu sync.Mutex // guards n
n int // number of times error handler is called
)
errHandler := func(t *Task, err error, retried, maxRetry int) {
mu.Lock()
defer mu.Unlock()
n++
} }
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false) ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations) p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler))
p.handler = HandlerFunc(handler) p.handler = tc.handler
var wg sync.WaitGroup var wg sync.WaitGroup
p.start(&wg) p.start(&wg)
@ -183,6 +195,10 @@ func TestProcessorRetry(t *testing.T) {
if l := r.LLen(base.InProgressQueue).Val(); l != 0 { if l := r.LLen(base.InProgressQueue).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l) t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l)
} }
if n != tc.wantErrCount {
t.Errorf("error handler was called %d, want %d", n, tc.wantErrCount)
}
} }
} }
@ -216,7 +232,7 @@ func TestProcessorQueues(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
ps := base.NewProcessState("localhost", 1234, 10, tc.queueCfg, false) ps := base.NewProcessState("localhost", 1234, 10, tc.queueCfg, false)
p := newProcessor(nil, ps, defaultDelayFunc, nil, cancelations) p := newProcessor(nil, ps, defaultDelayFunc, nil, cancelations, nil)
got := p.queues() got := p.queues()
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s", t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
@ -284,7 +300,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
// Note: Set concurrency to 1 to make sure tasks are processed one at a time. // Note: Set concurrency to 1 to make sure tasks are processed one at a time.
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/) ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations) p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)
var wg sync.WaitGroup var wg sync.WaitGroup