mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-25 10:56:12 +08:00 
			
		
		
		
	Use int as priority value type.
Do not use unsigned int for merely non-negative quantities as it complicates simple arithmetic.
This commit is contained in:
		| @@ -72,9 +72,8 @@ func main() { | ||||
|     bg := asynq.NewBackground(r, &asynq.Config{ | ||||
|         // Specify how many concurrent workers to use | ||||
|         Concurrency: 10, | ||||
|         // You can optionally create multiple queues | ||||
|         // with different priority level | ||||
|         Queues: map[string]uint{ | ||||
|         // You can optionally create multiple queues with different priority. | ||||
|         Queues: map[string]int{ | ||||
|             "critical": 6, | ||||
|             "default":  3, | ||||
|             "low":      1, | ||||
|   | ||||
| @@ -47,7 +47,7 @@ type Background struct { | ||||
| type Config struct { | ||||
| 	// Maximum number of concurrent processing of tasks. | ||||
| 	// | ||||
| 	// If set to zero or negative value, NewBackground will overwrite the value to one. | ||||
| 	// If set to a zero or negative value, NewBackground will overwrite the value to one. | ||||
| 	Concurrency int | ||||
|  | ||||
| 	// Function to calculate retry delay for a failed task. | ||||
| @@ -59,15 +59,15 @@ type Config struct { | ||||
| 	// t is the task in question. | ||||
| 	RetryDelayFunc func(n int, e error, t *Task) time.Duration | ||||
|  | ||||
| 	// List of queues to process with given priority level. Keys are the names of the | ||||
| 	// queues and values are associated priority level. | ||||
| 	// List of queues to process with given priority value. Keys are the names of the | ||||
| 	// queues and values are associated priority value. | ||||
| 	// | ||||
| 	// If set to nil or not specified, the background will process only the "default" queue. | ||||
| 	// | ||||
| 	// Priority is treated as follows to avoid starving low priority queues. | ||||
| 	// | ||||
| 	// Example: | ||||
| 	// Queues: map[string]uint{ | ||||
| 	// Queues: map[string]int{ | ||||
| 	//     "critical": 6, | ||||
| 	//     "default":  3, | ||||
| 	//     "low":      1, | ||||
| @@ -75,7 +75,9 @@ type Config struct { | ||||
| 	// With the above config and given that all queues are not empty, the tasks | ||||
| 	// in "critical", "default", "low" should be processed 60%, 30%, 10% of | ||||
| 	// the time respectively. | ||||
| 	Queues map[string]uint | ||||
| 	// | ||||
| 	// If a queue has a zero or negative priority value, the queue will be ignored. | ||||
| 	Queues map[string]int | ||||
|  | ||||
| 	// StrictPriority indicates whether the queue priority should be treated strictly. | ||||
| 	// | ||||
| @@ -92,7 +94,7 @@ func defaultDelayFunc(n int, e error, t *Task) time.Duration { | ||||
| 	return time.Duration(s) * time.Second | ||||
| } | ||||
|  | ||||
| var defaultQueueConfig = map[string]uint{ | ||||
| var defaultQueueConfig = map[string]int{ | ||||
| 	base.DefaultQueueName: 1, | ||||
| } | ||||
|  | ||||
| @@ -107,8 +109,13 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { | ||||
| 	if delayFunc == nil { | ||||
| 		delayFunc = defaultDelayFunc | ||||
| 	} | ||||
| 	queues := cfg.Queues | ||||
| 	if queues == nil || len(queues) == 0 { | ||||
| 	queues := make(map[string]int) | ||||
| 	for qname, p := range cfg.Queues { | ||||
| 		if p > 0 { | ||||
| 			queues[qname] = p | ||||
| 		} | ||||
| 	} | ||||
| 	if len(queues) == 0 { | ||||
| 		queues = defaultQueueConfig | ||||
| 	} | ||||
|  | ||||
|   | ||||
| @@ -43,16 +43,16 @@ func TestBackground(t *testing.T) { | ||||
|  | ||||
| func TestGCD(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		input []uint | ||||
| 		want  uint | ||||
| 		input []int | ||||
| 		want  int | ||||
| 	}{ | ||||
| 		{[]uint{6, 2, 12}, 2}, | ||||
| 		{[]uint{3, 3, 3}, 3}, | ||||
| 		{[]uint{6, 3, 1}, 1}, | ||||
| 		{[]uint{1}, 1}, | ||||
| 		{[]uint{1, 0, 2}, 1}, | ||||
| 		{[]uint{8, 0, 4}, 4}, | ||||
| 		{[]uint{9, 12, 18, 30}, 3}, | ||||
| 		{[]int{6, 2, 12}, 2}, | ||||
| 		{[]int{3, 3, 3}, 3}, | ||||
| 		{[]int{6, 3, 1}, 1}, | ||||
| 		{[]int{1}, 1}, | ||||
| 		{[]int{1, 0, 2}, 1}, | ||||
| 		{[]int{8, 0, 4}, 4}, | ||||
| 		{[]int{9, 12, 18, 30}, 3}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| @@ -65,46 +65,46 @@ func TestGCD(t *testing.T) { | ||||
|  | ||||
| func TestNormalizeQueueCfg(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		input map[string]uint | ||||
| 		want  map[string]uint | ||||
| 		input map[string]int | ||||
| 		want  map[string]int | ||||
| 	}{ | ||||
| 		{ | ||||
| 			input: map[string]uint{ | ||||
| 			input: map[string]int{ | ||||
| 				"high":    100, | ||||
| 				"default": 20, | ||||
| 				"low":     5, | ||||
| 			}, | ||||
| 			want: map[string]uint{ | ||||
| 			want: map[string]int{ | ||||
| 				"high":    20, | ||||
| 				"default": 4, | ||||
| 				"low":     1, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			input: map[string]uint{ | ||||
| 			input: map[string]int{ | ||||
| 				"default": 10, | ||||
| 			}, | ||||
| 			want: map[string]uint{ | ||||
| 			want: map[string]int{ | ||||
| 				"default": 1, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			input: map[string]uint{ | ||||
| 			input: map[string]int{ | ||||
| 				"critical": 5, | ||||
| 				"default":  1, | ||||
| 			}, | ||||
| 			want: map[string]uint{ | ||||
| 			want: map[string]int{ | ||||
| 				"critical": 5, | ||||
| 				"default":  1, | ||||
| 			}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			input: map[string]uint{ | ||||
| 			input: map[string]int{ | ||||
| 				"critical": 6, | ||||
| 				"default":  3, | ||||
| 				"low":      0, | ||||
| 			}, | ||||
| 			want: map[string]uint{ | ||||
| 			want: map[string]int{ | ||||
| 				"critical": 2, | ||||
| 				"default":  1, | ||||
| 				"low":      0, | ||||
|   | ||||
| @@ -120,7 +120,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { | ||||
| 		client := NewClient(redis) | ||||
| 		bg := NewBackground(redis, &Config{ | ||||
| 			Concurrency: 10, | ||||
| 			Queues: map[string]uint{ | ||||
| 			Queues: map[string]int{ | ||||
| 				"high":    6, | ||||
| 				"default": 3, | ||||
| 				"low":     1, | ||||
|   | ||||
| @@ -23,10 +23,10 @@ func TestHeartbeater(t *testing.T) { | ||||
| 		interval    time.Duration | ||||
| 		host        string | ||||
| 		pid         int | ||||
| 		queues      map[string]uint | ||||
| 		queues      map[string]int | ||||
| 		concurrency int | ||||
| 	}{ | ||||
| 		{time.Second, "some.address.ec2.aws.com", 45678, map[string]uint{"default": 1}, 10}, | ||||
| 		{time.Second, "some.address.ec2.aws.com", 45678, map[string]int{"default": 1}, 10}, | ||||
| 	} | ||||
|  | ||||
| 	timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Millisecond) | ||||
|   | ||||
| @@ -91,7 +91,7 @@ type TaskMessage struct { | ||||
| type ProcessInfo struct { | ||||
| 	mu                sync.Mutex | ||||
| 	Concurrency       int | ||||
| 	Queues            map[string]uint | ||||
| 	Queues            map[string]int | ||||
| 	StrictPriority    bool | ||||
| 	PID               int | ||||
| 	Host              string | ||||
| @@ -101,7 +101,7 @@ type ProcessInfo struct { | ||||
| } | ||||
|  | ||||
| // NewProcessInfo returns a new instance of ProcessInfo. | ||||
| func NewProcessInfo(host string, pid, concurrency int, queues map[string]uint, strict bool) *ProcessInfo { | ||||
| func NewProcessInfo(host string, pid, concurrency int, queues map[string]int, strict bool) *ProcessInfo { | ||||
| 	return &ProcessInfo{ | ||||
| 		Host:           host, | ||||
| 		PID:            pid, | ||||
|   | ||||
| @@ -82,7 +82,7 @@ func TestProcessInfoKey(t *testing.T) { | ||||
|  | ||||
| // Note: Run this test with -race flag to check for data race. | ||||
| func TestProcessInfoSetter(t *testing.T) { | ||||
| 	pi := NewProcessInfo("localhost", 1234, 8, map[string]uint{"default": 1}, false) | ||||
| 	pi := NewProcessInfo("localhost", 1234, 8, map[string]int{"default": 1}, false) | ||||
|  | ||||
| 	var wg sync.WaitGroup | ||||
|  | ||||
|   | ||||
| @@ -245,10 +245,10 @@ func reverse(x []string) { | ||||
| // for the list operation. | ||||
| type Pagination struct { | ||||
| 	// Number of items in the page. | ||||
| 	Size uint | ||||
| 	Size int | ||||
|  | ||||
| 	// Page number starting from zero. | ||||
| 	Page uint | ||||
| 	Page int | ||||
| } | ||||
|  | ||||
| func (p Pagination) start() int64 { | ||||
|   | ||||
| @@ -317,8 +317,8 @@ func TestListEnqueuedPagination(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		desc      string | ||||
| 		qname     string | ||||
| 		page      uint | ||||
| 		size      uint | ||||
| 		page      int | ||||
| 		size      int | ||||
| 		wantSize  int | ||||
| 		wantFirst string | ||||
| 		wantLast  string | ||||
| @@ -418,8 +418,8 @@ func TestListInProgressPagination(t *testing.T) { | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		desc      string | ||||
| 		page      uint | ||||
| 		size      uint | ||||
| 		page      int | ||||
| 		size      int | ||||
| 		wantSize  int | ||||
| 		wantFirst string | ||||
| 		wantLast  string | ||||
| @@ -524,8 +524,8 @@ func TestListScheduledPagination(t *testing.T) { | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		desc      string | ||||
| 		page      uint | ||||
| 		size      uint | ||||
| 		page      int | ||||
| 		size      int | ||||
| 		wantSize  int | ||||
| 		wantFirst string | ||||
| 		wantLast  string | ||||
| @@ -667,8 +667,8 @@ func TestListRetryPagination(t *testing.T) { | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		desc      string | ||||
| 		page      uint | ||||
| 		size      uint | ||||
| 		page      int | ||||
| 		size      int | ||||
| 		wantSize  int | ||||
| 		wantFirst string | ||||
| 		wantLast  string | ||||
| @@ -800,8 +800,8 @@ func TestListDeadPagination(t *testing.T) { | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		desc      string | ||||
| 		page      uint | ||||
| 		size      uint | ||||
| 		page      int | ||||
| 		size      int | ||||
| 		wantSize  int | ||||
| 		wantFirst string | ||||
| 		wantLast  string | ||||
| @@ -2056,7 +2056,7 @@ func TestListProcesses(t *testing.T) { | ||||
|  | ||||
| 	ps1 := &base.ProcessInfo{ | ||||
| 		Concurrency:       10, | ||||
| 		Queues:            map[string]uint{"default": 1}, | ||||
| 		Queues:            map[string]int{"default": 1}, | ||||
| 		Host:              "do.droplet1", | ||||
| 		PID:               1234, | ||||
| 		State:             "running", | ||||
| @@ -2066,7 +2066,7 @@ func TestListProcesses(t *testing.T) { | ||||
|  | ||||
| 	ps2 := &base.ProcessInfo{ | ||||
| 		Concurrency:       20, | ||||
| 		Queues:            map[string]uint{"email": 1}, | ||||
| 		Queues:            map[string]int{"email": 1}, | ||||
| 		Host:              "do.droplet2", | ||||
| 		PID:               9876, | ||||
| 		State:             "stopped", | ||||
|   | ||||
| @@ -745,7 +745,7 @@ func TestReadWriteClearProcessInfo(t *testing.T) { | ||||
| 	r := setup(t) | ||||
| 	pinfo := &base.ProcessInfo{ | ||||
| 		Concurrency:       10, | ||||
| 		Queues:            map[string]uint{"default": 2, "email": 5, "low": 1}, | ||||
| 		Queues:            map[string]int{"default": 2, "email": 5, "low": 1}, | ||||
| 		PID:               98765, | ||||
| 		Host:              "localhost", | ||||
| 		State:             "running", | ||||
|   | ||||
							
								
								
									
										16
									
								
								processor.go
									
									
									
									
									
								
							
							
						
						
									
										16
									
								
								processor.go
									
									
									
									
									
								
							| @@ -24,7 +24,7 @@ type processor struct { | ||||
|  | ||||
| 	handler Handler | ||||
|  | ||||
| 	queueConfig map[string]uint | ||||
| 	queueConfig map[string]int | ||||
|  | ||||
| 	// orderedQueues is set only in strict-priority mode. | ||||
| 	orderedQueues []string | ||||
| @@ -324,7 +324,7 @@ func uniq(names []string, l int) []string { | ||||
|  | ||||
| // sortByPriority returns a list of queue names sorted by | ||||
| // their priority level in descending order. | ||||
| func sortByPriority(qcfg map[string]uint) []string { | ||||
| func sortByPriority(qcfg map[string]int) []string { | ||||
| 	var queues []*queue | ||||
| 	for qname, n := range qcfg { | ||||
| 		queues = append(queues, &queue{qname, n}) | ||||
| @@ -339,7 +339,7 @@ func sortByPriority(qcfg map[string]uint) []string { | ||||
|  | ||||
| type queue struct { | ||||
| 	name     string | ||||
| 	priority uint | ||||
| 	priority int | ||||
| } | ||||
|  | ||||
| type byPriority []*queue | ||||
| @@ -350,21 +350,21 @@ func (x byPriority) Swap(i, j int)      { x[i], x[j] = x[j], x[i] } | ||||
|  | ||||
| // normalizeQueueCfg divides priority numbers by their | ||||
| // greatest common divisor. | ||||
| func normalizeQueueCfg(queueCfg map[string]uint) map[string]uint { | ||||
| 	var xs []uint | ||||
| func normalizeQueueCfg(queueCfg map[string]int) map[string]int { | ||||
| 	var xs []int | ||||
| 	for _, x := range queueCfg { | ||||
| 		xs = append(xs, x) | ||||
| 	} | ||||
| 	d := gcd(xs...) | ||||
| 	res := make(map[string]uint) | ||||
| 	res := make(map[string]int) | ||||
| 	for q, x := range queueCfg { | ||||
| 		res[q] = x / d | ||||
| 	} | ||||
| 	return res | ||||
| } | ||||
|  | ||||
| func gcd(xs ...uint) uint { | ||||
| 	fn := func(x, y uint) uint { | ||||
| func gcd(xs ...int) int { | ||||
| 	fn := func(x, y int) int { | ||||
| 		for y > 0 { | ||||
| 			x, y = y, x%y | ||||
| 		} | ||||
|   | ||||
| @@ -192,11 +192,11 @@ func TestProcessorQueues(t *testing.T) { | ||||
| 	}) | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		queueCfg map[string]uint | ||||
| 		queueCfg map[string]int | ||||
| 		want     []string | ||||
| 	}{ | ||||
| 		{ | ||||
| 			queueCfg: map[string]uint{ | ||||
| 			queueCfg: map[string]int{ | ||||
| 				"high":    6, | ||||
| 				"default": 3, | ||||
| 				"low":     1, | ||||
| @@ -204,7 +204,7 @@ func TestProcessorQueues(t *testing.T) { | ||||
| 			want: []string{"high", "default", "low"}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			queueCfg: map[string]uint{ | ||||
| 			queueCfg: map[string]int{ | ||||
| 				"default": 1, | ||||
| 			}, | ||||
| 			want: []string{"default"}, | ||||
| @@ -274,7 +274,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { | ||||
| 			processed = append(processed, task) | ||||
| 			return nil | ||||
| 		} | ||||
| 		queueCfg := map[string]uint{ | ||||
| 		queueCfg := map[string]int{ | ||||
| 			"critical":            3, | ||||
| 			base.DefaultQueueName: 2, | ||||
| 			"low":                 1, | ||||
|   | ||||
| @@ -23,7 +23,7 @@ type scheduler struct { | ||||
| 	qnames []string | ||||
| } | ||||
|  | ||||
| func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]uint) *scheduler { | ||||
| func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler { | ||||
| 	var qnames []string | ||||
| 	for q := range qcfg { | ||||
| 		qnames = append(qnames, q) | ||||
|   | ||||
| @@ -44,16 +44,24 @@ asynqmon ls enqueued:critical -> List tasks from critical queue | ||||
| } | ||||
|  | ||||
| // Flags | ||||
| var pageSize uint | ||||
| var pageNum uint | ||||
| var pageSize int | ||||
| var pageNum int | ||||
|  | ||||
| func init() { | ||||
| 	rootCmd.AddCommand(lsCmd) | ||||
| 	lsCmd.Flags().UintVar(&pageSize, "size", 30, "page size") | ||||
| 	lsCmd.Flags().UintVar(&pageNum, "page", 0, "page number - zero indexed (default 0)") | ||||
| 	lsCmd.Flags().IntVar(&pageSize, "size", 30, "page size") | ||||
| 	lsCmd.Flags().IntVar(&pageNum, "page", 0, "page number - zero indexed (default 0)") | ||||
| } | ||||
|  | ||||
| func ls(cmd *cobra.Command, args []string) { | ||||
| 	if pageSize < 0 { | ||||
| 		fmt.Println("page size cannot be negative.") | ||||
| 		os.Exit(1) | ||||
| 	} | ||||
| 	if pageNum < 0 { | ||||
| 		fmt.Println("page number cannot be negative.") | ||||
| 		os.Exit(1) | ||||
| 	} | ||||
| 	c := redis.NewClient(&redis.Options{ | ||||
| 		Addr:     viper.GetString("uri"), | ||||
| 		DB:       viper.GetInt("db"), | ||||
|   | ||||
| @@ -87,11 +87,11 @@ func timeAgo(since time.Time) string { | ||||
| 	return fmt.Sprintf("%v ago", d) | ||||
| } | ||||
|  | ||||
| func formatQueues(qmap map[string]uint) string { | ||||
| func formatQueues(qmap map[string]int) string { | ||||
| 	// sort queues by priority and name | ||||
| 	type queue struct { | ||||
| 		name     string | ||||
| 		priority uint | ||||
| 		priority int | ||||
| 	} | ||||
| 	var queues []*queue | ||||
| 	for qname, p := range qmap { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user