diff --git a/internal/base/base.go b/internal/base/base.go index 8c04c84..7070c18 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -20,8 +20,9 @@ const DefaultQueueName = "default" // Redis keys const ( - psPrefix = "asynq:ps:" // HASH AllProcesses = "asynq:ps" // ZSET + psPrefix = "asynq:ps:" // STRING - asynq:ps:: + workersPrefix = "asynq:workers:" // HASH - asynq:workers: processedPrefix = "asynq:processed:" // STRING - asynq:processed: failurePrefix = "asynq:failure:" // STRING - asynq:failure: QueuePrefix = "asynq:queues:" // LIST - asynq:queues: @@ -34,28 +35,31 @@ const ( CancelChannel = "asynq:cancel" // PubSub channel ) -// QueueKey returns a redis key string for the given queue name. +// QueueKey returns a redis key for the given queue name. func QueueKey(qname string) string { return QueuePrefix + strings.ToLower(qname) } -// ProcessedKey returns a redis key string for processed count -// for the given day. +// ProcessedKey returns a redis key for processed count for the given day. func ProcessedKey(t time.Time) string { return processedPrefix + t.UTC().Format("2006-01-02") } -// FailureKey returns a redis key string for failure count -// for the given day. +// FailureKey returns a redis key for failure count for the given day. func FailureKey(t time.Time) string { return failurePrefix + t.UTC().Format("2006-01-02") } -// ProcessInfoKey returns a redis key string for process info. +// ProcessInfoKey returns a redis key for process info. func ProcessInfoKey(hostname string, pid int) string { return fmt.Sprintf("%s%s:%d", psPrefix, hostname, pid) } +// WorkersKey returns a redis key for the workers given hostname and pid. +func WorkersKey(hostname string, pid int) string { + return fmt.Sprintf("%s%s:%d", workersPrefix, hostname, pid) +} + // TaskMessage is the internal representation of a task with additional metadata fields. // Serialized data of this type gets written to redis. type TaskMessage struct { diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 225dcc6..b6156b0 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -74,7 +74,25 @@ func TestProcessInfoKey(t *testing.T) { for _, tc := range tests { got := ProcessInfoKey(tc.hostname, tc.pid) if got != tc.want { - t.Errorf("ProcessInfoKey(%s, %d) = %s, want %s", tc.hostname, tc.pid, got, tc.want) + t.Errorf("ProcessInfoKey(%q, %d) = %q, want %q", tc.hostname, tc.pid, got, tc.want) + } + } +} + +func TestWorkersKey(t *testing.T) { + tests := []struct { + hostname string + pid int + want string + }{ + {"localhost", 9876, "asynq:workers:localhost:9876"}, + {"127.0.0.1", 1234, "asynq:workers:127.0.0.1:1234"}, + } + + for _, tc := range tests { + got := WorkersKey(tc.hostname, tc.pid) + if got != tc.want { + t.Errorf("WorkersKey(%q, %d) = %q, want = %q", tc.hostname, tc.pid, got, tc.want) } } }