diff --git a/inspector.go b/inspector.go index a07137d..e082093 100644 --- a/inspector.go +++ b/inspector.go @@ -166,6 +166,31 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error { return err } +// GetTaskInfo retrieves task information given a task id and queue name. +// +// Returns ErrQueueNotFound if a queue with the given name doesn't exist. +// Returns ErrTaskNotFound if a task with the given id doesn't exist in the queue. +func (i *Inspector) GetTaskInfo(qname, id string) (*TaskInfo, error) { + taskid, err := uuid.Parse(id) + if err != nil { + return nil, fmt.Errorf("asynq: %s is not a valid task id", id) + } + info, err := i.rdb.GetTaskInfo(qname, taskid) + switch { + case errors.IsQueueNotFound(err): + return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) + case errors.IsTaskNotFound(err): + return nil, fmt.Errorf("asynq: %w", ErrTaskNotFound) + case err != nil: + return nil, fmt.Errorf("asynq: %v", err) + } + return &TaskInfo{ + msg: info.Message, + state: info.State, + nextProcessAt: info.NextProcessAt, + }, nil +} + // ListOption specifies behavior of list operation. type ListOption interface{} diff --git a/inspector_test.go b/inspector_test.go index 6104541..4bf62d9 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -430,6 +430,205 @@ func createPendingTask(msg *base.TaskMessage) *TaskInfo { } } +func TestInspectorGetTaskInfo(t *testing.T) { + r := setup(t) + defer r.Close() + + m1 := h.NewTaskMessageWithQueue("task1", nil, "default") + m2 := h.NewTaskMessageWithQueue("task2", nil, "default") + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + m5 := h.NewTaskMessageWithQueue("task5", nil, "custom") + + now := time.Now() + fiveMinsFromNow := now.Add(5 * time.Minute) + oneHourFromNow := now.Add(1 * time.Hour) + twoHoursAgo := now.Add(-2 * time.Hour) + + fixtures := struct { + active map[string][]*base.TaskMessage + pending map[string][]*base.TaskMessage + scheduled map[string][]base.Z + retry map[string][]base.Z + archived map[string][]base.Z + }{ + active: map[string][]*base.TaskMessage{ + "default": {m1}, + "custom": {}, + }, + pending: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {m5}, + }, + scheduled: map[string][]base.Z{ + "default": {{Message: m2, Score: fiveMinsFromNow.Unix()}}, + "custom": {}, + }, + retry: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m3, Score: oneHourFromNow.Unix()}}, + }, + archived: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m4, Score: twoHoursAgo.Unix()}}, + }, + } + + h.SeedAllActiveQueues(t, r, fixtures.active) + h.SeedAllPendingQueues(t, r, fixtures.pending) + h.SeedAllScheduledQueues(t, r, fixtures.scheduled) + h.SeedAllRetryQueues(t, r, fixtures.retry) + h.SeedAllArchivedQueues(t, r, fixtures.archived) + + tests := []struct { + qname string + id string + want *TaskInfo + }{ + { + qname: "default", + id: m1.ID.String(), + want: &TaskInfo{ + msg: m1, + state: base.TaskStateActive, + nextProcessAt: time.Time{}, // zero value for n/a + }, + }, + { + qname: "default", + id: m2.ID.String(), + want: &TaskInfo{ + msg: m2, + state: base.TaskStateScheduled, + nextProcessAt: fiveMinsFromNow, + }, + }, + { + qname: "custom", + id: m3.ID.String(), + want: &TaskInfo{ + msg: m3, + state: base.TaskStateRetry, + nextProcessAt: oneHourFromNow, + }, + }, + { + qname: "custom", + id: m4.ID.String(), + want: &TaskInfo{ + msg: m4, + state: base.TaskStateArchived, + nextProcessAt: time.Time{}, // zero value for n/a + }, + }, + { + qname: "custom", + id: m5.ID.String(), + want: &TaskInfo{ + msg: m5, + state: base.TaskStatePending, + nextProcessAt: now, + }, + }, + } + + inspector := NewInspector(getRedisConnOpt(t)) + for _, tc := range tests { + got, err := inspector.GetTaskInfo(tc.qname, tc.id) + if err != nil { + t.Errorf("GetTaskInfo(%q, %q) returned error: %v", tc.qname, tc.id, err) + continue + } + cmpOpts := []cmp.Option{ + cmp.AllowUnexported(TaskInfo{}), + cmpopts.EquateApproxTime(2 * time.Second), + } + if diff := cmp.Diff(tc.want, got, cmpOpts...); diff != "" { + t.Errorf("GetTaskInfo(%q, %q) = %v, want %v; (-want, +got)\n%s", tc.qname, tc.id, got, tc.want, diff) + } + } +} + +func TestInspectorGetTaskInfoError(t *testing.T) { + r := setup(t) + defer r.Close() + + m1 := h.NewTaskMessageWithQueue("task1", nil, "default") + m2 := h.NewTaskMessageWithQueue("task2", nil, "default") + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + m5 := h.NewTaskMessageWithQueue("task5", nil, "custom") + + now := time.Now() + fiveMinsFromNow := now.Add(5 * time.Minute) + oneHourFromNow := now.Add(1 * time.Hour) + twoHoursAgo := now.Add(-2 * time.Hour) + + fixtures := struct { + active map[string][]*base.TaskMessage + pending map[string][]*base.TaskMessage + scheduled map[string][]base.Z + retry map[string][]base.Z + archived map[string][]base.Z + }{ + active: map[string][]*base.TaskMessage{ + "default": {m1}, + "custom": {}, + }, + pending: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {m5}, + }, + scheduled: map[string][]base.Z{ + "default": {{Message: m2, Score: fiveMinsFromNow.Unix()}}, + "custom": {}, + }, + retry: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m3, Score: oneHourFromNow.Unix()}}, + }, + archived: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m4, Score: twoHoursAgo.Unix()}}, + }, + } + + h.SeedAllActiveQueues(t, r, fixtures.active) + h.SeedAllPendingQueues(t, r, fixtures.pending) + h.SeedAllScheduledQueues(t, r, fixtures.scheduled) + h.SeedAllRetryQueues(t, r, fixtures.retry) + h.SeedAllArchivedQueues(t, r, fixtures.archived) + + tests := []struct { + qname string + id string + wantErr error + }{ + { + qname: "nonexistent", + id: m1.ID.String(), + wantErr: ErrQueueNotFound, + }, + { + qname: "default", + id: uuid.NewString(), + wantErr: ErrTaskNotFound, + }, + } + + inspector := NewInspector(getRedisConnOpt(t)) + + for _, tc := range tests { + info, err := inspector.GetTaskInfo(tc.qname, tc.id) + if info != nil { + t.Errorf("GetTaskInfo(%q, %q) returned info: %v", tc.qname, tc.id, info) + } + if !errors.Is(err, tc.wantErr) { + t.Errorf("GetTaskInfo(%q, %q) returned unexpected error: %v, want %v", tc.qname, tc.id, err, tc.wantErr) + } + } +} + func TestInspectorListPendingTasks(t *testing.T) { r := setup(t) defer r.Close()