diff --git a/processor.go b/processor.go index 925b9c9..36664fb 100644 --- a/processor.go +++ b/processor.go @@ -193,64 +193,62 @@ func (p *processor) exec() { lease := base.NewLease(leaseExpirationTime) deadline := p.computeDeadline(msg) p.starting <- &workerInfo{msg, time.Now(), deadline, lease} - go func() { - defer func() { - p.finished <- msg - <-p.sema // release token - }() - - ctx, cancel := asynqcontext.New(p.baseCtxFn(), msg, deadline) - p.cancelations.Add(msg.ID, cancel) - defer func() { - cancel() - p.cancelations.Delete(msg.ID) - }() - - // check context before starting a worker goroutine. - select { - case <-ctx.Done(): - // already canceled (e.g. deadline exceeded). - p.handleFailedMessage(ctx, lease, msg, ctx.Err()) - return - default: - } - - resCh := make(chan error, 1) - go func() { - task := newTask( - msg.Type, - msg.Payload, - &ResultWriter{ - id: msg.ID, - qname: msg.Queue, - broker: p.broker, - ctx: ctx, - }, - ) - resCh <- p.perform(ctx, task) - }() - - select { - case <-p.abort: - // time is up, push the message back to queue and quit this worker goroutine. - p.logger.Warnf("Quitting worker. task id=%s", msg.ID) - p.requeue(lease, msg) - return - case <-lease.Done(): - cancel() - p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired) - return - case <-ctx.Done(): - p.handleFailedMessage(ctx, lease, msg, ctx.Err()) - return - case resErr := <-resCh: - if resErr != nil { - p.handleFailedMessage(ctx, lease, msg, resErr) - return - } - p.handleSucceededMessage(lease, msg) - } + defer func() { + p.finished <- msg + <-p.sema // release token }() + + ctx, cancel := asynqcontext.New(p.baseCtxFn(), msg, deadline) + p.cancelations.Add(msg.ID, cancel) + defer func() { + cancel() + p.cancelations.Delete(msg.ID) + }() + + // check context before starting a worker goroutine. + select { + case <-ctx.Done(): + // already canceled (e.g. deadline exceeded). + p.handleFailedMessage(ctx, lease, msg, ctx.Err()) + return + default: + } + + resCh := make(chan error, 1) + go func() { + task := newTask( + msg.Type, + msg.Payload, + &ResultWriter{ + id: msg.ID, + qname: msg.Queue, + broker: p.broker, + ctx: ctx, + }, + ) + resCh <- p.perform(ctx, task) + }() + + select { + case <-p.abort: + // time is up, push the message back to queue and quit this worker goroutine. + p.logger.Warnf("Quitting worker. task id=%s", msg.ID) + p.requeue(lease, msg) + return + case <-lease.Done(): + cancel() + p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired) + return + case <-ctx.Done(): + p.handleFailedMessage(ctx, lease, msg, ctx.Err()) + return + case resErr := <-resCh: + if resErr != nil { + p.handleFailedMessage(ctx, lease, msg, resErr) + return + } + p.handleSucceededMessage(lease, msg) + } } }