From 5a6f737589088a7c85c9ceebc968d44a2dbf614f Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 14 Jan 2020 06:29:05 -0800 Subject: [PATCH] [performance] Use BRPOPLPUSH if one queue is used --- internal/rdb/rdb.go | 19 +++++++++++++++---- processor.go | 15 +++++++++++++-- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index f2db220..a35adcf 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -60,11 +60,17 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { // is one and returns it. If all queues are empty, ErrNoProcessableTask // error is returned. func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) { - var keys []string - for _, q := range qnames { - keys = append(keys, base.QueueKey(q)) + var data string + var err error + if len(qnames) == 1 { + data, err = r.dequeueSingle(base.QueueKey(qnames[0])) + } else { + var keys []string + for _, q := range qnames { + keys = append(keys, base.QueueKey(q)) + } + data, err = r.dequeue(keys...) } - data, err := r.dequeue(keys...) if err == redis.Nil { return nil, ErrNoProcessableTask } @@ -79,6 +85,11 @@ func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) { return &msg, nil } +func (r *RDB) dequeueSingle(queue string) (data string, err error) { + // timeout needed to avoid blocking forever + return r.client.BRPopLPush(queue, base.InProgressQueue, time.Second).Result() +} + func (r *RDB) dequeue(queues ...string) (data string, err error) { var args []interface{} for _, qkey := range queues { diff --git a/processor.go b/processor.go index 577d4b4..b9626ee 100644 --- a/processor.go +++ b/processor.go @@ -124,8 +124,12 @@ func (p *processor) exec() { msg, err := p.rdb.Dequeue(qnames...) if err == rdb.ErrNoProcessableTask { // queues are empty, this is a normal behavior. - // sleep to avoid slamming redis and let scheduler move tasks into queues. - time.Sleep(time.Second) + if len(p.queueConfig) > 1 { + // sleep to avoid slamming redis and let scheduler move tasks into queues. + // Note: With multiple queues, we are not using blocking pop operation and + // polling queues instead. This adds significant load to redis. + time.Sleep(time.Second) + } return } if err != nil { @@ -221,6 +225,13 @@ func (p *processor) kill(msg *base.TaskMessage, e error) { // If strict-priority is false, then the order of queue names are roughly based on // the priority level but randomized in order to avoid starving low priority queues. func (p *processor) queues() []string { + // skip the overhead of generating a list of queue names + // if we are processing one queue. + if len(p.queueConfig) == 1 { + for qname := range p.queueConfig { + return []string{qname} + } + } if p.orderedQueues != nil { return p.orderedQueues }