2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 02:55:54 +08:00

[ci skip] Normalize queue priority numbers

This commit is contained in:
Ken Hibino 2020-01-07 21:17:01 -08:00
parent 8d9a2d1313
commit 24bb45b36b
2 changed files with 115 additions and 2 deletions

View File

@ -97,12 +97,12 @@ func NewBackground(r *redis.Client, cfg *Config) *Background {
delayFunc = defaultDelayFunc delayFunc = defaultDelayFunc
} }
queues := cfg.Queues queues := cfg.Queues
if queues == nil { if queues == nil || len(queues) == 0 {
queues = defaultQueueConfig queues = defaultQueueConfig
} }
rdb := rdb.NewRDB(r) rdb := rdb.NewRDB(r)
scheduler := newScheduler(rdb, 5*time.Second) scheduler := newScheduler(rdb, 5*time.Second)
processor := newProcessor(rdb, n, queues, delayFunc) processor := newProcessor(rdb, n, normalizeQueueCfg(queues), delayFunc)
return &Background{ return &Background{
rdb: rdb, rdb: rdb,
scheduler: scheduler, scheduler: scheduler,
@ -185,3 +185,35 @@ func (bg *Background) stop() {
bg.processor.handler = nil bg.processor.handler = nil
bg.running = false bg.running = false
} }
// normalizeQueueCfg divides priority numbers by their
// greatest common divisor.
func normalizeQueueCfg(queueCfg map[string]uint) map[string]uint {
var xs []uint
for _, x := range queueCfg {
xs = append(xs, x)
}
d := gcd(xs...)
res := make(map[string]uint)
for q, x := range queueCfg {
res[q] = x / d
}
return res
}
func gcd(xs ...uint) uint {
fn := func(x, y uint) uint {
for y > 0 {
x, y = y, x%y
}
return x
}
res := xs[0]
for i := 0; i < len(xs); i++ {
res = fn(xs[i], res)
if res == 1 {
return 1
}
}
return res
}

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp"
"go.uber.org/goleak" "go.uber.org/goleak"
) )
@ -39,3 +40,83 @@ func TestBackground(t *testing.T) {
bg.stop() bg.stop()
} }
func TestGCD(t *testing.T) {
tests := []struct {
input []uint
want uint
}{
{[]uint{6, 2, 12}, 2},
{[]uint{3, 3, 3}, 3},
{[]uint{6, 3, 1}, 1},
{[]uint{1}, 1},
{[]uint{1, 0, 2}, 1},
{[]uint{8, 0, 4}, 4},
{[]uint{9, 12, 18, 30}, 3},
}
for _, tc := range tests {
got := gcd(tc.input...)
if got != tc.want {
t.Errorf("gcd(%v) = %d, want %d", tc.input, got, tc.want)
}
}
}
func TestNormalizeQueueCfg(t *testing.T) {
tests := []struct {
input map[string]uint
want map[string]uint
}{
{
input: map[string]uint{
"high": 100,
"default": 20,
"low": 5,
},
want: map[string]uint{
"high": 20,
"default": 4,
"low": 1,
},
},
{
input: map[string]uint{
"default": 10,
},
want: map[string]uint{
"default": 1,
},
},
{
input: map[string]uint{
"critical": 5,
"default": 1,
},
want: map[string]uint{
"critical": 5,
"default": 1,
},
},
{
input: map[string]uint{
"critical": 6,
"default": 3,
"low": 0,
},
want: map[string]uint{
"critical": 2,
"default": 1,
"low": 0,
},
},
}
for _, tc := range tests {
got := normalizeQueueCfg(tc.input)
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("normalizeQueueCfg(%v) = %v, want %v; (-want, +got):\n%s",
tc.input, got, tc.want, diff)
}
}
}