From 8d9a2d131357e934bc84053c3c443f250ed440be Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 7 Jan 2020 07:03:39 -0800 Subject: [PATCH] Update processor to query queues based on priority --- processor.go | 40 ++++++++++++++++++++++++++++++++++------ processor_test.go | 42 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 74 insertions(+), 8 deletions(-) diff --git a/processor.go b/processor.go index 55e93f5..214f8db 100644 --- a/processor.go +++ b/processor.go @@ -7,6 +7,7 @@ package asynq import ( "fmt" "log" + "math/rand" "sync" "time" @@ -109,12 +110,7 @@ func (p *processor) start() { // exec pulls a task out of the queue and starts a worker goroutine to // process the task. func (p *processor) exec() { - // TODO(hibiken): Randomize the order to avoid starving low priority queues - var qnames []string - for q := range p.queueConfig { - qnames = append(qnames, q) - } - + qnames := p.queues() msg, err := p.rdb.Dequeue(qnames...) if err == rdb.ErrNoProcessableTask { // queues are empty, this is a normal behavior. @@ -209,6 +205,21 @@ func (p *processor) kill(msg *base.TaskMessage, e error) { } } +// queues returns a list of queues to query. Order of the list +// is based roughly on the priority of each queue, but randomizes +// it to avoid starving low priority queues. +func (p *processor) queues() []string { + var names []string + for qname, priority := range p.queueConfig { + for i := 0; i < int(priority); i++ { + names = append(names, qname) + } + } + r := rand.New(rand.NewSource(time.Now().UnixNano())) + r.Shuffle(len(names), func(i, j int) { names[i], names[j] = names[j], names[i] }) + return uniq(names, len(p.queueConfig)) +} + // perform calls the handler with the given task. // If the call returns without panic, it simply returns the value, // otherwise, it recovers from panic and returns an error. @@ -220,3 +231,20 @@ func perform(h Handler, task *Task) (err error) { }() return h.ProcessTask(task) } + +// uniq dedupes elements and returns a slice of unique names of length l. +// Order of the output slice is based on the input list. +func uniq(names []string, l int) []string { + var res []string + seen := make(map[string]struct{}) + for _, s := range names { + if _, ok := seen[s]; !ok { + seen[s] = struct{}{} + res = append(res, s) + } + if len(res) == l { + break + } + } + return res +} diff --git a/processor_test.go b/processor_test.go index 943ebd7..83113b6 100644 --- a/processor_test.go +++ b/processor_test.go @@ -6,6 +6,7 @@ package asynq import ( "fmt" + "sort" "sync" "testing" "time" @@ -51,7 +52,7 @@ func TestProcessorSuccess(t *testing.T) { } for _, tc := range tests { - h.FlushDB(t, r) // clean up db before each test case. + h.FlushDB(t, r) // clean up db before each test case. h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue. // instantiate a new processor @@ -137,7 +138,7 @@ func TestProcessorRetry(t *testing.T) { } for _, tc := range tests { - h.FlushDB(t, r) // clean up db before each test case. + h.FlushDB(t, r) // clean up db before each test case. h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue. // instantiate a new processor @@ -178,6 +179,43 @@ func TestProcessorRetry(t *testing.T) { } } +func TestProcessorQueues(t *testing.T) { + sortOpt := cmp.Transformer("SortStrings", func(in []string) []string { + out := append([]string(nil), in...) // Copy input to avoid mutating it + sort.Strings(out) + return out + }) + + tests := []struct { + queueCfg map[string]uint + want []string + }{ + { + queueCfg: map[string]uint{ + "high": 6, + "default": 3, + "low": 1, + }, + want: []string{"high", "default", "low"}, + }, + { + queueCfg: map[string]uint{ + "default": 1, + }, + want: []string{"default"}, + }, + } + + for _, tc := range tests { + p := newProcessor(nil, 10, tc.queueCfg, defaultDelayFunc) + got := p.queues() + if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { + t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s", + tc.queueCfg, got, tc.want, diff) + } + } +} + func TestPerform(t *testing.T) { tests := []struct { desc string