2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-14 19:38:49 +08:00

Add workers key in base package

This commit is contained in:
Ken Hibino 2020-02-20 06:34:09 -08:00
parent 830020eb39
commit 9e02a91808
2 changed files with 30 additions and 8 deletions

View File

@ -20,8 +20,9 @@ const DefaultQueueName = "default"
// Redis keys // Redis keys
const ( const (
psPrefix = "asynq:ps:" // HASH
AllProcesses = "asynq:ps" // ZSET AllProcesses = "asynq:ps" // ZSET
psPrefix = "asynq:ps:" // STRING - asynq:ps:<host>:<pid>
workersPrefix = "asynq:workers:" // HASH - asynq:workers:<host:<pid>
processedPrefix = "asynq:processed:" // STRING - asynq:processed:<yyyy-mm-dd> processedPrefix = "asynq:processed:" // STRING - asynq:processed:<yyyy-mm-dd>
failurePrefix = "asynq:failure:" // STRING - asynq:failure:<yyyy-mm-dd> failurePrefix = "asynq:failure:" // STRING - asynq:failure:<yyyy-mm-dd>
QueuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname> QueuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
@ -34,28 +35,31 @@ const (
CancelChannel = "asynq:cancel" // PubSub channel CancelChannel = "asynq:cancel" // PubSub channel
) )
// QueueKey returns a redis key string for the given queue name. // QueueKey returns a redis key for the given queue name.
func QueueKey(qname string) string { func QueueKey(qname string) string {
return QueuePrefix + strings.ToLower(qname) return QueuePrefix + strings.ToLower(qname)
} }
// ProcessedKey returns a redis key string for processed count // ProcessedKey returns a redis key for processed count for the given day.
// for the given day.
func ProcessedKey(t time.Time) string { func ProcessedKey(t time.Time) string {
return processedPrefix + t.UTC().Format("2006-01-02") return processedPrefix + t.UTC().Format("2006-01-02")
} }
// FailureKey returns a redis key string for failure count // FailureKey returns a redis key for failure count for the given day.
// for the given day.
func FailureKey(t time.Time) string { func FailureKey(t time.Time) string {
return failurePrefix + t.UTC().Format("2006-01-02") return failurePrefix + t.UTC().Format("2006-01-02")
} }
// ProcessInfoKey returns a redis key string for process info. // ProcessInfoKey returns a redis key for process info.
func ProcessInfoKey(hostname string, pid int) string { func ProcessInfoKey(hostname string, pid int) string {
return fmt.Sprintf("%s%s:%d", psPrefix, hostname, pid) return fmt.Sprintf("%s%s:%d", psPrefix, hostname, pid)
} }
// WorkersKey returns a redis key for the workers given hostname and pid.
func WorkersKey(hostname string, pid int) string {
return fmt.Sprintf("%s%s:%d", workersPrefix, hostname, pid)
}
// TaskMessage is the internal representation of a task with additional metadata fields. // TaskMessage is the internal representation of a task with additional metadata fields.
// Serialized data of this type gets written to redis. // Serialized data of this type gets written to redis.
type TaskMessage struct { type TaskMessage struct {

View File

@ -74,7 +74,25 @@ func TestProcessInfoKey(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
got := ProcessInfoKey(tc.hostname, tc.pid) got := ProcessInfoKey(tc.hostname, tc.pid)
if got != tc.want { if got != tc.want {
t.Errorf("ProcessInfoKey(%s, %d) = %s, want %s", tc.hostname, tc.pid, got, tc.want) t.Errorf("ProcessInfoKey(%q, %d) = %q, want %q", tc.hostname, tc.pid, got, tc.want)
}
}
}
func TestWorkersKey(t *testing.T) {
tests := []struct {
hostname string
pid int
want string
}{
{"localhost", 9876, "asynq:workers:localhost:9876"},
{"127.0.0.1", 1234, "asynq:workers:127.0.0.1:1234"},
}
for _, tc := range tests {
got := WorkersKey(tc.hostname, tc.pid)
if got != tc.want {
t.Errorf("WorkersKey(%q, %d) = %q, want = %q", tc.hostname, tc.pid, got, tc.want)
} }
} }
} }