diff --git a/inspeq/inspector.go b/inspeq/inspector.go index c7c8f46..de9cff2 100644 --- a/inspeq/inspector.go +++ b/inspeq/inspector.go @@ -608,7 +608,7 @@ func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) { return int(n), err } -// DeleteTaskByKey deletes a task with the given id from the given queue. +// DeleteTask deletes a task with the given id from the given queue. func (i *Inspector) DeleteTask(qname, id string) error { if err := base.ValidateQueueName(qname); err != nil { return err @@ -647,28 +647,13 @@ func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) { return int(n), err } -// RunTaskByKey transition a task to pending state given task key and queue name. -// TODO: Update this to run task by ID. -func (i *Inspector) RunTaskByKey(qname, key string) error { +// RunTask transition a task to pending state given task id and queue name. +func (i *Inspector) RunTask(qname, id string) error { if err := base.ValidateQueueName(qname); err != nil { return err } - prefix, id, _, err := parseTaskKey(key) - if err != nil { - return err - } - switch prefix { - case keyPrefixScheduled: - return i.rdb.RunScheduledTask(qname, id) - case keyPrefixRetry: - return i.rdb.RunRetryTask(qname, id) - case keyPrefixArchived: - return i.rdb.RunArchivedTask(qname, id) - case keyPrefixPending: - return fmt.Errorf("task is already pending for run") - default: - return fmt.Errorf("invalid key") - } + // TODO: Return ErrTaskNotFound and other meaningful error + return i.rdb.RunTask(qname, id) } // ArchiveAllPendingTasks archives all pending tasks from the given queue, @@ -701,28 +686,13 @@ func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) { return int(n), err } -// ArchiveTaskByKey archives a task with the given key in the given queue. -// TODO: Update this to Archive task by ID. -func (i *Inspector) ArchiveTaskByKey(qname, key string) error { +// ArchiveTask archives a task with the given id in the given queue. +func (i *Inspector) ArchiveTask(qname, id string) error { if err := base.ValidateQueueName(qname); err != nil { return err } - prefix, id, _, err := parseTaskKey(key) - if err != nil { - return err - } - switch prefix { - case keyPrefixPending: - return i.rdb.ArchivePendingTask(qname, id) - case keyPrefixScheduled: - return i.rdb.ArchiveScheduledTask(qname, id) - case keyPrefixRetry: - return i.rdb.ArchiveRetryTask(qname, id) - case keyPrefixArchived: - return fmt.Errorf("task is already archived") - default: - return fmt.Errorf("invalid key") - } + // TODO: return ErrTaskNotFound or other meaningful error + return i.rdb.ArchiveTask(qname, id) } // CancelActiveTask sends a signal to cancel processing of the task with diff --git a/inspeq/inspector_test.go b/inspeq/inspector_test.go index a427b78..af9baf0 100644 --- a/inspeq/inspector_test.go +++ b/inspeq/inspector_test.go @@ -2111,7 +2111,7 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { } } -func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { +func TestInspectorRunTaskRunsScheduledTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2128,7 +2128,7 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { scheduled map[string][]base.Z pending map[string][]*base.TaskMessage qname string - key string + id string wantScheduled map[string][]base.Z wantPending map[string][]*base.TaskMessage }{ @@ -2142,7 +2142,7 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { "custom": {}, }, qname: "default", - key: createScheduledTask(z2).Key(), + id: createScheduledTask(z2).ID, wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2159,8 +2159,8 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { h.SeedAllScheduledQueues(t, r, tc.scheduled) h.SeedAllPendingQueues(t, r, tc.pending) - if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.RunTask(tc.qname, tc.id); err != nil { + t.Errorf("RunTask(%q, %q) returned error: %v", tc.qname, tc.id, err) continue } for qname, want := range tc.wantScheduled { @@ -2181,7 +2181,7 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { } } -func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { +func TestInspectorRunTaskRunsRetryTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2198,7 +2198,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { retry map[string][]base.Z pending map[string][]*base.TaskMessage qname string - key string + id string wantRetry map[string][]base.Z wantPending map[string][]*base.TaskMessage }{ @@ -2212,7 +2212,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { "custom": {}, }, qname: "custom", - key: createRetryTask(z2).Key(), + id: createRetryTask(z2).ID, wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2229,8 +2229,8 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { h.SeedAllRetryQueues(t, r, tc.retry) h.SeedAllPendingQueues(t, r, tc.pending) - if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.RunTask(tc.qname, tc.id); err != nil { + t.Errorf("RunTask(%q, %q) returned error: %v", tc.qname, tc.id, err) continue } for qname, want := range tc.wantRetry { @@ -2250,7 +2250,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { } } -func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) { +func TestInspectorRunTaskRunsArchivedTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2267,7 +2267,7 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) { archived map[string][]base.Z pending map[string][]*base.TaskMessage qname string - key string + id string wantArchived map[string][]base.Z wantPending map[string][]*base.TaskMessage }{ @@ -2283,7 +2283,7 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) { "low": {}, }, qname: "critical", - key: createArchivedTask(z2).Key(), + id: createArchivedTask(z2).ID, wantArchived: map[string][]base.Z{ "default": {z1}, "critical": {}, @@ -2302,8 +2302,8 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) { h.SeedAllArchivedQueues(t, r, tc.archived) h.SeedAllPendingQueues(t, r, tc.pending) - if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.RunTask(tc.qname, tc.id); err != nil { + t.Errorf("RunTask(%q, %q) returned error: %v", tc.qname, tc.id, err) continue } for qname, want := range tc.wantArchived { @@ -2323,7 +2323,7 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) { } } -func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { +func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2336,7 +2336,7 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { pending map[string][]*base.TaskMessage archived map[string][]base.Z qname string - key string + id string wantPending map[string][]*base.TaskMessage wantArchived map[string][]base.Z }{ @@ -2350,7 +2350,7 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { "custom": {}, }, qname: "default", - key: createPendingTask(m1).Key(), + id: createPendingTask(m1).ID, wantPending: map[string][]*base.TaskMessage{ "default": {}, "custom": {m2, m3}, @@ -2372,7 +2372,7 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { "custom": {}, }, qname: "custom", - key: createPendingTask(m2).Key(), + id: createPendingTask(m2).ID, wantPending: map[string][]*base.TaskMessage{ "default": {m1}, "custom": {m3}, @@ -2391,9 +2391,9 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { h.SeedAllPendingQueues(t, r, tc.pending) h.SeedAllArchivedQueues(t, r, tc.archived) - if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", - tc.qname, tc.key, err) + if err := inspector.ArchiveTask(tc.qname, tc.id); err != nil { + t.Errorf("ArchiveTask(%q, %q) returned error: %v", + tc.qname, tc.id, err) continue } for qname, want := range tc.wantPending { @@ -2414,7 +2414,7 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { } } -func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) { +func TestInspectorArchiveTaskArchivesScheduledTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2431,7 +2431,7 @@ func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) { scheduled map[string][]base.Z archived map[string][]base.Z qname string - key string + id string want string wantScheduled map[string][]base.Z wantArchived map[string][]base.Z @@ -2446,7 +2446,7 @@ func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) { "custom": {}, }, qname: "custom", - key: createScheduledTask(z2).Key(), + id: createScheduledTask(z2).ID, wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2468,8 +2468,8 @@ func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) { h.SeedAllScheduledQueues(t, r, tc.scheduled) h.SeedAllArchivedQueues(t, r, tc.archived) - if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.ArchiveTask(tc.qname, tc.id); err != nil { + t.Errorf("ArchiveTask(%q, %q) returned error: %v", tc.qname, tc.id, err) continue } for qname, want := range tc.wantScheduled { @@ -2490,7 +2490,7 @@ func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) { } } -func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) { +func TestInspectorArchiveTaskArchivesRetryTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2507,7 +2507,7 @@ func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) { retry map[string][]base.Z archived map[string][]base.Z qname string - key string + id string wantRetry map[string][]base.Z wantArchived map[string][]base.Z }{ @@ -2521,7 +2521,7 @@ func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) { "custom": {}, }, qname: "custom", - key: createRetryTask(z2).Key(), + id: createRetryTask(z2).ID, wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2543,8 +2543,8 @@ func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) { h.SeedAllRetryQueues(t, r, tc.retry) h.SeedAllArchivedQueues(t, r, tc.archived) - if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.ArchiveTask(tc.qname, tc.id); err != nil { + t.Errorf("ArchiveTask(%q, %q) returned error: %v", tc.qname, tc.id, err) continue } for qname, want := range tc.wantRetry { diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index faca3f5..e406005 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -10,7 +10,6 @@ import ( "time" "github.com/go-redis/redis/v7" - "github.com/google/uuid" "github.com/hibiken/asynq/internal/base" "github.com/spf13/cast" ) @@ -447,48 +446,6 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro return zs, nil } -// RunArchivedTask finds an archived task that matches the given id and score from -// the given queue and enqueues it for processing. -// If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) RunArchivedTask(qname string, id uuid.UUID) error { - n, err := r.removeAndRun(base.ArchivedKey(qname), base.PendingKey(qname), id.String()) - if err != nil { - return err - } - if n == 0 { - return ErrTaskNotFound - } - return nil -} - -// RunRetryTask finds a retry task that matches the given id and score from -// the given queue and enqueues it for processing. -// If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) RunRetryTask(qname string, id uuid.UUID) error { - n, err := r.removeAndRun(base.RetryKey(qname), base.PendingKey(qname), id.String()) - if err != nil { - return err - } - if n == 0 { - return ErrTaskNotFound - } - return nil -} - -// RunScheduledTask finds a scheduled task that matches the given id and score from -// from the given queue and enqueues it for processing. -// If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) RunScheduledTask(qname string, id uuid.UUID) error { - n, err := r.removeAndRun(base.ScheduledKey(qname), base.PendingKey(qname), id.String()) - if err != nil { - return err - } - if n == 0 { - return ErrTaskNotFound - } - return nil -} - // RunAllScheduledTasks enqueues all scheduled tasks from the given queue // and returns the number of tasks enqueued. func (r *RDB) RunAllScheduledTasks(qname string) (int64, error) { @@ -519,16 +476,60 @@ redis.call("LPUSH", KEYS[2], ARGV[1]) return 1 `) -func (r *RDB) removeAndRun(zset, qkey, id string) (int64, error) { - res, err := removeAndRunCmd.Run(r.client, []string{zset, qkey}, id).Result() +// KEYS[1] -> asynq:{}:t: +// KEYS[2] -> asynq:{}:pending +// ARGV[1] -> task ID +// ARGV[2] -> redis key prefix (asynq:{}:) +var runTaskCmd = redis.NewScript(` +if redis.call("EXISTS", KEYS[1]) == 0 then + return 0 +end +local state = redis.call("HGET", KEYS[1], "state") +local n = 0 +if state == "ACTIVE" then + return redis.error_reply("task is already active") +elseif state == "PENDING" then + return redis.error_reply("task is already pending") +elseif state == "SCHEDULED" then + n = redis.call("ZREM", (ARGV[2] .. "scheduled"), ARGV[1]) +elseif state == "RETRY" then + n = redis.call("ZREM", (ARGV[2] .. "retry"), ARGV[1]) +elseif state == "ARCHIVED" then + n = redis.call("ZREM", (ARGV[2] .. "archived"), ARGV[1]) +else + return redis.error_reply("unknown task state: " .. tostring(state)) +end +if n == 0 then + return 0 +end +redis.call("LPUSH", KEYS[2], ARGV[1]) +return 1 +`) + +// RunTask finds a task that matches the given id from the given queue +// and stage it for processing (i.e. transition the task to pending state). +// If no match is found, it returns ErrTaskNotFound. +func (r *RDB) RunTask(qname, id string) error { + keys := []string{ + base.TaskKey(qname, id), + base.PendingKey(qname), + } + argv := []interface{}{ + id, + base.QueueKeyPrefix(qname), + } + res, err := runTaskCmd.Run(r.client, keys, argv...).Result() if err != nil { - return 0, err + return err } n, ok := res.(int64) if !ok { - return 0, fmt.Errorf("could not cast %v to int64", res) + return fmt.Errorf("command error: unexpected return value %v", res) } - return n, nil + if n == 0 { + return ErrTaskNotFound + } + return nil } var removeAndRunAllCmd = redis.NewScript(` @@ -712,7 +713,7 @@ elseif state == "RETRY" then n = redis.call("ZREM", (ARGV[2] .. "retry"), ARGV[1]) elseif state == "ARCHIVED" then n = redis.call("ZREM", (ARGV[2] .. "archived"), ARGV[1]) -elseif state == "ACTIVE" +elseif state == "ACTIVE" then return redis.error_reply("cannot delete active task") else return redis.error_reply("unknown task state: " .. tostring(state)) diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 48e6669..59ade88 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -1075,9 +1075,9 @@ func TestRunArchivedTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllArchivedQueues(t, r.client, tc.archived) - got := r.RunArchivedTask(tc.qname, tc.id) + got := r.RunTask(tc.qname, tc.id.String()) if got != tc.want { - t.Errorf("r.RunDeadTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) + t.Errorf("r.RunTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) continue } @@ -1176,9 +1176,9 @@ func TestRunRetryTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllRetryQueues(t, r.client, tc.retry) // initialize retry queue - got := r.RunRetryTask(tc.qname, tc.id) + got := r.RunTask(tc.qname, tc.id.String()) if got != tc.want { - t.Errorf("r.RunRetryTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) + t.Errorf("r.RunTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) continue } @@ -1277,9 +1277,9 @@ func TestRunScheduledTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllScheduledQueues(t, r.client, tc.scheduled) - got := r.RunScheduledTask(tc.qname, tc.id) + got := r.RunTask(tc.qname, tc.id.String()) if got != tc.want { - t.Errorf("r.RunRetryTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) + t.Errorf("r.RunTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) continue }