2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-25 10:56:12 +08:00

Call error handler when task was not processed successfully

This commit is contained in:
Ken Hibino
2020-02-29 16:27:59 -08:00
parent 8a9efc0d25
commit 63695cfb47
3 changed files with 40 additions and 17 deletions

View File

@@ -165,7 +165,7 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
syncer := newSyncer(syncCh, 5*time.Second)
heartbeater := newHeartbeater(rdb, ps, 5*time.Second)
scheduler := newScheduler(rdb, 5*time.Second, queues)
processor := newProcessor(rdb, ps, delayFunc, syncCh, cancels)
processor := newProcessor(rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler)
subscriber := newSubscriber(rdb, cancels)
return &Background{
rdb: rdb,

View File

@@ -31,6 +31,8 @@ type processor struct {
retryDelayFunc retryDelayFunc
errHandler ErrorHandler
// channel via which to send sync requests to syncer.
syncRequestCh chan<- *syncRequest
@@ -59,7 +61,8 @@ type processor struct {
type retryDelayFunc func(n int, err error, task *Task) time.Duration
// newProcessor constructs a new processor.
func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, syncCh chan<- *syncRequest, c *base.Cancelations) *processor {
func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc,
syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor {
info := ps.Get()
qcfg := normalizeQueueCfg(info.Queues)
orderedQueues := []string(nil)
@@ -79,6 +82,7 @@ func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, syncCh c
done: make(chan struct{}),
abort: make(chan struct{}),
quit: make(chan struct{}),
errHandler: errHandler,
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
}
}
@@ -192,6 +196,9 @@ func (p *processor) exec() {
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
if resErr != nil {
if p.errHandler != nil {
p.errHandler.HandleError(task, resErr, msg.Retried, msg.Retry)
}
if msg.Retried >= msg.Retry {
p.kill(msg, resErr)
} else {

View File

@@ -68,7 +68,7 @@ func TestProcessorSuccess(t *testing.T) {
}
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations()
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations)
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
p.handler = HandlerFunc(handler)
var wg sync.WaitGroup
@@ -123,14 +123,19 @@ func TestProcessorRetry(t *testing.T) {
enqueued []*base.TaskMessage // initial default queue state
incoming []*base.TaskMessage // tasks to be enqueued during run
delay time.Duration // retry delay duration
handler Handler // task handler
wait time.Duration // wait duration between starting and stopping processor for this test case
wantRetry []h.ZSetEntry // tasks in retry queue at the end
wantDead []*base.TaskMessage // tasks in dead queue at the end
wantErrCount int // number of times error handler should be called
}{
{
enqueued: []*base.TaskMessage{m1, m2},
incoming: []*base.TaskMessage{m3, m4},
delay: time.Minute,
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
return fmt.Errorf(errMsg)
}),
wait: time.Second,
wantRetry: []h.ZSetEntry{
{Msg: &r2, Score: float64(now.Add(time.Minute).Unix())},
@@ -138,6 +143,7 @@ func TestProcessorRetry(t *testing.T) {
{Msg: &r4, Score: float64(now.Add(time.Minute).Unix())},
},
wantDead: []*base.TaskMessage{&r1},
wantErrCount: 4,
},
}
@@ -149,13 +155,19 @@ func TestProcessorRetry(t *testing.T) {
delayFunc := func(n int, e error, t *Task) time.Duration {
return tc.delay
}
handler := func(ctx context.Context, task *Task) error {
return fmt.Errorf(errMsg)
var (
mu sync.Mutex // guards n
n int // number of times error handler is called
)
errHandler := func(t *Task, err error, retried, maxRetry int) {
mu.Lock()
defer mu.Unlock()
n++
}
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations()
p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations)
p.handler = HandlerFunc(handler)
p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler))
p.handler = tc.handler
var wg sync.WaitGroup
p.start(&wg)
@@ -183,6 +195,10 @@ func TestProcessorRetry(t *testing.T) {
if l := r.LLen(base.InProgressQueue).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l)
}
if n != tc.wantErrCount {
t.Errorf("error handler was called %d, want %d", n, tc.wantErrCount)
}
}
}
@@ -216,7 +232,7 @@ func TestProcessorQueues(t *testing.T) {
for _, tc := range tests {
cancelations := base.NewCancelations()
ps := base.NewProcessState("localhost", 1234, 10, tc.queueCfg, false)
p := newProcessor(nil, ps, defaultDelayFunc, nil, cancelations)
p := newProcessor(nil, ps, defaultDelayFunc, nil, cancelations, nil)
got := p.queues()
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
@@ -284,7 +300,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
cancelations := base.NewCancelations()
ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations)
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
p.handler = HandlerFunc(handler)
var wg sync.WaitGroup