2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-24 22:46:11 +08:00

Add a function to return a list of queues to query

This commit is contained in:
Ken Hibino
2020-01-01 18:19:55 -08:00
parent 6de80f9f04
commit fc5b65ddae
2 changed files with 70 additions and 10 deletions

View File

@@ -3,6 +3,7 @@ package asynq
import (
"fmt"
"log"
"math/rand"
"sync"
"time"
@@ -17,11 +18,6 @@ type processor struct {
retryDelayFunc retryDelayFunc
// timeout for blocking dequeue operation.
// dequeue needs to timeout to avoid blocking forever
// in case of a program shutdown or additon of a new queue.
dequeueTimeout time.Duration
// sema is a counting semaphore to ensure the number of active workers
// does not exceed the limit.
sema chan struct{}
@@ -44,7 +40,6 @@ func newProcessor(r *rdb.RDB, n int, fn retryDelayFunc) *processor {
return &processor{
rdb: r,
retryDelayFunc: fn,
dequeueTimeout: 2 * time.Second,
sema: make(chan struct{}, n),
done: make(chan struct{}),
abort: make(chan struct{}),
@@ -102,8 +97,7 @@ func (p *processor) start() {
// exec pulls a task out of the queue and starts a worker goroutine to
// process the task.
func (p *processor) exec() {
// TODO(hibiken): sort the queues based on weight, but prevent starvation
msg, err := p.rdb.Dequeue(base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue)
msg, err := p.rdb.Dequeue(weightedQueues()...)
if err == rdb.ErrNoProcessableTask {
// queues are empty, this is a normal behavior.
return
@@ -206,3 +200,49 @@ func perform(h Handler, task *Task) (err error) {
}()
return h.ProcessTask(task)
}
// weightedQueues returns a list of redis queue key to query
// order is roughly based on the weight, it's randomized to
// prevent starvation.
//
// For example, if weight distribution is
// high = 6, default = 3, low = 1
// then high priority tasks will get processed first 60% of the time,
// and default priority task will get processed first 30% of the time,
// and low priority task will get processed first 10% of the time
func weightedQueues() []string {
const numOfQueues = 3
const (
highWeight = 6
defaultWeight = 3
lowWeight = 1
)
queues := make([]string, 10)
for i := 0; i < highWeight; i++ {
queues[i] = base.HighPriorityQueue
}
for i := highWeight; i < highWeight+defaultWeight; i++ {
queues[i] = base.DefaultQueue
}
for i := highWeight + defaultWeight; i < highWeight+defaultWeight+lowWeight; i++ {
queues[i] = base.LowPriorityQueue
}
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
rnd.Shuffle(len(queues), func(i, j int) {
queues[i], queues[j] = queues[j], queues[i]
})
var unique []string
seen := make(map[string]struct{})
for _, q := range queues {
if _, ok := seen[q]; !ok {
unique = append(unique, q)
}
seen[q] = struct{}{}
if len(unique) == numOfQueues {
break
}
}
return unique
}

View File

@@ -2,6 +2,7 @@ package asynq
import (
"fmt"
"sort"
"sync"
"testing"
"time"
@@ -61,7 +62,6 @@ func TestProcessorSuccess(t *testing.T) {
}
p := newProcessor(rdbClient, 10, defaultDelayFunc)
p.handler = HandlerFunc(handler)
p.dequeueTimeout = time.Second // short time out for test purpose
p.start()
for _, msg := range tc.incoming {
@@ -145,7 +145,6 @@ func TestProcessorRetry(t *testing.T) {
}
p := newProcessor(rdbClient, 10, delayFunc)
p.handler = HandlerFunc(handler)
p.dequeueTimeout = time.Second // short time out for test purpose
p.start()
for _, msg := range tc.incoming {
@@ -219,3 +218,24 @@ func TestPerform(t *testing.T) {
}
}
}
func TestWeightedQueues(t *testing.T) {
want := []string{base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue}
got := weightedQueues()
sortOpt := cmp.Transformer("SortString", func(in []string) []string {
out := append([]string(nil), in...) // Copy input to avoid mutating it
sort.Strings(out)
return out
})
if diff := cmp.Diff(want, got, sortOpt); diff != "" {
t.Errorf("weightedQueues = %v, want %v; (-want,+got):\n%s", got, want, diff)
}
}
func BenchmarkWeightedQueues(b *testing.B) {
for i := 0; i < b.N; i++ {
weightedQueues()
}
}