From 456edb6b71862386019420a6c7a93646ac8101d5 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 16 May 2021 07:32:47 -0700 Subject: [PATCH] Replace RunTaskByKey with RunTask in Inspector --- inspeq/inspector.go | 42 ++++++----- inspeq/inspector_test.go | 131 +++++++++++++++++++++++++++++---- internal/errors/errors.go | 16 +++- internal/errors/errors_test.go | 31 +++++++- 4 files changed, 186 insertions(+), 34 deletions(-) diff --git a/inspeq/inspector.go b/inspeq/inspector.go index 2aa2e7b..5deb054 100644 --- a/inspeq/inspector.go +++ b/inspeq/inspector.go @@ -140,8 +140,13 @@ var ( ErrQueueNotFound = errors.New("queue not found") // ErrQueueNotEmpty indicates that the specified queue is not empty. ErrQueueNotEmpty = errors.New("queue is not empty") + // ErrTaskNotFound indicates that the specified task cannot be found in the queue. + ErrTaskNotFound = errors.New("task not found") ) +type taskNotFoundError struct { +} + // DeleteQueue removes the specified queue. // // If force is set to true, DeleteQueue will remove the queue regardless of @@ -591,28 +596,31 @@ func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) { return int(n), err } -// RunTaskByKey transition a task to pending state given task key and queue name. -// TODO: Update this to run task by ID. -func (i *Inspector) RunTaskByKey(qname, key string) error { +// RunTask updates the task to pending state given a queue name and task id. +// The task needs to be in scheduled, retry, or archived state, otherwise RunTask +// will return an error. +// +// If a queue with the given name doesn't exist, it returns ErrQueueNotFound. +// If a task with the given id doesn't exist in the queue, it returns ErrTaskNotFound. +// If the task is in pending or active state, it returns a non-nil error. +func (i *Inspector) RunTask(qname, id string) error { if err := base.ValidateQueueName(qname); err != nil { - return err + return fmt.Errorf("asynq: %v", err) } - prefix, id, _, err := parseTaskKey(key) + taskid, err := uuid.Parse(id) if err != nil { - return err + return fmt.Errorf("asynq: %s is not a valid task id", id) } - switch prefix { - case keyPrefixScheduled: - return i.rdb.RunTask(qname, id) - case keyPrefixRetry: - return i.rdb.RunTask(qname, id) - case keyPrefixArchived: - return i.rdb.RunTask(qname, id) - case keyPrefixPending: - return fmt.Errorf("task is already pending for run") - default: - return fmt.Errorf("invalid key") + err = i.rdb.RunTask(qname, taskid) + switch { + case errors.IsQueueNotFound(err): + return fmt.Errorf("asynq: %w", ErrQueueNotFound) + case errors.IsTaskNotFound(err): + return fmt.Errorf("asynq: %w", ErrTaskNotFound) + case err != nil: + return fmt.Errorf("asynq: %v", err) } + return nil } // ArchiveAllPendingTasks archives all pending tasks from the given queue, diff --git a/inspeq/inspector_test.go b/inspeq/inspector_test.go index e66d70a..351969a 100644 --- a/inspeq/inspector_test.go +++ b/inspeq/inspector_test.go @@ -17,6 +17,7 @@ import ( "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" "github.com/hibiken/asynq" h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" @@ -2113,7 +2114,7 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { } } -func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { +func TestInspectorRunTaskRunsScheduledTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2130,7 +2131,7 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { scheduled map[string][]base.Z pending map[string][]*base.TaskMessage qname string - key string + id string wantScheduled map[string][]base.Z wantPending map[string][]*base.TaskMessage }{ @@ -2144,7 +2145,7 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { "custom": {}, }, qname: "default", - key: createScheduledTask(z2).Key(), + id: createScheduledTask(z2).ID, wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2161,8 +2162,8 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { h.SeedAllScheduledQueues(t, r, tc.scheduled) h.SeedAllPendingQueues(t, r, tc.pending) - if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.RunTask(tc.qname, tc.id); err != nil { + t.Errorf("RunTask(%q, %q) returned error: %v", tc.qname, tc.id, err) continue } for qname, want := range tc.wantScheduled { @@ -2183,7 +2184,7 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { } } -func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { +func TestInspectorRunTaskRunsRetryTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2200,7 +2201,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { retry map[string][]base.Z pending map[string][]*base.TaskMessage qname string - key string + id string wantRetry map[string][]base.Z wantPending map[string][]*base.TaskMessage }{ @@ -2214,7 +2215,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { "custom": {}, }, qname: "custom", - key: createRetryTask(z2).Key(), + id: createRetryTask(z2).ID, wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2231,8 +2232,8 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { h.SeedAllRetryQueues(t, r, tc.retry) h.SeedAllPendingQueues(t, r, tc.pending) - if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.RunTask(tc.qname, tc.id); err != nil { + t.Errorf("RunTaskBy(%q, %q) returned error: %v", tc.qname, tc.id, err) continue } for qname, want := range tc.wantRetry { @@ -2252,7 +2253,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { } } -func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) { +func TestInspectorRunTaskRunsArchivedTask(t *testing.T) { r := setup(t) defer r.Close() m1 := h.NewTaskMessage("task1", nil) @@ -2269,7 +2270,7 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) { archived map[string][]base.Z pending map[string][]*base.TaskMessage qname string - key string + id string wantArchived map[string][]base.Z wantPending map[string][]*base.TaskMessage }{ @@ -2285,7 +2286,7 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) { "low": {}, }, qname: "critical", - key: createArchivedTask(z2).Key(), + id: createArchivedTask(z2).ID, wantArchived: map[string][]base.Z{ "default": {z1}, "critical": {}, @@ -2304,8 +2305,108 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) { h.SeedAllArchivedQueues(t, r, tc.archived) h.SeedAllPendingQueues(t, r, tc.pending) - if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.RunTask(tc.qname, tc.id); err != nil { + t.Errorf("RunTask(%q, %q) returned error: %v", tc.qname, tc.id, err) + continue + } + for qname, want := range tc.wantArchived { + wantArchived := h.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", + qname, diff) + } + } + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { + t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", + qname, diff) + } + } + } +} + +func TestInspectorRunTaskError(t *testing.T) { + r := setup(t) + defer r.Close() + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessageWithQueue("task2", nil, "critical") + m3 := h.NewTaskMessageWithQueue("task3", nil, "low") + now := time.Now() + z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()} + z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} + z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} + + inspector := New(getRedisConnOpt(t)) + + tests := []struct { + archived map[string][]base.Z + pending map[string][]*base.TaskMessage + qname string + id string + wantErr error + wantArchived map[string][]base.Z + wantPending map[string][]*base.TaskMessage + }{ + { + archived: map[string][]base.Z{ + "default": {z1}, + "critical": {z2}, + "low": {z3}, + }, + pending: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + "low": {}, + }, + qname: "nonexistent", + id: createArchivedTask(z2).ID, + wantErr: ErrQueueNotFound, + wantArchived: map[string][]base.Z{ + "default": {z1}, + "critical": {z2}, + "low": {z3}, + }, + wantPending: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + "low": {}, + }, + }, + { + archived: map[string][]base.Z{ + "default": {z1}, + "critical": {z2}, + "low": {z3}, + }, + pending: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + "low": {}, + }, + qname: "default", + id: uuid.NewString(), + wantErr: ErrTaskNotFound, + wantArchived: map[string][]base.Z{ + "default": {z1}, + "critical": {z2}, + "low": {z3}, + }, + wantPending: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + "low": {}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) + h.SeedAllArchivedQueues(t, r, tc.archived) + h.SeedAllPendingQueues(t, r, tc.pending) + + if err := inspector.RunTask(tc.qname, tc.id); !errors.Is(err, tc.wantErr) { + t.Errorf("RunTask(%q, %q) = %v, want %v", tc.qname, tc.id, err, tc.wantErr) continue } for qname, want := range tc.wantArchived { diff --git a/internal/errors/errors.go b/internal/errors/errors.go index ebcecfc..5f415ac 100644 --- a/internal/errors/errors.go +++ b/internal/errors/errors.go @@ -26,7 +26,7 @@ type Error struct { Err error } -func (e *Error) Error() string { +func (e *Error) DebugString() string { var b strings.Builder if e.Op != "" { b.WriteString(string(e.Op)) @@ -46,6 +46,20 @@ func (e *Error) Error() string { return b.String() } +func (e *Error) Error() string { + var b strings.Builder + if e.Code != Unspecified { + b.WriteString(e.Code.String()) + } + if e.Err != nil { + if b.Len() > 0 { + b.WriteString(": ") + } + b.WriteString(e.Err.Error()) + } + return b.String() +} + func (e *Error) Unwrap() error { return e.Err } diff --git a/internal/errors/errors_test.go b/internal/errors/errors_test.go index e541017..ae44170 100644 --- a/internal/errors/errors_test.go +++ b/internal/errors/errors_test.go @@ -6,7 +6,9 @@ package errors import "testing" -func TestErrorString(t *testing.T) { +func TestErrorDebugString(t *testing.T) { + // DebugString should include Op since its meant to be used by + // maintainers/contributors of the asynq package. tests := []struct { desc string err error @@ -24,6 +26,33 @@ func TestErrorString(t *testing.T) { }, } + for _, tc := range tests { + if got := tc.err.(*Error).DebugString(); got != tc.want { + t.Errorf("%s: got=%q, want=%q", tc.desc, got, tc.want) + } + } +} + +func TestErrorString(t *testing.T) { + // String method should omit Op since op is an internal detail + // and we don't want to provide it to users of the package. + tests := []struct { + desc string + err error + want string + }{ + { + desc: "With Op, Code, and string", + err: E(Op("rdb.DeleteTask"), NotFound, "cannot find task with id=123"), + want: "NOT_FOUND: cannot find task with id=123", + }, + { + desc: "With Op, Code and error", + err: E(Op("rdb.DeleteTask"), NotFound, &TaskNotFoundError{Queue: "default", ID: "123"}), + want: `NOT_FOUND: cannot find task with id=123 in queue "default"`, + }, + } + for _, tc := range tests { if got := tc.err.Error(); got != tc.want { t.Errorf("%s: got=%q, want=%q", tc.desc, got, tc.want)