mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	Update base package to generate redis keys with hashtag
This commit is contained in:
		| @@ -23,49 +23,71 @@ const Version = "0.10.0" | ||||
| // DefaultQueueName is the queue name used if none are specified by user. | ||||
| const DefaultQueueName = "default" | ||||
|  | ||||
| // Redis keys | ||||
| // DefaultQueue is the redis key for the default queue. | ||||
| var DefaultQueue = QueueKey(DefaultQueueName) | ||||
|  | ||||
| // Global Redis keys. | ||||
| const ( | ||||
| 	AllServers      = "asynq:servers"                // ZSET | ||||
| 	serversPrefix   = "asynq:servers:"               // STRING - asynq:ps:<host>:<pid>:<serverid> | ||||
| 	AllWorkers      = "asynq:workers"                // ZSET | ||||
| 	workersPrefix   = "asynq:workers:"               // HASH   - asynq:workers:<host:<pid>:<serverid> | ||||
| 	processedPrefix = "asynq:processed:"             // STRING - asynq:processed:<yyyy-mm-dd> | ||||
| 	failurePrefix   = "asynq:failure:"               // STRING - asynq:failure:<yyyy-mm-dd> | ||||
| 	QueuePrefix     = "asynq:queues:"                // LIST   - asynq:queues:<qname> | ||||
| 	AllQueues       = "asynq:queues"                 // SET | ||||
| 	DefaultQueue    = QueuePrefix + DefaultQueueName // LIST | ||||
| 	ScheduledQueue  = "asynq:scheduled"              // ZSET | ||||
| 	RetryQueue      = "asynq:retry"                  // ZSET | ||||
| 	DeadQueue       = "asynq:dead"                   // ZSET | ||||
| 	InProgressQueue = "asynq:in_progress"            // LIST | ||||
| 	KeyDeadlines    = "asynq:deadlines"              // ZSET | ||||
| 	PausedQueues    = "asynq:paused"                 // SET | ||||
| 	CancelChannel   = "asynq:cancel"                 // PubSub channel | ||||
| 	AllServers    = "asynq:servers" // ZSET | ||||
| 	AllWorkers    = "asynq:workers" // ZSET | ||||
| 	AllQueues     = "asynq:queues"  // SET | ||||
| 	CancelChannel = "asynq:cancel"  // PubSub channel | ||||
| ) | ||||
|  | ||||
| // QueueKey returns a redis key for the given queue name. | ||||
| func QueueKey(qname string) string { | ||||
| 	return QueuePrefix + strings.ToLower(qname) | ||||
| 	return fmt.Sprintf("asynq:{%s}", qname) | ||||
| } | ||||
|  | ||||
| // ProcessedKey returns a redis key for processed count for the given day. | ||||
| func ProcessedKey(t time.Time) string { | ||||
| 	return processedPrefix + t.UTC().Format("2006-01-02") | ||||
| // TODO: Should we rename this to "active"? | ||||
| // InProgressKey returns a redis key for the in-progress tasks. | ||||
| func InProgressKey(qname string) string { | ||||
| 	return fmt.Sprintf("asynq:{%s}:in_progress", qname) | ||||
| } | ||||
|  | ||||
| // FailureKey returns a redis key for failure count for the given day. | ||||
| func FailureKey(t time.Time) string { | ||||
| 	return failurePrefix + t.UTC().Format("2006-01-02") | ||||
| // ScheduledKey returns a redis key for the scheduled tasks. | ||||
| func ScheduledKey(qname string) string { | ||||
| 	return fmt.Sprintf("asynq:{%s}:scheduled", qname) | ||||
| } | ||||
|  | ||||
| // RetryKey returns a redis key for the retry tasks. | ||||
| func RetryKey(qname string) string { | ||||
| 	return fmt.Sprintf("asynq:{%s}:retry", qname) | ||||
| } | ||||
|  | ||||
| // DeadKey returns a redis key for the dead tasks. | ||||
| func DeadKey(qname string) string { | ||||
| 	return fmt.Sprintf("asynq:{%s}:dead", qname) | ||||
| } | ||||
|  | ||||
| // DeadlinesKey returns a redis key for the deadlines. | ||||
| func DeadlinesKey(qname string) string { | ||||
| 	return fmt.Sprintf("asynq:{%s}:deadlines", qname) | ||||
| } | ||||
|  | ||||
| // PausedKey returns a redis key to indicate that the given queue is paused. | ||||
| func PausedKey(qname string) string { | ||||
| 	return fmt.Sprintf("asynq:{%s}:paused", qname) | ||||
| } | ||||
|  | ||||
| // ProcessedKey returns a redis key for processed count for the given day for the queue. | ||||
| func ProcessedKey(qname string, t time.Time) string { | ||||
| 	return fmt.Sprintf("asynq:{%s}:processed:%s", qname, t.UTC().Format("2006-01-02")) | ||||
| } | ||||
|  | ||||
| // FailedKey returns a redis key for failure count for the given day for the queue. | ||||
| func FailedKey(qname string, t time.Time) string { | ||||
| 	return fmt.Sprintf("asynq:{%s}:failed:%s", qname, t.UTC().Format("2006-01-02")) | ||||
| } | ||||
|  | ||||
| // ServerInfoKey returns a redis key for process info. | ||||
| func ServerInfoKey(hostname string, pid int, sid string) string { | ||||
| 	return fmt.Sprintf("%s%s:%d:%s", serversPrefix, hostname, pid, sid) | ||||
| 	return fmt.Sprintf("asynq:servers:{%s:%d:%s}", hostname, pid, sid) | ||||
| } | ||||
|  | ||||
| // WorkersKey returns a redis key for the workers given hostname, pid, and server ID. | ||||
| func WorkersKey(hostname string, pid int, sid string) string { | ||||
| 	return fmt.Sprintf("%s%s:%d:%s", workersPrefix, hostname, pid, sid) | ||||
| 	return fmt.Sprintf("asynq:workers:{%s:%d:%s}", hostname, pid, sid) | ||||
| } | ||||
|  | ||||
| // TaskMessage is the internal representation of a task with additional metadata fields. | ||||
|   | ||||
| @@ -20,7 +20,8 @@ func TestQueueKey(t *testing.T) { | ||||
| 		qname string | ||||
| 		want  string | ||||
| 	}{ | ||||
| 		{"custom", "asynq:queues:custom"}, | ||||
| 		{"default", "asynq:{default}"}, | ||||
| 		{"custom", "asynq:{custom}"}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| @@ -31,36 +32,140 @@ func TestQueueKey(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestProcessedKey(t *testing.T) { | ||||
| func TestInProgressKey(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		input time.Time | ||||
| 		qname string | ||||
| 		want  string | ||||
| 	}{ | ||||
| 		{time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:processed:2019-11-14"}, | ||||
| 		{time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:processed:2020-12-01"}, | ||||
| 		{time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:processed:2020-01-06"}, | ||||
| 		{"default", "asynq:{default}:in_progress"}, | ||||
| 		{"custom", "asynq:{custom}:in_progress"}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		got := ProcessedKey(tc.input) | ||||
| 		got := InProgressKey(tc.qname) | ||||
| 		if got != tc.want { | ||||
| 			t.Errorf("InProgressKey(%q) = %q, want %q", tc.qname, got, tc.want) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestDeadlinesKey(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		qname string | ||||
| 		want  string | ||||
| 	}{ | ||||
| 		{"default", "asynq:{default}:deadlines"}, | ||||
| 		{"custom", "asynq:{custom}:deadlines"}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		got := DeadlinesKey(tc.qname) | ||||
| 		if got != tc.want { | ||||
| 			t.Errorf("DeadlinesKey(%q) = %q, want %q", tc.qname, got, tc.want) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestScheduledKey(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		qname string | ||||
| 		want  string | ||||
| 	}{ | ||||
| 		{"default", "asynq:{default}:scheduled"}, | ||||
| 		{"custom", "asynq:{custom}:scheduled"}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		got := ScheduledKey(tc.qname) | ||||
| 		if got != tc.want { | ||||
| 			t.Errorf("ScheduledKey(%q) = %q, want %q", tc.qname, got, tc.want) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestRetryKey(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		qname string | ||||
| 		want  string | ||||
| 	}{ | ||||
| 		{"default", "asynq:{default}:retry"}, | ||||
| 		{"custom", "asynq:{custom}:retry"}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		got := RetryKey(tc.qname) | ||||
| 		if got != tc.want { | ||||
| 			t.Errorf("RetryKey(%q) = %q, want %q", tc.qname, got, tc.want) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestDeadKey(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		qname string | ||||
| 		want  string | ||||
| 	}{ | ||||
| 		{"default", "asynq:{default}:dead"}, | ||||
| 		{"custom", "asynq:{custom}:dead"}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		got := DeadKey(tc.qname) | ||||
| 		if got != tc.want { | ||||
| 			t.Errorf("DeadKey(%q) = %q, want %q", tc.qname, got, tc.want) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestPausedKey(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		qname string | ||||
| 		want  string | ||||
| 	}{ | ||||
| 		{"default", "asynq:{default}:paused"}, | ||||
| 		{"custom", "asynq:{custom}:paused"}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		got := PausedKey(tc.qname) | ||||
| 		if got != tc.want { | ||||
| 			t.Errorf("PausedKey(%q) = %q, want %q", tc.qname, got, tc.want) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestProcessedKey(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		qname string | ||||
| 		input time.Time | ||||
| 		want  string | ||||
| 	}{ | ||||
| 		{"default", time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:{default}:processed:2019-11-14"}, | ||||
| 		{"critical", time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:{critical}:processed:2020-12-01"}, | ||||
| 		{"default", time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:{default}:processed:2020-01-06"}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		got := ProcessedKey(tc.qname, tc.input) | ||||
| 		if got != tc.want { | ||||
| 			t.Errorf("ProcessedKey(%v) = %q, want %q", tc.input, got, tc.want) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestFailureKey(t *testing.T) { | ||||
| func TestFailedKey(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		qname string | ||||
| 		input time.Time | ||||
| 		want  string | ||||
| 	}{ | ||||
| 		{time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:failure:2019-11-14"}, | ||||
| 		{time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:failure:2020-12-01"}, | ||||
| 		{time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:failure:2020-01-06"}, | ||||
| 		{"default", time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:{default}:failed:2019-11-14"}, | ||||
| 		{"custom", time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:{custom}:failed:2020-12-01"}, | ||||
| 		{"low", time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:{low}:failed:2020-01-06"}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		got := FailureKey(tc.input) | ||||
| 		got := FailedKey(tc.qname, tc.input) | ||||
| 		if got != tc.want { | ||||
| 			t.Errorf("FailureKey(%v) = %q, want %q", tc.input, got, tc.want) | ||||
| 		} | ||||
| @@ -74,8 +179,8 @@ func TestServerInfoKey(t *testing.T) { | ||||
| 		sid      string | ||||
| 		want     string | ||||
| 	}{ | ||||
| 		{"localhost", 9876, "server123", "asynq:servers:localhost:9876:server123"}, | ||||
| 		{"127.0.0.1", 1234, "server987", "asynq:servers:127.0.0.1:1234:server987"}, | ||||
| 		{"localhost", 9876, "server123", "asynq:servers:{localhost:9876:server123}"}, | ||||
| 		{"127.0.0.1", 1234, "server987", "asynq:servers:{127.0.0.1:1234:server987}"}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| @@ -94,8 +199,8 @@ func TestWorkersKey(t *testing.T) { | ||||
| 		sid      string | ||||
| 		want     string | ||||
| 	}{ | ||||
| 		{"localhost", 9876, "server1", "asynq:workers:localhost:9876:server1"}, | ||||
| 		{"127.0.0.1", 1234, "server2", "asynq:workers:127.0.0.1:1234:server2"}, | ||||
| 		{"localhost", 9876, "server1", "asynq:workers:{localhost:9876:server1}"}, | ||||
| 		{"127.0.0.1", 1234, "server2", "asynq:workers:{127.0.0.1:1234:server2}"}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user