2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 02:55:54 +08:00

Add strict-priority option

This commit is contained in:
Ken Hibino 2020-01-12 07:46:51 -08:00
parent 97316d6766
commit 84eef4ed0b
4 changed files with 137 additions and 8 deletions

View File

@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- NewTask constructor
- `Queues` option in `Config` to specify mutiple queues with priority level
- `Client` can schedule a task with `asynq.Queue(name)` to specify which queue to use
- `StrictPriority` option in `Config` to specify whether the priority should be followed strictly
### Changed

View File

@ -73,6 +73,13 @@ type Config struct {
// in "critical", "default", "low" should be processed 60%, 30%, 10% of
// the time respectively.
Queues map[string]uint
// StrictPriority indicates whether the queue priority should be treated strictly.
//
// If set to true, tasks in the queue with the highest priority is processed first.
// The tasks in lower priority queues are processed only when those queues with
// higher priorities are empty.
StrictPriority bool
}
// Formula taken from https://github.com/mperham/sidekiq.
@ -103,7 +110,7 @@ func NewBackground(r *redis.Client, cfg *Config) *Background {
}
rdb := rdb.NewRDB(r)
scheduler := newScheduler(rdb, 5*time.Second)
processor := newProcessor(rdb, n, normalizeQueueCfg(queues), delayFunc)
processor := newProcessor(rdb, n, normalizeQueueCfg(queues), cfg.StrictPriority, delayFunc)
return &Background{
rdb: rdb,
scheduler: scheduler,

View File

@ -8,6 +8,7 @@ import (
"fmt"
"log"
"math/rand"
"sort"
"sync"
"time"
@ -22,6 +23,9 @@ type processor struct {
queueConfig map[string]uint
// orderedQueues is set only in strict-priority mode.
orderedQueues []string
retryDelayFunc retryDelayFunc
// sema is a counting semaphore to ensure the number of active workers
@ -42,10 +46,22 @@ type processor struct {
type retryDelayFunc func(n int, err error, task *Task) time.Duration
func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, fn retryDelayFunc) *processor {
// newProcessor constructs a new processor.
//
// r is an instance of RDB used by the processor.
// n specifies the max number of concurrenct worker goroutines.
// qfcg is a mapping of queue names to associated priority level.
// strict specifies whether queue priority should be treated strictly.
// fn is a function to compute retry delay.
func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retryDelayFunc) *processor {
orderedQueues := []string(nil)
if strict {
orderedQueues = sortByPriority(qcfg)
}
return &processor{
rdb: r,
queueConfig: qcfg,
orderedQueues: orderedQueues,
retryDelayFunc: fn,
sema: make(chan struct{}, n),
done: make(chan struct{}),
@ -199,10 +215,15 @@ func (p *processor) kill(msg *base.TaskMessage, e error) {
}
}
// queues returns a list of queues to query. Order of the list
// is based roughly on the priority of each queue, but randomizes
// it to avoid starving low priority queues.
// queues returns a list of queues to query.
// Order of the queue names is based on the priority of each queue.
// Queue names is sorted by their priority level if strict-priority is true.
// If strict-priority is false, then the order of queue names are roughly based on
// the priority level but randomized in order to avoid starving low priority queues.
func (p *processor) queues() []string {
if p.orderedQueues != nil {
return p.orderedQueues
}
var names []string
for qname, priority := range p.queueConfig {
for i := 0; i < int(priority); i++ {
@ -242,3 +263,29 @@ func uniq(names []string, l int) []string {
}
return res
}
// sortByPriority returns the list of queue names sorted by
// their priority level in descending order.
func sortByPriority(qcfg map[string]uint) []string {
var queues []*queue
for qname, n := range qcfg {
queues = append(queues, &queue{qname, n})
}
sort.Sort(sort.Reverse(byPriority(queues)))
var res []string
for _, q := range queues {
res = append(res, q.name)
}
return res
}
type queue struct {
name string
priority uint
}
type byPriority []*queue
func (x byPriority) Len() int { return len(x) }
func (x byPriority) Less(i, j int) bool { return x[i].priority < x[j].priority }
func (x byPriority) Swap(i, j int) { x[i], x[j] = x[j], x[i] }

View File

@ -65,7 +65,7 @@ func TestProcessorSuccess(t *testing.T) {
processed = append(processed, task)
return nil
}
p := newProcessor(rdbClient, 10, defaultQueueConfig, defaultDelayFunc)
p := newProcessor(rdbClient, 10, defaultQueueConfig, false, defaultDelayFunc)
p.handler = HandlerFunc(handler)
p.start()
@ -148,7 +148,7 @@ func TestProcessorRetry(t *testing.T) {
handler := func(task *Task) error {
return fmt.Errorf(errMsg)
}
p := newProcessor(rdbClient, 10, defaultQueueConfig, delayFunc)
p := newProcessor(rdbClient, 10, defaultQueueConfig, false, delayFunc)
p.handler = HandlerFunc(handler)
p.start()
@ -207,7 +207,7 @@ func TestProcessorQueues(t *testing.T) {
}
for _, tc := range tests {
p := newProcessor(nil, 10, tc.queueCfg, defaultDelayFunc)
p := newProcessor(nil, 10, tc.queueCfg, false, defaultDelayFunc)
got := p.queues()
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
@ -216,6 +216,80 @@ func TestProcessorQueues(t *testing.T) {
}
}
func TestProcessorWithStrictPriority(t *testing.T) {
r := setup(t)
rdbClient := rdb.NewRDB(r)
m1 := h.NewTaskMessage("send_email", nil)
m2 := h.NewTaskMessage("send_email", nil)
m3 := h.NewTaskMessage("send_email", nil)
m4 := h.NewTaskMessage("gen_thumbnail", nil)
m5 := h.NewTaskMessage("gen_thumbnail", nil)
m6 := h.NewTaskMessage("sync", nil)
m7 := h.NewTaskMessage("sync", nil)
t1 := NewTask(m1.Type, m1.Payload)
t2 := NewTask(m2.Type, m2.Payload)
t3 := NewTask(m3.Type, m3.Payload)
t4 := NewTask(m4.Type, m4.Payload)
t5 := NewTask(m5.Type, m5.Payload)
t6 := NewTask(m6.Type, m6.Payload)
t7 := NewTask(m7.Type, m7.Payload)
tests := []struct {
enqueued map[string][]*base.TaskMessage // initial queues state
wait time.Duration // wait duration between starting and stopping processor for this test case
wantProcessed []*Task // tasks to be processed at the end
}{
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m4, m5},
"critical": {m1, m2, m3},
"low": {m6, m7},
},
wait: time.Second,
wantProcessed: []*Task{t1, t2, t3, t4, t5, t6, t7},
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
for qname, msgs := range tc.enqueued {
h.SeedEnqueuedQueue(t, r, msgs, qname)
}
// instantiate a new processor
var mu sync.Mutex
var processed []*Task
handler := func(task *Task) error {
mu.Lock()
defer mu.Unlock()
processed = append(processed, task)
return nil
}
queueCfg := map[string]uint{
"critical": 3,
base.DefaultQueueName: 2,
"low": 1,
}
// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
p := newProcessor(rdbClient, 1 /*concurrency */, queueCfg, true /* strict */, defaultDelayFunc)
p.handler = HandlerFunc(handler)
p.start()
time.Sleep(tc.wait)
p.terminate()
if diff := cmp.Diff(tc.wantProcessed, processed, cmp.AllowUnexported(Payload{})); diff != "" {
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
}
if l := r.LLen(base.InProgressQueue).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l)
}
}
}
func TestPerform(t *testing.T) {
tests := []struct {
desc string