diff --git a/processor.go b/processor.go index cc6d26b..2840287 100644 --- a/processor.go +++ b/processor.go @@ -159,17 +159,18 @@ func (p *processor) start(wg *sync.WaitGroup) { func (p *processor) exec() { qnames := p.queues() msg, err := p.broker.Dequeue(qnames...) - if err == rdb.ErrNoProcessableTask { // TODO: Need to decouple this error from rdb to support other brokers + switch { + case err == rdb.ErrNoProcessableTask: // queues are empty, this is a normal behavior. - if len(p.queueConfig) > 1 { + if len(qnames) > 1 { // sleep to avoid slamming redis and let scheduler move tasks into queues. // Note: With multiple queues, we are not using blocking pop operation and // polling queues instead. This adds significant load to redis. time.Sleep(time.Second) } + p.logger.Debug("All queues are empty") return - } - if err != nil { + case err != nil: if p.errLogLimiter.Allow() { p.logger.Errorf("Dequeue error: %v", err) } @@ -186,7 +187,7 @@ func (p *processor) exec() { go func() { defer func() { p.ss.DeleteWorkerStats(msg) - <-p.sema /* release token */ + <-p.sema // release token }() ctx, cancel := createContext(msg)