From 84eef4ed0b3262af52b0b8d26a609d25e81c94ee Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 12 Jan 2020 07:46:51 -0800 Subject: [PATCH] Add strict-priority option --- CHANGELOG.md | 1 + background.go | 9 +++++- processor.go | 55 +++++++++++++++++++++++++++++--- processor_test.go | 80 +++++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 137 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index be1af2c..ace1429 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - NewTask constructor - `Queues` option in `Config` to specify mutiple queues with priority level - `Client` can schedule a task with `asynq.Queue(name)` to specify which queue to use +- `StrictPriority` option in `Config` to specify whether the priority should be followed strictly ### Changed diff --git a/background.go b/background.go index 3cf6122..6c10406 100644 --- a/background.go +++ b/background.go @@ -73,6 +73,13 @@ type Config struct { // in "critical", "default", "low" should be processed 60%, 30%, 10% of // the time respectively. Queues map[string]uint + + // StrictPriority indicates whether the queue priority should be treated strictly. + // + // If set to true, tasks in the queue with the highest priority is processed first. + // The tasks in lower priority queues are processed only when those queues with + // higher priorities are empty. + StrictPriority bool } // Formula taken from https://github.com/mperham/sidekiq. @@ -103,7 +110,7 @@ func NewBackground(r *redis.Client, cfg *Config) *Background { } rdb := rdb.NewRDB(r) scheduler := newScheduler(rdb, 5*time.Second) - processor := newProcessor(rdb, n, normalizeQueueCfg(queues), delayFunc) + processor := newProcessor(rdb, n, normalizeQueueCfg(queues), cfg.StrictPriority, delayFunc) return &Background{ rdb: rdb, scheduler: scheduler, diff --git a/processor.go b/processor.go index a9f71a7..5f2fa57 100644 --- a/processor.go +++ b/processor.go @@ -8,6 +8,7 @@ import ( "fmt" "log" "math/rand" + "sort" "sync" "time" @@ -22,6 +23,9 @@ type processor struct { queueConfig map[string]uint + // orderedQueues is set only in strict-priority mode. + orderedQueues []string + retryDelayFunc retryDelayFunc // sema is a counting semaphore to ensure the number of active workers @@ -42,10 +46,22 @@ type processor struct { type retryDelayFunc func(n int, err error, task *Task) time.Duration -func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, fn retryDelayFunc) *processor { +// newProcessor constructs a new processor. +// +// r is an instance of RDB used by the processor. +// n specifies the max number of concurrenct worker goroutines. +// qfcg is a mapping of queue names to associated priority level. +// strict specifies whether queue priority should be treated strictly. +// fn is a function to compute retry delay. +func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retryDelayFunc) *processor { + orderedQueues := []string(nil) + if strict { + orderedQueues = sortByPriority(qcfg) + } return &processor{ rdb: r, queueConfig: qcfg, + orderedQueues: orderedQueues, retryDelayFunc: fn, sema: make(chan struct{}, n), done: make(chan struct{}), @@ -199,10 +215,15 @@ func (p *processor) kill(msg *base.TaskMessage, e error) { } } -// queues returns a list of queues to query. Order of the list -// is based roughly on the priority of each queue, but randomizes -// it to avoid starving low priority queues. +// queues returns a list of queues to query. +// Order of the queue names is based on the priority of each queue. +// Queue names is sorted by their priority level if strict-priority is true. +// If strict-priority is false, then the order of queue names are roughly based on +// the priority level but randomized in order to avoid starving low priority queues. func (p *processor) queues() []string { + if p.orderedQueues != nil { + return p.orderedQueues + } var names []string for qname, priority := range p.queueConfig { for i := 0; i < int(priority); i++ { @@ -242,3 +263,29 @@ func uniq(names []string, l int) []string { } return res } + +// sortByPriority returns the list of queue names sorted by +// their priority level in descending order. +func sortByPriority(qcfg map[string]uint) []string { + var queues []*queue + for qname, n := range qcfg { + queues = append(queues, &queue{qname, n}) + } + sort.Sort(sort.Reverse(byPriority(queues))) + var res []string + for _, q := range queues { + res = append(res, q.name) + } + return res +} + +type queue struct { + name string + priority uint +} + +type byPriority []*queue + +func (x byPriority) Len() int { return len(x) } +func (x byPriority) Less(i, j int) bool { return x[i].priority < x[j].priority } +func (x byPriority) Swap(i, j int) { x[i], x[j] = x[j], x[i] } diff --git a/processor_test.go b/processor_test.go index bcafbc6..d45d3c9 100644 --- a/processor_test.go +++ b/processor_test.go @@ -65,7 +65,7 @@ func TestProcessorSuccess(t *testing.T) { processed = append(processed, task) return nil } - p := newProcessor(rdbClient, 10, defaultQueueConfig, defaultDelayFunc) + p := newProcessor(rdbClient, 10, defaultQueueConfig, false, defaultDelayFunc) p.handler = HandlerFunc(handler) p.start() @@ -148,7 +148,7 @@ func TestProcessorRetry(t *testing.T) { handler := func(task *Task) error { return fmt.Errorf(errMsg) } - p := newProcessor(rdbClient, 10, defaultQueueConfig, delayFunc) + p := newProcessor(rdbClient, 10, defaultQueueConfig, false, delayFunc) p.handler = HandlerFunc(handler) p.start() @@ -207,7 +207,7 @@ func TestProcessorQueues(t *testing.T) { } for _, tc := range tests { - p := newProcessor(nil, 10, tc.queueCfg, defaultDelayFunc) + p := newProcessor(nil, 10, tc.queueCfg, false, defaultDelayFunc) got := p.queues() if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s", @@ -216,6 +216,80 @@ func TestProcessorQueues(t *testing.T) { } } +func TestProcessorWithStrictPriority(t *testing.T) { + r := setup(t) + rdbClient := rdb.NewRDB(r) + + m1 := h.NewTaskMessage("send_email", nil) + m2 := h.NewTaskMessage("send_email", nil) + m3 := h.NewTaskMessage("send_email", nil) + m4 := h.NewTaskMessage("gen_thumbnail", nil) + m5 := h.NewTaskMessage("gen_thumbnail", nil) + m6 := h.NewTaskMessage("sync", nil) + m7 := h.NewTaskMessage("sync", nil) + + t1 := NewTask(m1.Type, m1.Payload) + t2 := NewTask(m2.Type, m2.Payload) + t3 := NewTask(m3.Type, m3.Payload) + t4 := NewTask(m4.Type, m4.Payload) + t5 := NewTask(m5.Type, m5.Payload) + t6 := NewTask(m6.Type, m6.Payload) + t7 := NewTask(m7.Type, m7.Payload) + + tests := []struct { + enqueued map[string][]*base.TaskMessage // initial queues state + wait time.Duration // wait duration between starting and stopping processor for this test case + wantProcessed []*Task // tasks to be processed at the end + }{ + { + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {m4, m5}, + "critical": {m1, m2, m3}, + "low": {m6, m7}, + }, + wait: time.Second, + wantProcessed: []*Task{t1, t2, t3, t4, t5, t6, t7}, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) // clean up db before each test case. + for qname, msgs := range tc.enqueued { + h.SeedEnqueuedQueue(t, r, msgs, qname) + } + + // instantiate a new processor + var mu sync.Mutex + var processed []*Task + handler := func(task *Task) error { + mu.Lock() + defer mu.Unlock() + processed = append(processed, task) + return nil + } + queueCfg := map[string]uint{ + "critical": 3, + base.DefaultQueueName: 2, + "low": 1, + } + // Note: Set concurrency to 1 to make sure tasks are processed one at a time. + p := newProcessor(rdbClient, 1 /*concurrency */, queueCfg, true /* strict */, defaultDelayFunc) + p.handler = HandlerFunc(handler) + + p.start() + time.Sleep(tc.wait) + p.terminate() + + if diff := cmp.Diff(tc.wantProcessed, processed, cmp.AllowUnexported(Payload{})); diff != "" { + t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) + } + + if l := r.LLen(base.InProgressQueue).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l) + } + } +} + func TestPerform(t *testing.T) { tests := []struct { desc string