mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Update processor to query queues based on priority
This commit is contained in:
parent
53d0902808
commit
8d9a2d1313
40
processor.go
40
processor.go
@ -7,6 +7,7 @@ package asynq
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -109,12 +110,7 @@ func (p *processor) start() {
|
|||||||
// exec pulls a task out of the queue and starts a worker goroutine to
|
// exec pulls a task out of the queue and starts a worker goroutine to
|
||||||
// process the task.
|
// process the task.
|
||||||
func (p *processor) exec() {
|
func (p *processor) exec() {
|
||||||
// TODO(hibiken): Randomize the order to avoid starving low priority queues
|
qnames := p.queues()
|
||||||
var qnames []string
|
|
||||||
for q := range p.queueConfig {
|
|
||||||
qnames = append(qnames, q)
|
|
||||||
}
|
|
||||||
|
|
||||||
msg, err := p.rdb.Dequeue(qnames...)
|
msg, err := p.rdb.Dequeue(qnames...)
|
||||||
if err == rdb.ErrNoProcessableTask {
|
if err == rdb.ErrNoProcessableTask {
|
||||||
// queues are empty, this is a normal behavior.
|
// queues are empty, this is a normal behavior.
|
||||||
@ -209,6 +205,21 @@ func (p *processor) kill(msg *base.TaskMessage, e error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// queues returns a list of queues to query. Order of the list
|
||||||
|
// is based roughly on the priority of each queue, but randomizes
|
||||||
|
// it to avoid starving low priority queues.
|
||||||
|
func (p *processor) queues() []string {
|
||||||
|
var names []string
|
||||||
|
for qname, priority := range p.queueConfig {
|
||||||
|
for i := 0; i < int(priority); i++ {
|
||||||
|
names = append(names, qname)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
r.Shuffle(len(names), func(i, j int) { names[i], names[j] = names[j], names[i] })
|
||||||
|
return uniq(names, len(p.queueConfig))
|
||||||
|
}
|
||||||
|
|
||||||
// perform calls the handler with the given task.
|
// perform calls the handler with the given task.
|
||||||
// If the call returns without panic, it simply returns the value,
|
// If the call returns without panic, it simply returns the value,
|
||||||
// otherwise, it recovers from panic and returns an error.
|
// otherwise, it recovers from panic and returns an error.
|
||||||
@ -220,3 +231,20 @@ func perform(h Handler, task *Task) (err error) {
|
|||||||
}()
|
}()
|
||||||
return h.ProcessTask(task)
|
return h.ProcessTask(task)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// uniq dedupes elements and returns a slice of unique names of length l.
|
||||||
|
// Order of the output slice is based on the input list.
|
||||||
|
func uniq(names []string, l int) []string {
|
||||||
|
var res []string
|
||||||
|
seen := make(map[string]struct{})
|
||||||
|
for _, s := range names {
|
||||||
|
if _, ok := seen[s]; !ok {
|
||||||
|
seen[s] = struct{}{}
|
||||||
|
res = append(res, s)
|
||||||
|
}
|
||||||
|
if len(res) == l {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
@ -6,6 +6,7 @@ package asynq
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -51,7 +52,7 @@ func TestProcessorSuccess(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r) // clean up db before each test case.
|
h.FlushDB(t, r) // clean up db before each test case.
|
||||||
h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue.
|
h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue.
|
||||||
|
|
||||||
// instantiate a new processor
|
// instantiate a new processor
|
||||||
@ -137,7 +138,7 @@ func TestProcessorRetry(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r) // clean up db before each test case.
|
h.FlushDB(t, r) // clean up db before each test case.
|
||||||
h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue.
|
h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue.
|
||||||
|
|
||||||
// instantiate a new processor
|
// instantiate a new processor
|
||||||
@ -178,6 +179,43 @@ func TestProcessorRetry(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestProcessorQueues(t *testing.T) {
|
||||||
|
sortOpt := cmp.Transformer("SortStrings", func(in []string) []string {
|
||||||
|
out := append([]string(nil), in...) // Copy input to avoid mutating it
|
||||||
|
sort.Strings(out)
|
||||||
|
return out
|
||||||
|
})
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
queueCfg map[string]uint
|
||||||
|
want []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
queueCfg: map[string]uint{
|
||||||
|
"high": 6,
|
||||||
|
"default": 3,
|
||||||
|
"low": 1,
|
||||||
|
},
|
||||||
|
want: []string{"high", "default", "low"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
queueCfg: map[string]uint{
|
||||||
|
"default": 1,
|
||||||
|
},
|
||||||
|
want: []string{"default"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
p := newProcessor(nil, 10, tc.queueCfg, defaultDelayFunc)
|
||||||
|
got := p.queues()
|
||||||
|
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
|
||||||
|
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
|
||||||
|
tc.queueCfg, got, tc.want, diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPerform(t *testing.T) {
|
func TestPerform(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
desc string
|
desc string
|
||||||
|
Loading…
Reference in New Issue
Block a user