diff --git a/processor.go b/processor.go index 3969051..59f23d8 100644 --- a/processor.go +++ b/processor.go @@ -8,6 +8,7 @@ import ( "context" "fmt" "math/rand" + "runtime/debug" "sort" "sync" "time" @@ -203,7 +204,7 @@ func (p *processor) exec() { resCh := make(chan error, 1) go func() { - resCh <- perform(ctx, NewTask(msg.Type, msg.Payload), p.handler) + resCh <- p.perform(ctx, NewTask(msg.Type, msg.Payload)) }() select { @@ -340,13 +341,14 @@ func (p *processor) queues() []string { // perform calls the handler with the given task. // If the call returns without panic, it simply returns the value, // otherwise, it recovers from panic and returns an error. -func perform(ctx context.Context, task *Task, h Handler) (err error) { +func (p *processor) perform(ctx context.Context, task *Task) (err error) { defer func() { if x := recover(); x != nil { + p.logger.Errorf("recovering from panic. See the stack trace below for details:\n%s", string(debug.Stack())) err = fmt.Errorf("panic: %v", x) } }() - return h.ProcessTask(ctx, task) + return p.handler.ProcessTask(ctx, task) } // uniq dedupes elements and returns a slice of unique names of length l. diff --git a/processor_test.go b/processor_test.go index 1c90b40..24b403f 100644 --- a/processor_test.go +++ b/processor_test.go @@ -564,7 +564,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { } } -func TestPerform(t *testing.T) { +func TestProcessorPerform(t *testing.T) { tests := []struct { desc string handler HandlerFunc @@ -596,9 +596,16 @@ func TestPerform(t *testing.T) { wantErr: true, }, } + // Note: We don't need to fully initialize the processor since we are only testing + // perform method. + p := newProcessor(processorParams{ + logger: testLogger, + queues: defaultQueueConfig, + }) for _, tc := range tests { - got := perform(context.Background(), tc.task, tc.handler) + p.handler = tc.handler + got := p.perform(context.Background(), tc.task) if !tc.wantErr && got != nil { t.Errorf("%s: perform() = %v, want nil", tc.desc, got) continue