diff --git a/internal/base/base.go b/internal/base/base.go index 4b216ab..3f92f87 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -23,49 +23,71 @@ const Version = "0.10.0" // DefaultQueueName is the queue name used if none are specified by user. const DefaultQueueName = "default" -// Redis keys +// DefaultQueue is the redis key for the default queue. +var DefaultQueue = QueueKey(DefaultQueueName) + +// Global Redis keys. const ( - AllServers = "asynq:servers" // ZSET - serversPrefix = "asynq:servers:" // STRING - asynq:ps::: - AllWorkers = "asynq:workers" // ZSET - workersPrefix = "asynq:workers:" // HASH - asynq:workers:: - processedPrefix = "asynq:processed:" // STRING - asynq:processed: - failurePrefix = "asynq:failure:" // STRING - asynq:failure: - QueuePrefix = "asynq:queues:" // LIST - asynq:queues: - AllQueues = "asynq:queues" // SET - DefaultQueue = QueuePrefix + DefaultQueueName // LIST - ScheduledQueue = "asynq:scheduled" // ZSET - RetryQueue = "asynq:retry" // ZSET - DeadQueue = "asynq:dead" // ZSET - InProgressQueue = "asynq:in_progress" // LIST - KeyDeadlines = "asynq:deadlines" // ZSET - PausedQueues = "asynq:paused" // SET - CancelChannel = "asynq:cancel" // PubSub channel + AllServers = "asynq:servers" // ZSET + AllWorkers = "asynq:workers" // ZSET + AllQueues = "asynq:queues" // SET + CancelChannel = "asynq:cancel" // PubSub channel ) // QueueKey returns a redis key for the given queue name. func QueueKey(qname string) string { - return QueuePrefix + strings.ToLower(qname) + return fmt.Sprintf("asynq:{%s}", qname) } -// ProcessedKey returns a redis key for processed count for the given day. -func ProcessedKey(t time.Time) string { - return processedPrefix + t.UTC().Format("2006-01-02") +// TODO: Should we rename this to "active"? +// InProgressKey returns a redis key for the in-progress tasks. +func InProgressKey(qname string) string { + return fmt.Sprintf("asynq:{%s}:in_progress", qname) } -// FailureKey returns a redis key for failure count for the given day. -func FailureKey(t time.Time) string { - return failurePrefix + t.UTC().Format("2006-01-02") +// ScheduledKey returns a redis key for the scheduled tasks. +func ScheduledKey(qname string) string { + return fmt.Sprintf("asynq:{%s}:scheduled", qname) +} + +// RetryKey returns a redis key for the retry tasks. +func RetryKey(qname string) string { + return fmt.Sprintf("asynq:{%s}:retry", qname) +} + +// DeadKey returns a redis key for the dead tasks. +func DeadKey(qname string) string { + return fmt.Sprintf("asynq:{%s}:dead", qname) +} + +// DeadlinesKey returns a redis key for the deadlines. +func DeadlinesKey(qname string) string { + return fmt.Sprintf("asynq:{%s}:deadlines", qname) +} + +// PausedKey returns a redis key to indicate that the given queue is paused. +func PausedKey(qname string) string { + return fmt.Sprintf("asynq:{%s}:paused", qname) +} + +// ProcessedKey returns a redis key for processed count for the given day for the queue. +func ProcessedKey(qname string, t time.Time) string { + return fmt.Sprintf("asynq:{%s}:processed:%s", qname, t.UTC().Format("2006-01-02")) +} + +// FailedKey returns a redis key for failure count for the given day for the queue. +func FailedKey(qname string, t time.Time) string { + return fmt.Sprintf("asynq:{%s}:failed:%s", qname, t.UTC().Format("2006-01-02")) } // ServerInfoKey returns a redis key for process info. func ServerInfoKey(hostname string, pid int, sid string) string { - return fmt.Sprintf("%s%s:%d:%s", serversPrefix, hostname, pid, sid) + return fmt.Sprintf("asynq:servers:{%s:%d:%s}", hostname, pid, sid) } // WorkersKey returns a redis key for the workers given hostname, pid, and server ID. func WorkersKey(hostname string, pid int, sid string) string { - return fmt.Sprintf("%s%s:%d:%s", workersPrefix, hostname, pid, sid) + return fmt.Sprintf("asynq:workers:{%s:%d:%s}", hostname, pid, sid) } // TaskMessage is the internal representation of a task with additional metadata fields. diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 40f5c7a..395dd1e 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -20,7 +20,8 @@ func TestQueueKey(t *testing.T) { qname string want string }{ - {"custom", "asynq:queues:custom"}, + {"default", "asynq:{default}"}, + {"custom", "asynq:{custom}"}, } for _, tc := range tests { @@ -31,36 +32,140 @@ func TestQueueKey(t *testing.T) { } } -func TestProcessedKey(t *testing.T) { +func TestInProgressKey(t *testing.T) { tests := []struct { - input time.Time + qname string want string }{ - {time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:processed:2019-11-14"}, - {time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:processed:2020-12-01"}, - {time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:processed:2020-01-06"}, + {"default", "asynq:{default}:in_progress"}, + {"custom", "asynq:{custom}:in_progress"}, } for _, tc := range tests { - got := ProcessedKey(tc.input) + got := InProgressKey(tc.qname) + if got != tc.want { + t.Errorf("InProgressKey(%q) = %q, want %q", tc.qname, got, tc.want) + } + } +} + +func TestDeadlinesKey(t *testing.T) { + tests := []struct { + qname string + want string + }{ + {"default", "asynq:{default}:deadlines"}, + {"custom", "asynq:{custom}:deadlines"}, + } + + for _, tc := range tests { + got := DeadlinesKey(tc.qname) + if got != tc.want { + t.Errorf("DeadlinesKey(%q) = %q, want %q", tc.qname, got, tc.want) + } + } +} + +func TestScheduledKey(t *testing.T) { + tests := []struct { + qname string + want string + }{ + {"default", "asynq:{default}:scheduled"}, + {"custom", "asynq:{custom}:scheduled"}, + } + + for _, tc := range tests { + got := ScheduledKey(tc.qname) + if got != tc.want { + t.Errorf("ScheduledKey(%q) = %q, want %q", tc.qname, got, tc.want) + } + } +} + +func TestRetryKey(t *testing.T) { + tests := []struct { + qname string + want string + }{ + {"default", "asynq:{default}:retry"}, + {"custom", "asynq:{custom}:retry"}, + } + + for _, tc := range tests { + got := RetryKey(tc.qname) + if got != tc.want { + t.Errorf("RetryKey(%q) = %q, want %q", tc.qname, got, tc.want) + } + } +} + +func TestDeadKey(t *testing.T) { + tests := []struct { + qname string + want string + }{ + {"default", "asynq:{default}:dead"}, + {"custom", "asynq:{custom}:dead"}, + } + + for _, tc := range tests { + got := DeadKey(tc.qname) + if got != tc.want { + t.Errorf("DeadKey(%q) = %q, want %q", tc.qname, got, tc.want) + } + } +} + +func TestPausedKey(t *testing.T) { + tests := []struct { + qname string + want string + }{ + {"default", "asynq:{default}:paused"}, + {"custom", "asynq:{custom}:paused"}, + } + + for _, tc := range tests { + got := PausedKey(tc.qname) + if got != tc.want { + t.Errorf("PausedKey(%q) = %q, want %q", tc.qname, got, tc.want) + } + } +} + +func TestProcessedKey(t *testing.T) { + tests := []struct { + qname string + input time.Time + want string + }{ + {"default", time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:{default}:processed:2019-11-14"}, + {"critical", time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:{critical}:processed:2020-12-01"}, + {"default", time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:{default}:processed:2020-01-06"}, + } + + for _, tc := range tests { + got := ProcessedKey(tc.qname, tc.input) if got != tc.want { t.Errorf("ProcessedKey(%v) = %q, want %q", tc.input, got, tc.want) } } } -func TestFailureKey(t *testing.T) { +func TestFailedKey(t *testing.T) { tests := []struct { + qname string input time.Time want string }{ - {time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:failure:2019-11-14"}, - {time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:failure:2020-12-01"}, - {time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:failure:2020-01-06"}, + {"default", time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:{default}:failed:2019-11-14"}, + {"custom", time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:{custom}:failed:2020-12-01"}, + {"low", time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:{low}:failed:2020-01-06"}, } for _, tc := range tests { - got := FailureKey(tc.input) + got := FailedKey(tc.qname, tc.input) if got != tc.want { t.Errorf("FailureKey(%v) = %q, want %q", tc.input, got, tc.want) } @@ -74,8 +179,8 @@ func TestServerInfoKey(t *testing.T) { sid string want string }{ - {"localhost", 9876, "server123", "asynq:servers:localhost:9876:server123"}, - {"127.0.0.1", 1234, "server987", "asynq:servers:127.0.0.1:1234:server987"}, + {"localhost", 9876, "server123", "asynq:servers:{localhost:9876:server123}"}, + {"127.0.0.1", 1234, "server987", "asynq:servers:{127.0.0.1:1234:server987}"}, } for _, tc := range tests { @@ -94,8 +199,8 @@ func TestWorkersKey(t *testing.T) { sid string want string }{ - {"localhost", 9876, "server1", "asynq:workers:localhost:9876:server1"}, - {"127.0.0.1", 1234, "server2", "asynq:workers:127.0.0.1:1234:server2"}, + {"localhost", 9876, "server1", "asynq:workers:{localhost:9876:server1}"}, + {"127.0.0.1", 1234, "server2", "asynq:workers:{127.0.0.1:1234:server2}"}, } for _, tc := range tests {