mirror of
https://github.com/hibiken/asynq.git
synced 2025-08-19 23:19:10 +08:00
Update RDB.DeleteTask with task state
This commit is contained in:
@@ -743,77 +743,79 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// DeleteArchivedTask deletes an archived task that matches the given id and score from the given queue.
|
||||
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) DeleteArchivedTask(qname string, id uuid.UUID) error {
|
||||
return r.deleteTask(base.ArchivedKey(qname), qname, id.String())
|
||||
}
|
||||
|
||||
// DeleteRetryTask deletes a retry task that matches the given id and score from the given queue.
|
||||
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) DeleteRetryTask(qname string, id uuid.UUID) error {
|
||||
return r.deleteTask(base.RetryKey(qname), qname, id.String())
|
||||
}
|
||||
|
||||
// DeleteScheduledTask deletes a scheduled task that matches the given id and score from the given queue.
|
||||
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
||||
func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID) error {
|
||||
return r.deleteTask(base.ScheduledKey(qname), qname, id.String())
|
||||
}
|
||||
|
||||
// KEYS[1] -> asynq:{<qname>}:pending
|
||||
// KEYS[2] -> asynq:{<qname>}:t:<task_id>
|
||||
// ARGV[1] -> task ID
|
||||
var deletePendingTaskCmd = redis.NewScript(`
|
||||
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
||||
return 0
|
||||
end
|
||||
return redis.call("DEL", KEYS[2])
|
||||
`)
|
||||
|
||||
// DeletePendingTask deletes a pending tasks that matches the given id from the given queue.
|
||||
// If there's no match, it returns ErrTaskNotFound.
|
||||
func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error {
|
||||
keys := []string{base.PendingKey(qname), base.TaskKey(qname, id.String())}
|
||||
res, err := deletePendingTaskCmd.Run(r.client, keys, id.String()).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n, ok := res.(int64)
|
||||
if !ok {
|
||||
return fmt.Errorf("command error: unexpected return value %v", res)
|
||||
}
|
||||
if n == 0 {
|
||||
return ErrTaskNotFound
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// KEYS[1] -> ZSET key to remove the task from (e.g. asynq:{<qname>}:retry)
|
||||
// KEYS[2] -> asynq:{<qname>}:t:<task_id>
|
||||
// Input:
|
||||
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
||||
// KEYS[2] -> all queues key
|
||||
// ARGV[1] -> task ID
|
||||
// ARGV[2] -> queue key prefix
|
||||
// ARGV[3] -> queue name
|
||||
//
|
||||
// Output:
|
||||
// Numeric code indicating the status:
|
||||
// Returns 1 if task is successfully deleted.
|
||||
// Returns 0 if task is not found.
|
||||
// Returns -1 if queue doesn't exist.
|
||||
// Returns -2 if task is in active state.
|
||||
var deleteTaskCmd = redis.NewScript(`
|
||||
if redis.call("ZREM", KEYS[1], ARGV[1]) == 0 then
|
||||
if redis.call("SISMEMBER", KEYS[2], ARGV[3]) == 0 then
|
||||
return -1
|
||||
end
|
||||
if redis.call("EXISTS", KEYS[1]) == 0 then
|
||||
return 0
|
||||
end
|
||||
return redis.call("DEL", KEYS[2])
|
||||
local state = redis.call("HGET", KEYS[1], "state")
|
||||
if state == "active" then
|
||||
return -2
|
||||
end
|
||||
if state == "pending" then
|
||||
if redis.call("LREM", ARGV[2] .. state, 0, ARGV[1]) == 0 then
|
||||
return redis.error_reply("task is not found in list: " .. tostring(state))
|
||||
end
|
||||
else
|
||||
if redis.call("ZREM", ARGV[2] .. state, ARGV[1]) == 0 then
|
||||
return redis.error_reply("task is not found in zset: " .. tostring(state))
|
||||
end
|
||||
end
|
||||
return redis.call("DEL", KEYS[1])
|
||||
`)
|
||||
|
||||
func (r *RDB) deleteTask(key, qname, id string) error {
|
||||
keys := []string{key, base.TaskKey(qname, id)}
|
||||
argv := []interface{}{id}
|
||||
// DeleteTask finds a task that matches the id from the given queue and deletes it.
|
||||
// It returns nil if it successfully archived the task.
|
||||
//
|
||||
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
|
||||
// If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError
|
||||
// If a task is in active state it returns non-nil error with Code FailedPrecondition.
|
||||
func (r *RDB) DeleteTask(qname string, id uuid.UUID) error {
|
||||
var op errors.Op = "rdb.DeleteTask"
|
||||
keys := []string{
|
||||
base.TaskKey(qname, id.String()),
|
||||
base.AllQueues,
|
||||
}
|
||||
argv := []interface{}{
|
||||
id.String(),
|
||||
base.QueueKeyPrefix(qname),
|
||||
qname,
|
||||
}
|
||||
res, err := deleteTaskCmd.Run(r.client, keys, argv...).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.E(op, errors.Unknown, err)
|
||||
}
|
||||
n, ok := res.(int64)
|
||||
if !ok {
|
||||
return fmt.Errorf("command error: unexpected return value %v", res)
|
||||
return errors.E(op, errors.Internal, fmt.Sprintf("cast error: deleteTaskCmd script returned unexported value %v", res))
|
||||
}
|
||||
if n == 0 {
|
||||
return ErrTaskNotFound
|
||||
switch n {
|
||||
case 1:
|
||||
return nil
|
||||
case 0:
|
||||
return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id.String()})
|
||||
case -1:
|
||||
return errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
|
||||
case -2:
|
||||
return errors.E(op, errors.FailedPrecondition, "cannot delete task in active state. use CancelTask instead.")
|
||||
default:
|
||||
return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from deleteTaskCmd script: %d", n))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// KEYS[1] -> queue to delete
|
||||
|
Reference in New Issue
Block a user