2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

Minor code cleanup

This commit is contained in:
Ken Hibino 2020-05-12 21:30:51 -07:00
parent fb38086590
commit 60cbf8dc5a

View File

@ -159,17 +159,18 @@ func (p *processor) start(wg *sync.WaitGroup) {
func (p *processor) exec() { func (p *processor) exec() {
qnames := p.queues() qnames := p.queues()
msg, err := p.broker.Dequeue(qnames...) msg, err := p.broker.Dequeue(qnames...)
if err == rdb.ErrNoProcessableTask { // TODO: Need to decouple this error from rdb to support other brokers switch {
case err == rdb.ErrNoProcessableTask:
// queues are empty, this is a normal behavior. // queues are empty, this is a normal behavior.
if len(p.queueConfig) > 1 { if len(qnames) > 1 {
// sleep to avoid slamming redis and let scheduler move tasks into queues. // sleep to avoid slamming redis and let scheduler move tasks into queues.
// Note: With multiple queues, we are not using blocking pop operation and // Note: With multiple queues, we are not using blocking pop operation and
// polling queues instead. This adds significant load to redis. // polling queues instead. This adds significant load to redis.
time.Sleep(time.Second) time.Sleep(time.Second)
} }
p.logger.Debug("All queues are empty")
return return
} case err != nil:
if err != nil {
if p.errLogLimiter.Allow() { if p.errLogLimiter.Allow() {
p.logger.Errorf("Dequeue error: %v", err) p.logger.Errorf("Dequeue error: %v", err)
} }
@ -186,7 +187,7 @@ func (p *processor) exec() {
go func() { go func() {
defer func() { defer func() {
p.ss.DeleteWorkerStats(msg) p.ss.DeleteWorkerStats(msg)
<-p.sema /* release token */ <-p.sema // release token
}() }()
ctx, cancel := createContext(msg) ctx, cancel := createContext(msg)