2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-26 11:16:12 +08:00

Update RDB.RunTask with more specific errors

This commit is contained in:
Ken Hibino
2021-05-05 16:31:07 -07:00
parent 430dbb298e
commit 961582cba6
2 changed files with 210 additions and 90 deletions

View File

@@ -440,19 +440,6 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro
return zs, nil
}
// RunTask finds a task that matches the id from the given queue and stages it for processing.
// If a task that matches the id does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunTask(qname string, id uuid.UUID) error {
n, err := r.runTask(qname, id.String())
if err != nil {
return err
}
if n == 0 {
return ErrTaskNotFound
}
return nil
}
// RunAllScheduledTasks enqueues all scheduled tasks from the given queue
// and returns the number of tasks enqueued.
func (r *RDB) RunAllScheduledTasks(qname string) (int64, error) {
@@ -471,19 +458,37 @@ func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) {
return r.removeAndRunAll(base.ArchivedKey(qname), base.PendingKey(qname))
}
// runTaskCmd is a Lua script that updates the given task to pending state.
//
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:pending
// KEYS[3] -> all queues key
// --
// ARGV[1] -> task ID
// ARGV[2] -> queue key prefix; asynq:{<qname>}:
// ARGV[3] -> queue name
//
// Output:
// Numeric code indicating the status:
// Returns 1 if task is successfully updated.
// Returns 0 if task is not found.
// Returns -1 if queue doesn't exist.
// Returns -2 if task is in active state.
// Returns -3 if task is in pending state.
// Returns error reply if unexpected error occurs.
var runTaskCmd = redis.NewScript(`
if redis.call("SISMEMBER", KEYS[3], ARGV[3]) == 0 then
return -1
end
if redis.call("EXISTS", KEYS[1]) == 0 then
return 0
end
local state = redis.call("HGET", KEYS[1], "state")
if state == "active" then
return redis.error_reply("task is already running")
return -2
elseif state == "pending" then
return redis.error_reply("task is already pending to be run")
return -3
end
local n = redis.call("ZREM", ARGV[2] .. state, ARGV[1])
if n == 0 then
@@ -494,24 +499,46 @@ redis.call("HSET", KEYS[1], "state", "pending")
return 1
`)
func (r *RDB) runTask(qname, id string) (int64, error) {
// RunTask finds a task that matches the id from the given queue and updates it to pending state.
// It returns nil if it successfully updated the task.
//
// If a queue with the given name doesn't exist, it returns QueueNotFoundError.
// If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError
// If a task is in active or pending state it returns non-nil error with Code FailedPrecondition.
func (r *RDB) RunTask(qname string, id uuid.UUID) error {
var op errors.Op = "rdb.RunTask"
keys := []string{
base.TaskKey(qname, id),
base.TaskKey(qname, id.String()),
base.PendingKey(qname),
base.AllQueues,
}
argv := []interface{}{
id,
id.String(),
base.QueueKeyPrefix(qname),
qname,
}
res, err := runTaskCmd.Run(r.client, keys, argv...).Result()
if err != nil {
return 0, err
return errors.E(op, errors.Unknown, err)
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("internal error: could not cast %v to int64", res)
return errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
}
switch n {
case 1:
return nil
case 0:
return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id.String()})
case -1:
return errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
case -2:
return errors.E(op, errors.FailedPrecondition, "task is already running")
case -3:
return errors.E(op, errors.FailedPrecondition, "task is already in pending state")
default:
return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from Lua script %d", n))
}
return n, nil
}
var removeAndRunAllCmd = redis.NewScript(`