From a4e4c0b1d5c028192e8301e22640aa25b1857e52 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 29 Feb 2020 16:27:59 -0800 Subject: [PATCH] Call error handler when task was not processed successfully --- background.go | 2 +- processor.go | 9 ++++++++- processor_test.go | 46 +++++++++++++++++++++++++++++++--------------- 3 files changed, 40 insertions(+), 17 deletions(-) diff --git a/background.go b/background.go index d4962c3..275fbaa 100644 --- a/background.go +++ b/background.go @@ -165,7 +165,7 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { syncer := newSyncer(syncCh, 5*time.Second) heartbeater := newHeartbeater(rdb, ps, 5*time.Second) scheduler := newScheduler(rdb, 5*time.Second, queues) - processor := newProcessor(rdb, ps, delayFunc, syncCh, cancels) + processor := newProcessor(rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler) subscriber := newSubscriber(rdb, cancels) return &Background{ rdb: rdb, diff --git a/processor.go b/processor.go index 0852461..fa39424 100644 --- a/processor.go +++ b/processor.go @@ -31,6 +31,8 @@ type processor struct { retryDelayFunc retryDelayFunc + errHandler ErrorHandler + // channel via which to send sync requests to syncer. syncRequestCh chan<- *syncRequest @@ -59,7 +61,8 @@ type processor struct { type retryDelayFunc func(n int, err error, task *Task) time.Duration // newProcessor constructs a new processor. -func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, syncCh chan<- *syncRequest, c *base.Cancelations) *processor { +func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, + syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor { info := ps.Get() qcfg := normalizeQueueCfg(info.Queues) orderedQueues := []string(nil) @@ -79,6 +82,7 @@ func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, syncCh c done: make(chan struct{}), abort: make(chan struct{}), quit: make(chan struct{}), + errHandler: errHandler, handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), } } @@ -192,6 +196,9 @@ func (p *processor) exec() { // 2) Retry -> Removes the message from InProgress & Adds the message to Retry // 3) Kill -> Removes the message from InProgress & Adds the message to Dead if resErr != nil { + if p.errHandler != nil { + p.errHandler.HandleError(task, resErr, msg.Retried, msg.Retry) + } if msg.Retried >= msg.Retry { p.kill(msg, resErr) } else { diff --git a/processor_test.go b/processor_test.go index 1309cc3..0a4ee07 100644 --- a/processor_test.go +++ b/processor_test.go @@ -68,7 +68,7 @@ func TestProcessorSuccess(t *testing.T) { } ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false) cancelations := base.NewCancelations() - p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations) + p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations, nil) p.handler = HandlerFunc(handler) var wg sync.WaitGroup @@ -120,24 +120,30 @@ func TestProcessorRetry(t *testing.T) { now := time.Now() tests := []struct { - enqueued []*base.TaskMessage // initial default queue state - incoming []*base.TaskMessage // tasks to be enqueued during run - delay time.Duration // retry delay duration - wait time.Duration // wait duration between starting and stopping processor for this test case - wantRetry []h.ZSetEntry // tasks in retry queue at the end - wantDead []*base.TaskMessage // tasks in dead queue at the end + enqueued []*base.TaskMessage // initial default queue state + incoming []*base.TaskMessage // tasks to be enqueued during run + delay time.Duration // retry delay duration + handler Handler // task handler + wait time.Duration // wait duration between starting and stopping processor for this test case + wantRetry []h.ZSetEntry // tasks in retry queue at the end + wantDead []*base.TaskMessage // tasks in dead queue at the end + wantErrCount int // number of times error handler should be called }{ { enqueued: []*base.TaskMessage{m1, m2}, incoming: []*base.TaskMessage{m3, m4}, delay: time.Minute, - wait: time.Second, + handler: HandlerFunc(func(ctx context.Context, task *Task) error { + return fmt.Errorf(errMsg) + }), + wait: time.Second, wantRetry: []h.ZSetEntry{ {Msg: &r2, Score: float64(now.Add(time.Minute).Unix())}, {Msg: &r3, Score: float64(now.Add(time.Minute).Unix())}, {Msg: &r4, Score: float64(now.Add(time.Minute).Unix())}, }, - wantDead: []*base.TaskMessage{&r1}, + wantDead: []*base.TaskMessage{&r1}, + wantErrCount: 4, }, } @@ -149,13 +155,19 @@ func TestProcessorRetry(t *testing.T) { delayFunc := func(n int, e error, t *Task) time.Duration { return tc.delay } - handler := func(ctx context.Context, task *Task) error { - return fmt.Errorf(errMsg) + var ( + mu sync.Mutex // guards n + n int // number of times error handler is called + ) + errHandler := func(t *Task, err error, retried, maxRetry int) { + mu.Lock() + defer mu.Unlock() + n++ } ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false) cancelations := base.NewCancelations() - p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations) - p.handler = HandlerFunc(handler) + p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler)) + p.handler = tc.handler var wg sync.WaitGroup p.start(&wg) @@ -183,6 +195,10 @@ func TestProcessorRetry(t *testing.T) { if l := r.LLen(base.InProgressQueue).Val(); l != 0 { t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l) } + + if n != tc.wantErrCount { + t.Errorf("error handler was called %d, want %d", n, tc.wantErrCount) + } } } @@ -216,7 +232,7 @@ func TestProcessorQueues(t *testing.T) { for _, tc := range tests { cancelations := base.NewCancelations() ps := base.NewProcessState("localhost", 1234, 10, tc.queueCfg, false) - p := newProcessor(nil, ps, defaultDelayFunc, nil, cancelations) + p := newProcessor(nil, ps, defaultDelayFunc, nil, cancelations, nil) got := p.queues() if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s", @@ -284,7 +300,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { // Note: Set concurrency to 1 to make sure tasks are processed one at a time. cancelations := base.NewCancelations() ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/) - p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations) + p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations, nil) p.handler = HandlerFunc(handler) var wg sync.WaitGroup