2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 08:40:22 +08:00

Use constants for task keys for better readability

This commit is contained in:
Ken Hibino 2021-01-19 07:13:59 -08:00
parent 35faab1a4c
commit f76b8ae9c0

View File

@ -217,48 +217,62 @@ type ArchivedTask struct {
score int64 score int64
} }
// Format string used for task key.
// Format is <prefix>:<uuid>:<score>.
const taskKeyFormat = "%s:%v:%v"
// Prefix used for task key.
const (
keyPrefixPending = "p"
keyPrefixScheduled = "s"
keyPrefixRetry = "r"
keyPrefixArchived = "a"
allKeyPrefixes = keyPrefixPending + keyPrefixScheduled + keyPrefixRetry + keyPrefixArchived
)
// Key returns a key used to delete, and archive the pending task. // Key returns a key used to delete, and archive the pending task.
func (t *PendingTask) Key() string { func (t *PendingTask) Key() string {
// Note: Pending tasks are stored in redis LIST, therefore no score. // Note: Pending tasks are stored in redis LIST, therefore no score.
// Use zero for the score to preserve the same key format. // Use zero for the score to use the same key format.
return fmt.Sprintf("p:%v:%v", t.ID, 0) return fmt.Sprintf(taskKeyFormat, keyPrefixPending, t.ID, 0)
} }
// Key returns a key used to delete, run, and archive the scheduled task. // Key returns a key used to delete, run, and archive the scheduled task.
func (t *ScheduledTask) Key() string { func (t *ScheduledTask) Key() string {
return fmt.Sprintf("s:%v:%v", t.ID, t.score) return fmt.Sprintf(taskKeyFormat, keyPrefixScheduled, t.ID, t.score)
} }
// Key returns a key used to delete, run, and archive the retry task. // Key returns a key used to delete, run, and archive the retry task.
func (t *RetryTask) Key() string { func (t *RetryTask) Key() string {
return fmt.Sprintf("r:%v:%v", t.ID, t.score) return fmt.Sprintf(taskKeyFormat, keyPrefixRetry, t.ID, t.score)
} }
// Key returns a key used to delete and run the archived task. // Key returns a key used to delete and run the archived task.
func (t *ArchivedTask) Key() string { func (t *ArchivedTask) Key() string {
return fmt.Sprintf("a:%v:%v", t.ID, t.score) return fmt.Sprintf(taskKeyFormat, keyPrefixArchived, t.ID, t.score)
} }
// parseTaskKey parses a key string and returns each part of key with proper // parseTaskKey parses a key string and returns each part of key with proper
// type if valid, otherwise it reports an error. // type if valid, otherwise it reports an error.
func parseTaskKey(key string) (id uuid.UUID, score int64, state string, err error) { func parseTaskKey(key string) (prefix string, id uuid.UUID, score int64, err error) {
parts := strings.Split(key, ":") parts := strings.Split(key, ":")
if len(parts) != 3 { if len(parts) != 3 {
return uuid.Nil, 0, "", fmt.Errorf("invalid id") return "", uuid.Nil, 0, fmt.Errorf("invalid id")
} }
id, err = uuid.Parse(parts[1]) id, err = uuid.Parse(parts[1])
if err != nil { if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id") return "", uuid.Nil, 0, fmt.Errorf("invalid id")
} }
score, err = strconv.ParseInt(parts[2], 10, 64) score, err = strconv.ParseInt(parts[2], 10, 64)
if err != nil { if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id") return "", uuid.Nil, 0, fmt.Errorf("invalid id")
} }
state = parts[0] prefix = parts[0]
if len(state) != 1 || !strings.Contains("psra", state) { if len(prefix) != 1 || !strings.Contains(allKeyPrefixes, prefix) {
return uuid.Nil, 0, "", fmt.Errorf("invalid id") return "", uuid.Nil, 0, fmt.Errorf("invalid id")
} }
return id, score, state, nil return prefix, id, score, nil
} }
// ListOption specifies behavior of list operation. // ListOption specifies behavior of list operation.
@ -509,18 +523,18 @@ func (i *Inspector) DeleteTaskByKey(qname, key string) error {
if err := validateQueueName(qname); err != nil { if err := validateQueueName(qname); err != nil {
return err return err
} }
id, score, state, err := parseTaskKey(key) prefix, id, score, err := parseTaskKey(key)
if err != nil { if err != nil {
return err return err
} }
switch state { switch prefix {
case "p": case keyPrefixPending:
return i.rdb.DeletePendingTask(qname, id) return i.rdb.DeletePendingTask(qname, id)
case "s": case keyPrefixScheduled:
return i.rdb.DeleteScheduledTask(qname, id, score) return i.rdb.DeleteScheduledTask(qname, id, score)
case "r": case keyPrefixRetry:
return i.rdb.DeleteRetryTask(qname, id, score) return i.rdb.DeleteRetryTask(qname, id, score)
case "a": case keyPrefixArchived:
return i.rdb.DeleteArchivedTask(qname, id, score) return i.rdb.DeleteArchivedTask(qname, id, score)
default: default:
return fmt.Errorf("invalid key") return fmt.Errorf("invalid key")
@ -562,17 +576,19 @@ func (i *Inspector) RunTaskByKey(qname, key string) error {
if err := validateQueueName(qname); err != nil { if err := validateQueueName(qname); err != nil {
return err return err
} }
id, score, state, err := parseTaskKey(key) prefix, id, score, err := parseTaskKey(key)
if err != nil { if err != nil {
return err return err
} }
switch state { switch prefix {
case "s": case keyPrefixScheduled:
return i.rdb.RunScheduledTask(qname, id, score) return i.rdb.RunScheduledTask(qname, id, score)
case "r": case keyPrefixRetry:
return i.rdb.RunRetryTask(qname, id, score) return i.rdb.RunRetryTask(qname, id, score)
case "a": case keyPrefixArchived:
return i.rdb.RunArchivedTask(qname, id, score) return i.rdb.RunArchivedTask(qname, id, score)
case keyPrefixPending:
return fmt.Errorf("task is already pending for run")
default: default:
return fmt.Errorf("invalid key") return fmt.Errorf("invalid key")
} }
@ -613,19 +629,19 @@ func (i *Inspector) ArchiveTaskByKey(qname, key string) error {
if err := validateQueueName(qname); err != nil { if err := validateQueueName(qname); err != nil {
return err return err
} }
id, score, state, err := parseTaskKey(key) prefix, id, score, err := parseTaskKey(key)
if err != nil { if err != nil {
return err return err
} }
switch state { switch prefix {
case "p": case keyPrefixPending:
return i.rdb.ArchivePendingTask(qname, id) return i.rdb.ArchivePendingTask(qname, id)
case "s": case keyPrefixScheduled:
return i.rdb.ArchiveScheduledTask(qname, id, score) return i.rdb.ArchiveScheduledTask(qname, id, score)
case "r": case keyPrefixRetry:
return i.rdb.ArchiveRetryTask(qname, id, score) return i.rdb.ArchiveRetryTask(qname, id, score)
case "a": case keyPrefixArchived:
return fmt.Errorf("task already archived") return fmt.Errorf("task is already archived")
default: default:
return fmt.Errorf("invalid key") return fmt.Errorf("invalid key")
} }