mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-19 05:17:30 +08:00
Support delete and archive actions on PendingTask
* Add `DeleteAllPendingTasks`, `ArchiveAllPendingTasks` to `Inspector` * `DeleteTaskByKey` and `ArchiveTaskByKey` now supports deleting/archiving PendingTask * Updated `asynq task` command with support for deleting/archiving pending tasks
This commit is contained in:
117
inspector.go
117
inspector.go
@@ -217,41 +217,62 @@ type ArchivedTask struct {
|
||||
score int64
|
||||
}
|
||||
|
||||
// Key returns a key used to delete, run, and archive the task.
|
||||
// Format string used for task key.
|
||||
// Format is <prefix>:<uuid>:<score>.
|
||||
const taskKeyFormat = "%s:%v:%v"
|
||||
|
||||
// Prefix used for task key.
|
||||
const (
|
||||
keyPrefixPending = "p"
|
||||
keyPrefixScheduled = "s"
|
||||
keyPrefixRetry = "r"
|
||||
keyPrefixArchived = "a"
|
||||
|
||||
allKeyPrefixes = keyPrefixPending + keyPrefixScheduled + keyPrefixRetry + keyPrefixArchived
|
||||
)
|
||||
|
||||
// Key returns a key used to delete, and archive the pending task.
|
||||
func (t *PendingTask) Key() string {
|
||||
// Note: Pending tasks are stored in redis LIST, therefore no score.
|
||||
// Use zero for the score to use the same key format.
|
||||
return fmt.Sprintf(taskKeyFormat, keyPrefixPending, t.ID, 0)
|
||||
}
|
||||
|
||||
// Key returns a key used to delete, run, and archive the scheduled task.
|
||||
func (t *ScheduledTask) Key() string {
|
||||
return fmt.Sprintf("s:%v:%v", t.ID, t.score)
|
||||
return fmt.Sprintf(taskKeyFormat, keyPrefixScheduled, t.ID, t.score)
|
||||
}
|
||||
|
||||
// Key returns a key used to delete, run, and archive the task.
|
||||
// Key returns a key used to delete, run, and archive the retry task.
|
||||
func (t *RetryTask) Key() string {
|
||||
return fmt.Sprintf("r:%v:%v", t.ID, t.score)
|
||||
return fmt.Sprintf(taskKeyFormat, keyPrefixRetry, t.ID, t.score)
|
||||
}
|
||||
|
||||
// Key returns a key used to delete, run, and archive the task.
|
||||
// Key returns a key used to delete and run the archived task.
|
||||
func (t *ArchivedTask) Key() string {
|
||||
return fmt.Sprintf("a:%v:%v", t.ID, t.score)
|
||||
return fmt.Sprintf(taskKeyFormat, keyPrefixArchived, t.ID, t.score)
|
||||
}
|
||||
|
||||
// parseTaskKey parses a key string and returns each part of key with proper
|
||||
// type if valid, otherwise it reports an error.
|
||||
func parseTaskKey(key string) (id uuid.UUID, score int64, state string, err error) {
|
||||
func parseTaskKey(key string) (prefix string, id uuid.UUID, score int64, err error) {
|
||||
parts := strings.Split(key, ":")
|
||||
if len(parts) != 3 {
|
||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||
return "", uuid.Nil, 0, fmt.Errorf("invalid id")
|
||||
}
|
||||
id, err = uuid.Parse(parts[1])
|
||||
if err != nil {
|
||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||
return "", uuid.Nil, 0, fmt.Errorf("invalid id")
|
||||
}
|
||||
score, err = strconv.ParseInt(parts[2], 10, 64)
|
||||
if err != nil {
|
||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||
return "", uuid.Nil, 0, fmt.Errorf("invalid id")
|
||||
}
|
||||
state = parts[0]
|
||||
if len(state) != 1 || !strings.Contains("sra", state) {
|
||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||
prefix = parts[0]
|
||||
if len(prefix) != 1 || !strings.Contains(allKeyPrefixes, prefix) {
|
||||
return "", uuid.Nil, 0, fmt.Errorf("invalid id")
|
||||
}
|
||||
return id, score, state, nil
|
||||
return prefix, id, score, nil
|
||||
}
|
||||
|
||||
// ListOption specifies behavior of list operation.
|
||||
@@ -457,6 +478,16 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Arch
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// DeleteAllPendingTasks deletes all pending tasks from the specified queue,
|
||||
// and reports the number tasks deleted.
|
||||
func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error) {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n, err := i.rdb.DeleteAllPendingTasks(qname)
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue,
|
||||
// and reports the number tasks deleted.
|
||||
func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) {
|
||||
@@ -492,23 +523,25 @@ func (i *Inspector) DeleteTaskByKey(qname, key string) error {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
return err
|
||||
}
|
||||
id, score, state, err := parseTaskKey(key)
|
||||
prefix, id, score, err := parseTaskKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch state {
|
||||
case "s":
|
||||
switch prefix {
|
||||
case keyPrefixPending:
|
||||
return i.rdb.DeletePendingTask(qname, id)
|
||||
case keyPrefixScheduled:
|
||||
return i.rdb.DeleteScheduledTask(qname, id, score)
|
||||
case "r":
|
||||
case keyPrefixRetry:
|
||||
return i.rdb.DeleteRetryTask(qname, id, score)
|
||||
case "a":
|
||||
case keyPrefixArchived:
|
||||
return i.rdb.DeleteArchivedTask(qname, id, score)
|
||||
default:
|
||||
return fmt.Errorf("invalid key")
|
||||
}
|
||||
}
|
||||
|
||||
// RunAllScheduledTasks transition all scheduled tasks to pending state within the given queue,
|
||||
// RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue,
|
||||
// and reports the number of tasks transitioned.
|
||||
func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
@@ -518,7 +551,7 @@ func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) {
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// RunAllRetryTasks transition all retry tasks to pending state within the given queue,
|
||||
// RunAllRetryTasks transition all retry tasks to pending state from the given queue,
|
||||
// and reports the number of tasks transitioned.
|
||||
func (i *Inspector) RunAllRetryTasks(qname string) (int, error) {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
@@ -528,7 +561,7 @@ func (i *Inspector) RunAllRetryTasks(qname string) (int, error) {
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// RunAllArchivedTasks transition all archived tasks to pending state within the given queue,
|
||||
// RunAllArchivedTasks transition all archived tasks to pending state from the given queue,
|
||||
// and reports the number of tasks transitioned.
|
||||
func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
@@ -543,23 +576,35 @@ func (i *Inspector) RunTaskByKey(qname, key string) error {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
return err
|
||||
}
|
||||
id, score, state, err := parseTaskKey(key)
|
||||
prefix, id, score, err := parseTaskKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch state {
|
||||
case "s":
|
||||
switch prefix {
|
||||
case keyPrefixScheduled:
|
||||
return i.rdb.RunScheduledTask(qname, id, score)
|
||||
case "r":
|
||||
case keyPrefixRetry:
|
||||
return i.rdb.RunRetryTask(qname, id, score)
|
||||
case "a":
|
||||
case keyPrefixArchived:
|
||||
return i.rdb.RunArchivedTask(qname, id, score)
|
||||
case keyPrefixPending:
|
||||
return fmt.Errorf("task is already pending for run")
|
||||
default:
|
||||
return fmt.Errorf("invalid key")
|
||||
}
|
||||
}
|
||||
|
||||
// ArchiveAllScheduledTasks archives all scheduled tasks within the given queue,
|
||||
// ArchiveAllPendingTasks archives all pending tasks from the given queue,
|
||||
// and reports the number of tasks archived.
|
||||
func (i *Inspector) ArchiveAllPendingTasks(qname string) (int, error) {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n, err := i.rdb.ArchiveAllPendingTasks(qname)
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// ArchiveAllScheduledTasks archives all scheduled tasks from the given queue,
|
||||
// and reports the number of tasks archiveed.
|
||||
func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
@@ -569,7 +614,7 @@ func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) {
|
||||
return int(n), err
|
||||
}
|
||||
|
||||
// ArchiveAllRetryTasks archives all retry tasks within the given queue,
|
||||
// ArchiveAllRetryTasks archives all retry tasks from the given queue,
|
||||
// and reports the number of tasks archiveed.
|
||||
func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
@@ -584,17 +629,19 @@ func (i *Inspector) ArchiveTaskByKey(qname, key string) error {
|
||||
if err := validateQueueName(qname); err != nil {
|
||||
return err
|
||||
}
|
||||
id, score, state, err := parseTaskKey(key)
|
||||
prefix, id, score, err := parseTaskKey(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch state {
|
||||
case "s":
|
||||
switch prefix {
|
||||
case keyPrefixPending:
|
||||
return i.rdb.ArchivePendingTask(qname, id)
|
||||
case keyPrefixScheduled:
|
||||
return i.rdb.ArchiveScheduledTask(qname, id, score)
|
||||
case "r":
|
||||
case keyPrefixRetry:
|
||||
return i.rdb.ArchiveRetryTask(qname, id, score)
|
||||
case "a":
|
||||
return fmt.Errorf("task already archived")
|
||||
case keyPrefixArchived:
|
||||
return fmt.Errorf("task is already archived")
|
||||
default:
|
||||
return fmt.Errorf("invalid key")
|
||||
}
|
||||
|
Reference in New Issue
Block a user