diff --git a/processor.go b/processor.go index 36bc097..9845e73 100644 --- a/processor.go +++ b/processor.go @@ -3,6 +3,7 @@ package asynq import ( "fmt" "log" + "math/rand" "sync" "time" @@ -17,11 +18,6 @@ type processor struct { retryDelayFunc retryDelayFunc - // timeout for blocking dequeue operation. - // dequeue needs to timeout to avoid blocking forever - // in case of a program shutdown or additon of a new queue. - dequeueTimeout time.Duration - // sema is a counting semaphore to ensure the number of active workers // does not exceed the limit. sema chan struct{} @@ -44,7 +40,6 @@ func newProcessor(r *rdb.RDB, n int, fn retryDelayFunc) *processor { return &processor{ rdb: r, retryDelayFunc: fn, - dequeueTimeout: 2 * time.Second, sema: make(chan struct{}, n), done: make(chan struct{}), abort: make(chan struct{}), @@ -102,8 +97,7 @@ func (p *processor) start() { // exec pulls a task out of the queue and starts a worker goroutine to // process the task. func (p *processor) exec() { - // TODO(hibiken): sort the queues based on weight, but prevent starvation - msg, err := p.rdb.Dequeue(base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue) + msg, err := p.rdb.Dequeue(weightedQueues()...) if err == rdb.ErrNoProcessableTask { // queues are empty, this is a normal behavior. return @@ -206,3 +200,49 @@ func perform(h Handler, task *Task) (err error) { }() return h.ProcessTask(task) } + +// weightedQueues returns a list of redis queue key to query +// order is roughly based on the weight, it's randomized to +// prevent starvation. +// +// For example, if weight distribution is +// high = 6, default = 3, low = 1 +// then high priority tasks will get processed first 60% of the time, +// and default priority task will get processed first 30% of the time, +// and low priority task will get processed first 10% of the time +func weightedQueues() []string { + const numOfQueues = 3 + const ( + highWeight = 6 + defaultWeight = 3 + lowWeight = 1 + ) + queues := make([]string, 10) + for i := 0; i < highWeight; i++ { + queues[i] = base.HighPriorityQueue + } + for i := highWeight; i < highWeight+defaultWeight; i++ { + queues[i] = base.DefaultQueue + } + for i := highWeight + defaultWeight; i < highWeight+defaultWeight+lowWeight; i++ { + queues[i] = base.LowPriorityQueue + } + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + rnd.Shuffle(len(queues), func(i, j int) { + queues[i], queues[j] = queues[j], queues[i] + }) + + var unique []string + seen := make(map[string]struct{}) + + for _, q := range queues { + if _, ok := seen[q]; !ok { + unique = append(unique, q) + } + seen[q] = struct{}{} + if len(unique) == numOfQueues { + break + } + } + return unique +} diff --git a/processor_test.go b/processor_test.go index b23daae..84dc0c6 100644 --- a/processor_test.go +++ b/processor_test.go @@ -2,6 +2,7 @@ package asynq import ( "fmt" + "sort" "sync" "testing" "time" @@ -61,7 +62,6 @@ func TestProcessorSuccess(t *testing.T) { } p := newProcessor(rdbClient, 10, defaultDelayFunc) p.handler = HandlerFunc(handler) - p.dequeueTimeout = time.Second // short time out for test purpose p.start() for _, msg := range tc.incoming { @@ -145,7 +145,6 @@ func TestProcessorRetry(t *testing.T) { } p := newProcessor(rdbClient, 10, delayFunc) p.handler = HandlerFunc(handler) - p.dequeueTimeout = time.Second // short time out for test purpose p.start() for _, msg := range tc.incoming { @@ -219,3 +218,24 @@ func TestPerform(t *testing.T) { } } } + +func TestWeightedQueues(t *testing.T) { + want := []string{base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue} + + got := weightedQueues() + + sortOpt := cmp.Transformer("SortString", func(in []string) []string { + out := append([]string(nil), in...) // Copy input to avoid mutating it + sort.Strings(out) + return out + }) + if diff := cmp.Diff(want, got, sortOpt); diff != "" { + t.Errorf("weightedQueues = %v, want %v; (-want,+got):\n%s", got, want, diff) + } +} + +func BenchmarkWeightedQueues(b *testing.B) { + for i := 0; i < b.N; i++ { + weightedQueues() + } +}