2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 08:40:22 +08:00

Run task by qname and id

This commit is contained in:
Ken Hibino 2021-03-28 06:58:54 -07:00
parent 4bb67c582d
commit 56af9f6686
4 changed files with 97 additions and 126 deletions

View File

@ -608,7 +608,7 @@ func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) {
return int(n), err
}
// DeleteTaskByKey deletes a task with the given id from the given queue.
// DeleteTask deletes a task with the given id from the given queue.
func (i *Inspector) DeleteTask(qname, id string) error {
if err := base.ValidateQueueName(qname); err != nil {
return err
@ -647,28 +647,13 @@ func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) {
return int(n), err
}
// RunTaskByKey transition a task to pending state given task key and queue name.
// TODO: Update this to run task by ID.
func (i *Inspector) RunTaskByKey(qname, key string) error {
// RunTask transition a task to pending state given task id and queue name.
func (i *Inspector) RunTask(qname, id string) error {
if err := base.ValidateQueueName(qname); err != nil {
return err
}
prefix, id, _, err := parseTaskKey(key)
if err != nil {
return err
}
switch prefix {
case keyPrefixScheduled:
return i.rdb.RunScheduledTask(qname, id)
case keyPrefixRetry:
return i.rdb.RunRetryTask(qname, id)
case keyPrefixArchived:
return i.rdb.RunArchivedTask(qname, id)
case keyPrefixPending:
return fmt.Errorf("task is already pending for run")
default:
return fmt.Errorf("invalid key")
}
// TODO: Return ErrTaskNotFound and other meaningful error
return i.rdb.RunTask(qname, id)
}
// ArchiveAllPendingTasks archives all pending tasks from the given queue,
@ -701,28 +686,13 @@ func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) {
return int(n), err
}
// ArchiveTaskByKey archives a task with the given key in the given queue.
// TODO: Update this to Archive task by ID.
func (i *Inspector) ArchiveTaskByKey(qname, key string) error {
// ArchiveTask archives a task with the given id in the given queue.
func (i *Inspector) ArchiveTask(qname, id string) error {
if err := base.ValidateQueueName(qname); err != nil {
return err
}
prefix, id, _, err := parseTaskKey(key)
if err != nil {
return err
}
switch prefix {
case keyPrefixPending:
return i.rdb.ArchivePendingTask(qname, id)
case keyPrefixScheduled:
return i.rdb.ArchiveScheduledTask(qname, id)
case keyPrefixRetry:
return i.rdb.ArchiveRetryTask(qname, id)
case keyPrefixArchived:
return fmt.Errorf("task is already archived")
default:
return fmt.Errorf("invalid key")
}
// TODO: return ErrTaskNotFound or other meaningful error
return i.rdb.ArchiveTask(qname, id)
}
// CancelActiveTask sends a signal to cancel processing of the task with

View File

@ -2111,7 +2111,7 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) {
}
}
func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) {
func TestInspectorRunTaskRunsScheduledTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
@ -2128,7 +2128,7 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) {
scheduled map[string][]base.Z
pending map[string][]*base.TaskMessage
qname string
key string
id string
wantScheduled map[string][]base.Z
wantPending map[string][]*base.TaskMessage
}{
@ -2142,7 +2142,7 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) {
"custom": {},
},
qname: "default",
key: createScheduledTask(z2).Key(),
id: createScheduledTask(z2).ID,
wantScheduled: map[string][]base.Z{
"default": {z1},
"custom": {z3},
@ -2159,8 +2159,8 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) {
h.SeedAllScheduledQueues(t, r, tc.scheduled)
h.SeedAllPendingQueues(t, r, tc.pending)
if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
if err := inspector.RunTask(tc.qname, tc.id); err != nil {
t.Errorf("RunTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
continue
}
for qname, want := range tc.wantScheduled {
@ -2181,7 +2181,7 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) {
}
}
func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) {
func TestInspectorRunTaskRunsRetryTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
@ -2198,7 +2198,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) {
retry map[string][]base.Z
pending map[string][]*base.TaskMessage
qname string
key string
id string
wantRetry map[string][]base.Z
wantPending map[string][]*base.TaskMessage
}{
@ -2212,7 +2212,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) {
"custom": {},
},
qname: "custom",
key: createRetryTask(z2).Key(),
id: createRetryTask(z2).ID,
wantRetry: map[string][]base.Z{
"default": {z1},
"custom": {z3},
@ -2229,8 +2229,8 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) {
h.SeedAllRetryQueues(t, r, tc.retry)
h.SeedAllPendingQueues(t, r, tc.pending)
if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
if err := inspector.RunTask(tc.qname, tc.id); err != nil {
t.Errorf("RunTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
continue
}
for qname, want := range tc.wantRetry {
@ -2250,7 +2250,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) {
}
}
func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) {
func TestInspectorRunTaskRunsArchivedTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
@ -2267,7 +2267,7 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) {
archived map[string][]base.Z
pending map[string][]*base.TaskMessage
qname string
key string
id string
wantArchived map[string][]base.Z
wantPending map[string][]*base.TaskMessage
}{
@ -2283,7 +2283,7 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) {
"low": {},
},
qname: "critical",
key: createArchivedTask(z2).Key(),
id: createArchivedTask(z2).ID,
wantArchived: map[string][]base.Z{
"default": {z1},
"critical": {},
@ -2302,8 +2302,8 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) {
h.SeedAllArchivedQueues(t, r, tc.archived)
h.SeedAllPendingQueues(t, r, tc.pending)
if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
if err := inspector.RunTask(tc.qname, tc.id); err != nil {
t.Errorf("RunTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
continue
}
for qname, want := range tc.wantArchived {
@ -2323,7 +2323,7 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) {
}
}
func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) {
func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
@ -2336,7 +2336,7 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) {
pending map[string][]*base.TaskMessage
archived map[string][]base.Z
qname string
key string
id string
wantPending map[string][]*base.TaskMessage
wantArchived map[string][]base.Z
}{
@ -2350,7 +2350,7 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) {
"custom": {},
},
qname: "default",
key: createPendingTask(m1).Key(),
id: createPendingTask(m1).ID,
wantPending: map[string][]*base.TaskMessage{
"default": {},
"custom": {m2, m3},
@ -2372,7 +2372,7 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) {
"custom": {},
},
qname: "custom",
key: createPendingTask(m2).Key(),
id: createPendingTask(m2).ID,
wantPending: map[string][]*base.TaskMessage{
"default": {m1},
"custom": {m3},
@ -2391,9 +2391,9 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) {
h.SeedAllPendingQueues(t, r, tc.pending)
h.SeedAllArchivedQueues(t, r, tc.archived)
if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v",
tc.qname, tc.key, err)
if err := inspector.ArchiveTask(tc.qname, tc.id); err != nil {
t.Errorf("ArchiveTask(%q, %q) returned error: %v",
tc.qname, tc.id, err)
continue
}
for qname, want := range tc.wantPending {
@ -2414,7 +2414,7 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) {
}
}
func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) {
func TestInspectorArchiveTaskArchivesScheduledTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
@ -2431,7 +2431,7 @@ func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) {
scheduled map[string][]base.Z
archived map[string][]base.Z
qname string
key string
id string
want string
wantScheduled map[string][]base.Z
wantArchived map[string][]base.Z
@ -2446,7 +2446,7 @@ func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) {
"custom": {},
},
qname: "custom",
key: createScheduledTask(z2).Key(),
id: createScheduledTask(z2).ID,
wantScheduled: map[string][]base.Z{
"default": {z1},
"custom": {z3},
@ -2468,8 +2468,8 @@ func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) {
h.SeedAllScheduledQueues(t, r, tc.scheduled)
h.SeedAllArchivedQueues(t, r, tc.archived)
if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
if err := inspector.ArchiveTask(tc.qname, tc.id); err != nil {
t.Errorf("ArchiveTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
continue
}
for qname, want := range tc.wantScheduled {
@ -2490,7 +2490,7 @@ func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) {
}
}
func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) {
func TestInspectorArchiveTaskArchivesRetryTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
@ -2507,7 +2507,7 @@ func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) {
retry map[string][]base.Z
archived map[string][]base.Z
qname string
key string
id string
wantRetry map[string][]base.Z
wantArchived map[string][]base.Z
}{
@ -2521,7 +2521,7 @@ func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) {
"custom": {},
},
qname: "custom",
key: createRetryTask(z2).Key(),
id: createRetryTask(z2).ID,
wantRetry: map[string][]base.Z{
"default": {z1},
"custom": {z3},
@ -2543,8 +2543,8 @@ func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) {
h.SeedAllRetryQueues(t, r, tc.retry)
h.SeedAllArchivedQueues(t, r, tc.archived)
if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil {
t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
if err := inspector.ArchiveTask(tc.qname, tc.id); err != nil {
t.Errorf("ArchiveTask(%q, %q) returned error: %v", tc.qname, tc.id, err)
continue
}
for qname, want := range tc.wantRetry {

View File

@ -10,7 +10,6 @@ import (
"time"
"github.com/go-redis/redis/v7"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/spf13/cast"
)
@ -447,48 +446,6 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro
return zs, nil
}
// RunArchivedTask finds an archived task that matches the given id and score from
// the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunArchivedTask(qname string, id uuid.UUID) error {
n, err := r.removeAndRun(base.ArchivedKey(qname), base.PendingKey(qname), id.String())
if err != nil {
return err
}
if n == 0 {
return ErrTaskNotFound
}
return nil
}
// RunRetryTask finds a retry task that matches the given id and score from
// the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunRetryTask(qname string, id uuid.UUID) error {
n, err := r.removeAndRun(base.RetryKey(qname), base.PendingKey(qname), id.String())
if err != nil {
return err
}
if n == 0 {
return ErrTaskNotFound
}
return nil
}
// RunScheduledTask finds a scheduled task that matches the given id and score from
// from the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunScheduledTask(qname string, id uuid.UUID) error {
n, err := r.removeAndRun(base.ScheduledKey(qname), base.PendingKey(qname), id.String())
if err != nil {
return err
}
if n == 0 {
return ErrTaskNotFound
}
return nil
}
// RunAllScheduledTasks enqueues all scheduled tasks from the given queue
// and returns the number of tasks enqueued.
func (r *RDB) RunAllScheduledTasks(qname string) (int64, error) {
@ -519,16 +476,60 @@ redis.call("LPUSH", KEYS[2], ARGV[1])
return 1
`)
func (r *RDB) removeAndRun(zset, qkey, id string) (int64, error) {
res, err := removeAndRunCmd.Run(r.client, []string{zset, qkey}, id).Result()
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:pending
// ARGV[1] -> task ID
// ARGV[2] -> redis key prefix (asynq:{<qname>}:)
var runTaskCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 0 then
return 0
end
local state = redis.call("HGET", KEYS[1], "state")
local n = 0
if state == "ACTIVE" then
return redis.error_reply("task is already active")
elseif state == "PENDING" then
return redis.error_reply("task is already pending")
elseif state == "SCHEDULED" then
n = redis.call("ZREM", (ARGV[2] .. "scheduled"), ARGV[1])
elseif state == "RETRY" then
n = redis.call("ZREM", (ARGV[2] .. "retry"), ARGV[1])
elseif state == "ARCHIVED" then
n = redis.call("ZREM", (ARGV[2] .. "archived"), ARGV[1])
else
return redis.error_reply("unknown task state: " .. tostring(state))
end
if n == 0 then
return 0
end
redis.call("LPUSH", KEYS[2], ARGV[1])
return 1
`)
// RunTask finds a task that matches the given id from the given queue
// and stage it for processing (i.e. transition the task to pending state).
// If no match is found, it returns ErrTaskNotFound.
func (r *RDB) RunTask(qname, id string) error {
keys := []string{
base.TaskKey(qname, id),
base.PendingKey(qname),
}
argv := []interface{}{
id,
base.QueueKeyPrefix(qname),
}
res, err := runTaskCmd.Run(r.client, keys, argv...).Result()
if err != nil {
return 0, err
return err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
return fmt.Errorf("command error: unexpected return value %v", res)
}
return n, nil
if n == 0 {
return ErrTaskNotFound
}
return nil
}
var removeAndRunAllCmd = redis.NewScript(`
@ -712,7 +713,7 @@ elseif state == "RETRY" then
n = redis.call("ZREM", (ARGV[2] .. "retry"), ARGV[1])
elseif state == "ARCHIVED" then
n = redis.call("ZREM", (ARGV[2] .. "archived"), ARGV[1])
elseif state == "ACTIVE"
elseif state == "ACTIVE" then
return redis.error_reply("cannot delete active task")
else
return redis.error_reply("unknown task state: " .. tostring(state))

View File

@ -1075,9 +1075,9 @@ func TestRunArchivedTask(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllArchivedQueues(t, r.client, tc.archived)
got := r.RunArchivedTask(tc.qname, tc.id)
got := r.RunTask(tc.qname, tc.id.String())
if got != tc.want {
t.Errorf("r.RunDeadTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
t.Errorf("r.RunTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
continue
}
@ -1176,9 +1176,9 @@ func TestRunRetryTask(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllRetryQueues(t, r.client, tc.retry) // initialize retry queue
got := r.RunRetryTask(tc.qname, tc.id)
got := r.RunTask(tc.qname, tc.id.String())
if got != tc.want {
t.Errorf("r.RunRetryTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
t.Errorf("r.RunTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
continue
}
@ -1277,9 +1277,9 @@ func TestRunScheduledTask(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
got := r.RunScheduledTask(tc.qname, tc.id)
got := r.RunTask(tc.qname, tc.id.String())
if got != tc.want {
t.Errorf("r.RunRetryTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
t.Errorf("r.RunTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
continue
}