mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-21 21:46:12 +08:00
fix:Remove useless coroutines
This commit is contained in:
112
processor.go
112
processor.go
@@ -193,64 +193,62 @@ func (p *processor) exec() {
|
|||||||
lease := base.NewLease(leaseExpirationTime)
|
lease := base.NewLease(leaseExpirationTime)
|
||||||
deadline := p.computeDeadline(msg)
|
deadline := p.computeDeadline(msg)
|
||||||
p.starting <- &workerInfo{msg, time.Now(), deadline, lease}
|
p.starting <- &workerInfo{msg, time.Now(), deadline, lease}
|
||||||
go func() {
|
defer func() {
|
||||||
defer func() {
|
p.finished <- msg
|
||||||
p.finished <- msg
|
<-p.sema // release token
|
||||||
<-p.sema // release token
|
|
||||||
}()
|
|
||||||
|
|
||||||
ctx, cancel := asynqcontext.New(p.baseCtxFn(), msg, deadline)
|
|
||||||
p.cancelations.Add(msg.ID, cancel)
|
|
||||||
defer func() {
|
|
||||||
cancel()
|
|
||||||
p.cancelations.Delete(msg.ID)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// check context before starting a worker goroutine.
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
// already canceled (e.g. deadline exceeded).
|
|
||||||
p.handleFailedMessage(ctx, lease, msg, ctx.Err())
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
resCh := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
task := newTask(
|
|
||||||
msg.Type,
|
|
||||||
msg.Payload,
|
|
||||||
&ResultWriter{
|
|
||||||
id: msg.ID,
|
|
||||||
qname: msg.Queue,
|
|
||||||
broker: p.broker,
|
|
||||||
ctx: ctx,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
resCh <- p.perform(ctx, task)
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-p.abort:
|
|
||||||
// time is up, push the message back to queue and quit this worker goroutine.
|
|
||||||
p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
|
|
||||||
p.requeue(lease, msg)
|
|
||||||
return
|
|
||||||
case <-lease.Done():
|
|
||||||
cancel()
|
|
||||||
p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired)
|
|
||||||
return
|
|
||||||
case <-ctx.Done():
|
|
||||||
p.handleFailedMessage(ctx, lease, msg, ctx.Err())
|
|
||||||
return
|
|
||||||
case resErr := <-resCh:
|
|
||||||
if resErr != nil {
|
|
||||||
p.handleFailedMessage(ctx, lease, msg, resErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
p.handleSucceededMessage(lease, msg)
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
ctx, cancel := asynqcontext.New(p.baseCtxFn(), msg, deadline)
|
||||||
|
p.cancelations.Add(msg.ID, cancel)
|
||||||
|
defer func() {
|
||||||
|
cancel()
|
||||||
|
p.cancelations.Delete(msg.ID)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// check context before starting a worker goroutine.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// already canceled (e.g. deadline exceeded).
|
||||||
|
p.handleFailedMessage(ctx, lease, msg, ctx.Err())
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
resCh := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
task := newTask(
|
||||||
|
msg.Type,
|
||||||
|
msg.Payload,
|
||||||
|
&ResultWriter{
|
||||||
|
id: msg.ID,
|
||||||
|
qname: msg.Queue,
|
||||||
|
broker: p.broker,
|
||||||
|
ctx: ctx,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
resCh <- p.perform(ctx, task)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-p.abort:
|
||||||
|
// time is up, push the message back to queue and quit this worker goroutine.
|
||||||
|
p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
|
||||||
|
p.requeue(lease, msg)
|
||||||
|
return
|
||||||
|
case <-lease.Done():
|
||||||
|
cancel()
|
||||||
|
p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired)
|
||||||
|
return
|
||||||
|
case <-ctx.Done():
|
||||||
|
p.handleFailedMessage(ctx, lease, msg, ctx.Err())
|
||||||
|
return
|
||||||
|
case resErr := <-resCh:
|
||||||
|
if resErr != nil {
|
||||||
|
p.handleFailedMessage(ctx, lease, msg, resErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.handleSucceededMessage(lease, msg)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user