diff --git a/README.md b/README.md index 28dbc85..ec9791c 100644 --- a/README.md +++ b/README.md @@ -72,9 +72,8 @@ func main() { bg := asynq.NewBackground(r, &asynq.Config{ // Specify how many concurrent workers to use Concurrency: 10, - // You can optionally create multiple queues - // with different priority level - Queues: map[string]uint{ + // You can optionally create multiple queues with different priority. + Queues: map[string]int{ "critical": 6, "default": 3, "low": 1, diff --git a/background.go b/background.go index e878cbc..fa533a2 100644 --- a/background.go +++ b/background.go @@ -47,7 +47,7 @@ type Background struct { type Config struct { // Maximum number of concurrent processing of tasks. // - // If set to zero or negative value, NewBackground will overwrite the value to one. + // If set to a zero or negative value, NewBackground will overwrite the value to one. Concurrency int // Function to calculate retry delay for a failed task. @@ -59,15 +59,15 @@ type Config struct { // t is the task in question. RetryDelayFunc func(n int, e error, t *Task) time.Duration - // List of queues to process with given priority level. Keys are the names of the - // queues and values are associated priority level. + // List of queues to process with given priority value. Keys are the names of the + // queues and values are associated priority value. // // If set to nil or not specified, the background will process only the "default" queue. // // Priority is treated as follows to avoid starving low priority queues. // // Example: - // Queues: map[string]uint{ + // Queues: map[string]int{ // "critical": 6, // "default": 3, // "low": 1, @@ -75,7 +75,9 @@ type Config struct { // With the above config and given that all queues are not empty, the tasks // in "critical", "default", "low" should be processed 60%, 30%, 10% of // the time respectively. - Queues map[string]uint + // + // If a queue has a zero or negative priority value, the queue will be ignored. + Queues map[string]int // StrictPriority indicates whether the queue priority should be treated strictly. // @@ -92,7 +94,7 @@ func defaultDelayFunc(n int, e error, t *Task) time.Duration { return time.Duration(s) * time.Second } -var defaultQueueConfig = map[string]uint{ +var defaultQueueConfig = map[string]int{ base.DefaultQueueName: 1, } @@ -107,8 +109,13 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { if delayFunc == nil { delayFunc = defaultDelayFunc } - queues := cfg.Queues - if queues == nil || len(queues) == 0 { + queues := make(map[string]int) + for qname, p := range cfg.Queues { + if p > 0 { + queues[qname] = p + } + } + if len(queues) == 0 { queues = defaultQueueConfig } diff --git a/background_test.go b/background_test.go index 0b6f5ec..32aaeb2 100644 --- a/background_test.go +++ b/background_test.go @@ -43,16 +43,16 @@ func TestBackground(t *testing.T) { func TestGCD(t *testing.T) { tests := []struct { - input []uint - want uint + input []int + want int }{ - {[]uint{6, 2, 12}, 2}, - {[]uint{3, 3, 3}, 3}, - {[]uint{6, 3, 1}, 1}, - {[]uint{1}, 1}, - {[]uint{1, 0, 2}, 1}, - {[]uint{8, 0, 4}, 4}, - {[]uint{9, 12, 18, 30}, 3}, + {[]int{6, 2, 12}, 2}, + {[]int{3, 3, 3}, 3}, + {[]int{6, 3, 1}, 1}, + {[]int{1}, 1}, + {[]int{1, 0, 2}, 1}, + {[]int{8, 0, 4}, 4}, + {[]int{9, 12, 18, 30}, 3}, } for _, tc := range tests { @@ -65,46 +65,46 @@ func TestGCD(t *testing.T) { func TestNormalizeQueueCfg(t *testing.T) { tests := []struct { - input map[string]uint - want map[string]uint + input map[string]int + want map[string]int }{ { - input: map[string]uint{ + input: map[string]int{ "high": 100, "default": 20, "low": 5, }, - want: map[string]uint{ + want: map[string]int{ "high": 20, "default": 4, "low": 1, }, }, { - input: map[string]uint{ + input: map[string]int{ "default": 10, }, - want: map[string]uint{ + want: map[string]int{ "default": 1, }, }, { - input: map[string]uint{ + input: map[string]int{ "critical": 5, "default": 1, }, - want: map[string]uint{ + want: map[string]int{ "critical": 5, "default": 1, }, }, { - input: map[string]uint{ + input: map[string]int{ "critical": 6, "default": 3, "low": 0, }, - want: map[string]uint{ + want: map[string]int{ "critical": 2, "default": 1, "low": 0, diff --git a/benchmark_test.go b/benchmark_test.go index eb5fa22..5988adb 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -120,7 +120,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { client := NewClient(redis) bg := NewBackground(redis, &Config{ Concurrency: 10, - Queues: map[string]uint{ + Queues: map[string]int{ "high": 6, "default": 3, "low": 1, diff --git a/heartbeat_test.go b/heartbeat_test.go index d4fffb2..268ea76 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -23,10 +23,10 @@ func TestHeartbeater(t *testing.T) { interval time.Duration host string pid int - queues map[string]uint + queues map[string]int concurrency int }{ - {time.Second, "some.address.ec2.aws.com", 45678, map[string]uint{"default": 1}, 10}, + {time.Second, "some.address.ec2.aws.com", 45678, map[string]int{"default": 1}, 10}, } timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Millisecond) diff --git a/internal/base/base.go b/internal/base/base.go index b7896fa..12f01df 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -91,7 +91,7 @@ type TaskMessage struct { type ProcessInfo struct { mu sync.Mutex Concurrency int - Queues map[string]uint + Queues map[string]int StrictPriority bool PID int Host string @@ -101,7 +101,7 @@ type ProcessInfo struct { } // NewProcessInfo returns a new instance of ProcessInfo. -func NewProcessInfo(host string, pid, concurrency int, queues map[string]uint, strict bool) *ProcessInfo { +func NewProcessInfo(host string, pid, concurrency int, queues map[string]int, strict bool) *ProcessInfo { return &ProcessInfo{ Host: host, PID: pid, diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 00be893..46050c0 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -82,7 +82,7 @@ func TestProcessInfoKey(t *testing.T) { // Note: Run this test with -race flag to check for data race. func TestProcessInfoSetter(t *testing.T) { - pi := NewProcessInfo("localhost", 1234, 8, map[string]uint{"default": 1}, false) + pi := NewProcessInfo("localhost", 1234, 8, map[string]int{"default": 1}, false) var wg sync.WaitGroup diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 578b636..d0f4be7 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -245,10 +245,10 @@ func reverse(x []string) { // for the list operation. type Pagination struct { // Number of items in the page. - Size uint + Size int // Page number starting from zero. - Page uint + Page int } func (p Pagination) start() int64 { diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 91422c0..cd671c2 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -317,8 +317,8 @@ func TestListEnqueuedPagination(t *testing.T) { tests := []struct { desc string qname string - page uint - size uint + page int + size int wantSize int wantFirst string wantLast string @@ -418,8 +418,8 @@ func TestListInProgressPagination(t *testing.T) { tests := []struct { desc string - page uint - size uint + page int + size int wantSize int wantFirst string wantLast string @@ -524,8 +524,8 @@ func TestListScheduledPagination(t *testing.T) { tests := []struct { desc string - page uint - size uint + page int + size int wantSize int wantFirst string wantLast string @@ -667,8 +667,8 @@ func TestListRetryPagination(t *testing.T) { tests := []struct { desc string - page uint - size uint + page int + size int wantSize int wantFirst string wantLast string @@ -800,8 +800,8 @@ func TestListDeadPagination(t *testing.T) { tests := []struct { desc string - page uint - size uint + page int + size int wantSize int wantFirst string wantLast string @@ -2056,7 +2056,7 @@ func TestListProcesses(t *testing.T) { ps1 := &base.ProcessInfo{ Concurrency: 10, - Queues: map[string]uint{"default": 1}, + Queues: map[string]int{"default": 1}, Host: "do.droplet1", PID: 1234, State: "running", @@ -2066,7 +2066,7 @@ func TestListProcesses(t *testing.T) { ps2 := &base.ProcessInfo{ Concurrency: 20, - Queues: map[string]uint{"email": 1}, + Queues: map[string]int{"email": 1}, Host: "do.droplet2", PID: 9876, State: "stopped", diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 8b8d52e..accc52d 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -745,7 +745,7 @@ func TestReadWriteClearProcessInfo(t *testing.T) { r := setup(t) pinfo := &base.ProcessInfo{ Concurrency: 10, - Queues: map[string]uint{"default": 2, "email": 5, "low": 1}, + Queues: map[string]int{"default": 2, "email": 5, "low": 1}, PID: 98765, Host: "localhost", State: "running", diff --git a/processor.go b/processor.go index b91c4ca..6bc4b1c 100644 --- a/processor.go +++ b/processor.go @@ -24,7 +24,7 @@ type processor struct { handler Handler - queueConfig map[string]uint + queueConfig map[string]int // orderedQueues is set only in strict-priority mode. orderedQueues []string @@ -324,7 +324,7 @@ func uniq(names []string, l int) []string { // sortByPriority returns a list of queue names sorted by // their priority level in descending order. -func sortByPriority(qcfg map[string]uint) []string { +func sortByPriority(qcfg map[string]int) []string { var queues []*queue for qname, n := range qcfg { queues = append(queues, &queue{qname, n}) @@ -339,7 +339,7 @@ func sortByPriority(qcfg map[string]uint) []string { type queue struct { name string - priority uint + priority int } type byPriority []*queue @@ -350,21 +350,21 @@ func (x byPriority) Swap(i, j int) { x[i], x[j] = x[j], x[i] } // normalizeQueueCfg divides priority numbers by their // greatest common divisor. -func normalizeQueueCfg(queueCfg map[string]uint) map[string]uint { - var xs []uint +func normalizeQueueCfg(queueCfg map[string]int) map[string]int { + var xs []int for _, x := range queueCfg { xs = append(xs, x) } d := gcd(xs...) - res := make(map[string]uint) + res := make(map[string]int) for q, x := range queueCfg { res[q] = x / d } return res } -func gcd(xs ...uint) uint { - fn := func(x, y uint) uint { +func gcd(xs ...int) int { + fn := func(x, y int) int { for y > 0 { x, y = y, x%y } diff --git a/processor_test.go b/processor_test.go index 1bc5d99..079bf6e 100644 --- a/processor_test.go +++ b/processor_test.go @@ -192,11 +192,11 @@ func TestProcessorQueues(t *testing.T) { }) tests := []struct { - queueCfg map[string]uint + queueCfg map[string]int want []string }{ { - queueCfg: map[string]uint{ + queueCfg: map[string]int{ "high": 6, "default": 3, "low": 1, @@ -204,7 +204,7 @@ func TestProcessorQueues(t *testing.T) { want: []string{"high", "default", "low"}, }, { - queueCfg: map[string]uint{ + queueCfg: map[string]int{ "default": 1, }, want: []string{"default"}, @@ -274,7 +274,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { processed = append(processed, task) return nil } - queueCfg := map[string]uint{ + queueCfg := map[string]int{ "critical": 3, base.DefaultQueueName: 2, "low": 1, diff --git a/scheduler.go b/scheduler.go index 8c59ebe..7276f4c 100644 --- a/scheduler.go +++ b/scheduler.go @@ -23,7 +23,7 @@ type scheduler struct { qnames []string } -func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]uint) *scheduler { +func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler { var qnames []string for q := range qcfg { qnames = append(qnames, q) diff --git a/tools/asynqmon/cmd/ls.go b/tools/asynqmon/cmd/ls.go index a4b85ee..3bec832 100644 --- a/tools/asynqmon/cmd/ls.go +++ b/tools/asynqmon/cmd/ls.go @@ -44,16 +44,24 @@ asynqmon ls enqueued:critical -> List tasks from critical queue } // Flags -var pageSize uint -var pageNum uint +var pageSize int +var pageNum int func init() { rootCmd.AddCommand(lsCmd) - lsCmd.Flags().UintVar(&pageSize, "size", 30, "page size") - lsCmd.Flags().UintVar(&pageNum, "page", 0, "page number - zero indexed (default 0)") + lsCmd.Flags().IntVar(&pageSize, "size", 30, "page size") + lsCmd.Flags().IntVar(&pageNum, "page", 0, "page number - zero indexed (default 0)") } func ls(cmd *cobra.Command, args []string) { + if pageSize < 0 { + fmt.Println("page size cannot be negative.") + os.Exit(1) + } + if pageNum < 0 { + fmt.Println("page number cannot be negative.") + os.Exit(1) + } c := redis.NewClient(&redis.Options{ Addr: viper.GetString("uri"), DB: viper.GetInt("db"), diff --git a/tools/asynqmon/cmd/ps.go b/tools/asynqmon/cmd/ps.go index ef25876..7761886 100644 --- a/tools/asynqmon/cmd/ps.go +++ b/tools/asynqmon/cmd/ps.go @@ -87,11 +87,11 @@ func timeAgo(since time.Time) string { return fmt.Sprintf("%v ago", d) } -func formatQueues(qmap map[string]uint) string { +func formatQueues(qmap map[string]int) string { // sort queues by priority and name type queue struct { name string - priority uint + priority int } var queues []*queue for qname, p := range qmap {