2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Use int as priority value type.

Do not use unsigned int for merely non-negative quantities as it
complicates simple arithmetic.
This commit is contained in:
Ken Hibino 2020-02-12 22:23:25 -08:00
parent bf31fcc3ec
commit d33ca98648
15 changed files with 84 additions and 70 deletions

View File

@ -72,9 +72,8 @@ func main() {
bg := asynq.NewBackground(r, &asynq.Config{ bg := asynq.NewBackground(r, &asynq.Config{
// Specify how many concurrent workers to use // Specify how many concurrent workers to use
Concurrency: 10, Concurrency: 10,
// You can optionally create multiple queues // You can optionally create multiple queues with different priority.
// with different priority level Queues: map[string]int{
Queues: map[string]uint{
"critical": 6, "critical": 6,
"default": 3, "default": 3,
"low": 1, "low": 1,

View File

@ -47,7 +47,7 @@ type Background struct {
type Config struct { type Config struct {
// Maximum number of concurrent processing of tasks. // Maximum number of concurrent processing of tasks.
// //
// If set to zero or negative value, NewBackground will overwrite the value to one. // If set to a zero or negative value, NewBackground will overwrite the value to one.
Concurrency int Concurrency int
// Function to calculate retry delay for a failed task. // Function to calculate retry delay for a failed task.
@ -59,15 +59,15 @@ type Config struct {
// t is the task in question. // t is the task in question.
RetryDelayFunc func(n int, e error, t *Task) time.Duration RetryDelayFunc func(n int, e error, t *Task) time.Duration
// List of queues to process with given priority level. Keys are the names of the // List of queues to process with given priority value. Keys are the names of the
// queues and values are associated priority level. // queues and values are associated priority value.
// //
// If set to nil or not specified, the background will process only the "default" queue. // If set to nil or not specified, the background will process only the "default" queue.
// //
// Priority is treated as follows to avoid starving low priority queues. // Priority is treated as follows to avoid starving low priority queues.
// //
// Example: // Example:
// Queues: map[string]uint{ // Queues: map[string]int{
// "critical": 6, // "critical": 6,
// "default": 3, // "default": 3,
// "low": 1, // "low": 1,
@ -75,7 +75,9 @@ type Config struct {
// With the above config and given that all queues are not empty, the tasks // With the above config and given that all queues are not empty, the tasks
// in "critical", "default", "low" should be processed 60%, 30%, 10% of // in "critical", "default", "low" should be processed 60%, 30%, 10% of
// the time respectively. // the time respectively.
Queues map[string]uint //
// If a queue has a zero or negative priority value, the queue will be ignored.
Queues map[string]int
// StrictPriority indicates whether the queue priority should be treated strictly. // StrictPriority indicates whether the queue priority should be treated strictly.
// //
@ -92,7 +94,7 @@ func defaultDelayFunc(n int, e error, t *Task) time.Duration {
return time.Duration(s) * time.Second return time.Duration(s) * time.Second
} }
var defaultQueueConfig = map[string]uint{ var defaultQueueConfig = map[string]int{
base.DefaultQueueName: 1, base.DefaultQueueName: 1,
} }
@ -107,8 +109,13 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
if delayFunc == nil { if delayFunc == nil {
delayFunc = defaultDelayFunc delayFunc = defaultDelayFunc
} }
queues := cfg.Queues queues := make(map[string]int)
if queues == nil || len(queues) == 0 { for qname, p := range cfg.Queues {
if p > 0 {
queues[qname] = p
}
}
if len(queues) == 0 {
queues = defaultQueueConfig queues = defaultQueueConfig
} }

View File

@ -43,16 +43,16 @@ func TestBackground(t *testing.T) {
func TestGCD(t *testing.T) { func TestGCD(t *testing.T) {
tests := []struct { tests := []struct {
input []uint input []int
want uint want int
}{ }{
{[]uint{6, 2, 12}, 2}, {[]int{6, 2, 12}, 2},
{[]uint{3, 3, 3}, 3}, {[]int{3, 3, 3}, 3},
{[]uint{6, 3, 1}, 1}, {[]int{6, 3, 1}, 1},
{[]uint{1}, 1}, {[]int{1}, 1},
{[]uint{1, 0, 2}, 1}, {[]int{1, 0, 2}, 1},
{[]uint{8, 0, 4}, 4}, {[]int{8, 0, 4}, 4},
{[]uint{9, 12, 18, 30}, 3}, {[]int{9, 12, 18, 30}, 3},
} }
for _, tc := range tests { for _, tc := range tests {
@ -65,46 +65,46 @@ func TestGCD(t *testing.T) {
func TestNormalizeQueueCfg(t *testing.T) { func TestNormalizeQueueCfg(t *testing.T) {
tests := []struct { tests := []struct {
input map[string]uint input map[string]int
want map[string]uint want map[string]int
}{ }{
{ {
input: map[string]uint{ input: map[string]int{
"high": 100, "high": 100,
"default": 20, "default": 20,
"low": 5, "low": 5,
}, },
want: map[string]uint{ want: map[string]int{
"high": 20, "high": 20,
"default": 4, "default": 4,
"low": 1, "low": 1,
}, },
}, },
{ {
input: map[string]uint{ input: map[string]int{
"default": 10, "default": 10,
}, },
want: map[string]uint{ want: map[string]int{
"default": 1, "default": 1,
}, },
}, },
{ {
input: map[string]uint{ input: map[string]int{
"critical": 5, "critical": 5,
"default": 1, "default": 1,
}, },
want: map[string]uint{ want: map[string]int{
"critical": 5, "critical": 5,
"default": 1, "default": 1,
}, },
}, },
{ {
input: map[string]uint{ input: map[string]int{
"critical": 6, "critical": 6,
"default": 3, "default": 3,
"low": 0, "low": 0,
}, },
want: map[string]uint{ want: map[string]int{
"critical": 2, "critical": 2,
"default": 1, "default": 1,
"low": 0, "low": 0,

View File

@ -120,7 +120,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
client := NewClient(redis) client := NewClient(redis)
bg := NewBackground(redis, &Config{ bg := NewBackground(redis, &Config{
Concurrency: 10, Concurrency: 10,
Queues: map[string]uint{ Queues: map[string]int{
"high": 6, "high": 6,
"default": 3, "default": 3,
"low": 1, "low": 1,

View File

@ -23,10 +23,10 @@ func TestHeartbeater(t *testing.T) {
interval time.Duration interval time.Duration
host string host string
pid int pid int
queues map[string]uint queues map[string]int
concurrency int concurrency int
}{ }{
{time.Second, "some.address.ec2.aws.com", 45678, map[string]uint{"default": 1}, 10}, {time.Second, "some.address.ec2.aws.com", 45678, map[string]int{"default": 1}, 10},
} }
timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Millisecond) timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Millisecond)

View File

@ -91,7 +91,7 @@ type TaskMessage struct {
type ProcessInfo struct { type ProcessInfo struct {
mu sync.Mutex mu sync.Mutex
Concurrency int Concurrency int
Queues map[string]uint Queues map[string]int
StrictPriority bool StrictPriority bool
PID int PID int
Host string Host string
@ -101,7 +101,7 @@ type ProcessInfo struct {
} }
// NewProcessInfo returns a new instance of ProcessInfo. // NewProcessInfo returns a new instance of ProcessInfo.
func NewProcessInfo(host string, pid, concurrency int, queues map[string]uint, strict bool) *ProcessInfo { func NewProcessInfo(host string, pid, concurrency int, queues map[string]int, strict bool) *ProcessInfo {
return &ProcessInfo{ return &ProcessInfo{
Host: host, Host: host,
PID: pid, PID: pid,

View File

@ -82,7 +82,7 @@ func TestProcessInfoKey(t *testing.T) {
// Note: Run this test with -race flag to check for data race. // Note: Run this test with -race flag to check for data race.
func TestProcessInfoSetter(t *testing.T) { func TestProcessInfoSetter(t *testing.T) {
pi := NewProcessInfo("localhost", 1234, 8, map[string]uint{"default": 1}, false) pi := NewProcessInfo("localhost", 1234, 8, map[string]int{"default": 1}, false)
var wg sync.WaitGroup var wg sync.WaitGroup

View File

@ -245,10 +245,10 @@ func reverse(x []string) {
// for the list operation. // for the list operation.
type Pagination struct { type Pagination struct {
// Number of items in the page. // Number of items in the page.
Size uint Size int
// Page number starting from zero. // Page number starting from zero.
Page uint Page int
} }
func (p Pagination) start() int64 { func (p Pagination) start() int64 {

View File

@ -317,8 +317,8 @@ func TestListEnqueuedPagination(t *testing.T) {
tests := []struct { tests := []struct {
desc string desc string
qname string qname string
page uint page int
size uint size int
wantSize int wantSize int
wantFirst string wantFirst string
wantLast string wantLast string
@ -418,8 +418,8 @@ func TestListInProgressPagination(t *testing.T) {
tests := []struct { tests := []struct {
desc string desc string
page uint page int
size uint size int
wantSize int wantSize int
wantFirst string wantFirst string
wantLast string wantLast string
@ -524,8 +524,8 @@ func TestListScheduledPagination(t *testing.T) {
tests := []struct { tests := []struct {
desc string desc string
page uint page int
size uint size int
wantSize int wantSize int
wantFirst string wantFirst string
wantLast string wantLast string
@ -667,8 +667,8 @@ func TestListRetryPagination(t *testing.T) {
tests := []struct { tests := []struct {
desc string desc string
page uint page int
size uint size int
wantSize int wantSize int
wantFirst string wantFirst string
wantLast string wantLast string
@ -800,8 +800,8 @@ func TestListDeadPagination(t *testing.T) {
tests := []struct { tests := []struct {
desc string desc string
page uint page int
size uint size int
wantSize int wantSize int
wantFirst string wantFirst string
wantLast string wantLast string
@ -2056,7 +2056,7 @@ func TestListProcesses(t *testing.T) {
ps1 := &base.ProcessInfo{ ps1 := &base.ProcessInfo{
Concurrency: 10, Concurrency: 10,
Queues: map[string]uint{"default": 1}, Queues: map[string]int{"default": 1},
Host: "do.droplet1", Host: "do.droplet1",
PID: 1234, PID: 1234,
State: "running", State: "running",
@ -2066,7 +2066,7 @@ func TestListProcesses(t *testing.T) {
ps2 := &base.ProcessInfo{ ps2 := &base.ProcessInfo{
Concurrency: 20, Concurrency: 20,
Queues: map[string]uint{"email": 1}, Queues: map[string]int{"email": 1},
Host: "do.droplet2", Host: "do.droplet2",
PID: 9876, PID: 9876,
State: "stopped", State: "stopped",

View File

@ -745,7 +745,7 @@ func TestReadWriteClearProcessInfo(t *testing.T) {
r := setup(t) r := setup(t)
pinfo := &base.ProcessInfo{ pinfo := &base.ProcessInfo{
Concurrency: 10, Concurrency: 10,
Queues: map[string]uint{"default": 2, "email": 5, "low": 1}, Queues: map[string]int{"default": 2, "email": 5, "low": 1},
PID: 98765, PID: 98765,
Host: "localhost", Host: "localhost",
State: "running", State: "running",

View File

@ -24,7 +24,7 @@ type processor struct {
handler Handler handler Handler
queueConfig map[string]uint queueConfig map[string]int
// orderedQueues is set only in strict-priority mode. // orderedQueues is set only in strict-priority mode.
orderedQueues []string orderedQueues []string
@ -324,7 +324,7 @@ func uniq(names []string, l int) []string {
// sortByPriority returns a list of queue names sorted by // sortByPriority returns a list of queue names sorted by
// their priority level in descending order. // their priority level in descending order.
func sortByPriority(qcfg map[string]uint) []string { func sortByPriority(qcfg map[string]int) []string {
var queues []*queue var queues []*queue
for qname, n := range qcfg { for qname, n := range qcfg {
queues = append(queues, &queue{qname, n}) queues = append(queues, &queue{qname, n})
@ -339,7 +339,7 @@ func sortByPriority(qcfg map[string]uint) []string {
type queue struct { type queue struct {
name string name string
priority uint priority int
} }
type byPriority []*queue type byPriority []*queue
@ -350,21 +350,21 @@ func (x byPriority) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
// normalizeQueueCfg divides priority numbers by their // normalizeQueueCfg divides priority numbers by their
// greatest common divisor. // greatest common divisor.
func normalizeQueueCfg(queueCfg map[string]uint) map[string]uint { func normalizeQueueCfg(queueCfg map[string]int) map[string]int {
var xs []uint var xs []int
for _, x := range queueCfg { for _, x := range queueCfg {
xs = append(xs, x) xs = append(xs, x)
} }
d := gcd(xs...) d := gcd(xs...)
res := make(map[string]uint) res := make(map[string]int)
for q, x := range queueCfg { for q, x := range queueCfg {
res[q] = x / d res[q] = x / d
} }
return res return res
} }
func gcd(xs ...uint) uint { func gcd(xs ...int) int {
fn := func(x, y uint) uint { fn := func(x, y int) int {
for y > 0 { for y > 0 {
x, y = y, x%y x, y = y, x%y
} }

View File

@ -192,11 +192,11 @@ func TestProcessorQueues(t *testing.T) {
}) })
tests := []struct { tests := []struct {
queueCfg map[string]uint queueCfg map[string]int
want []string want []string
}{ }{
{ {
queueCfg: map[string]uint{ queueCfg: map[string]int{
"high": 6, "high": 6,
"default": 3, "default": 3,
"low": 1, "low": 1,
@ -204,7 +204,7 @@ func TestProcessorQueues(t *testing.T) {
want: []string{"high", "default", "low"}, want: []string{"high", "default", "low"},
}, },
{ {
queueCfg: map[string]uint{ queueCfg: map[string]int{
"default": 1, "default": 1,
}, },
want: []string{"default"}, want: []string{"default"},
@ -274,7 +274,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
processed = append(processed, task) processed = append(processed, task)
return nil return nil
} }
queueCfg := map[string]uint{ queueCfg := map[string]int{
"critical": 3, "critical": 3,
base.DefaultQueueName: 2, base.DefaultQueueName: 2,
"low": 1, "low": 1,

View File

@ -23,7 +23,7 @@ type scheduler struct {
qnames []string qnames []string
} }
func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]uint) *scheduler { func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler {
var qnames []string var qnames []string
for q := range qcfg { for q := range qcfg {
qnames = append(qnames, q) qnames = append(qnames, q)

View File

@ -44,16 +44,24 @@ asynqmon ls enqueued:critical -> List tasks from critical queue
} }
// Flags // Flags
var pageSize uint var pageSize int
var pageNum uint var pageNum int
func init() { func init() {
rootCmd.AddCommand(lsCmd) rootCmd.AddCommand(lsCmd)
lsCmd.Flags().UintVar(&pageSize, "size", 30, "page size") lsCmd.Flags().IntVar(&pageSize, "size", 30, "page size")
lsCmd.Flags().UintVar(&pageNum, "page", 0, "page number - zero indexed (default 0)") lsCmd.Flags().IntVar(&pageNum, "page", 0, "page number - zero indexed (default 0)")
} }
func ls(cmd *cobra.Command, args []string) { func ls(cmd *cobra.Command, args []string) {
if pageSize < 0 {
fmt.Println("page size cannot be negative.")
os.Exit(1)
}
if pageNum < 0 {
fmt.Println("page number cannot be negative.")
os.Exit(1)
}
c := redis.NewClient(&redis.Options{ c := redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"), Addr: viper.GetString("uri"),
DB: viper.GetInt("db"), DB: viper.GetInt("db"),

View File

@ -87,11 +87,11 @@ func timeAgo(since time.Time) string {
return fmt.Sprintf("%v ago", d) return fmt.Sprintf("%v ago", d)
} }
func formatQueues(qmap map[string]uint) string { func formatQueues(qmap map[string]int) string {
// sort queues by priority and name // sort queues by priority and name
type queue struct { type queue struct {
name string name string
priority uint priority int
} }
var queues []*queue var queues []*queue
for qname, p := range qmap { for qname, p := range qmap {