diff --git a/asynq.go b/asynq.go index c53a61c..854270a 100644 --- a/asynq.go +++ b/asynq.go @@ -90,6 +90,15 @@ type TaskInfo struct { CompletedAt time.Time } +// If t is non-zero, returns time converted from t as unix time in seconds. +// If t is zero, returns zero value of time.Time. +func fromUnixTimeOrZero(t int64) time.Time { + if t == 0 { + return time.Time{} + } + return time.Unix(t, 0) +} + func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time.Time) *TaskInfo { info := TaskInfo{ ID: msg.ID, @@ -100,25 +109,11 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time Retried: msg.Retried, LastErr: msg.ErrorMsg, Timeout: time.Duration(msg.Timeout) * time.Second, + Deadline: fromUnixTimeOrZero(msg.Deadline), ResultTTL: time.Duration(msg.ResultTTL) * time.Second, NextProcessAt: nextProcessAt, - } - if msg.LastFailedAt == 0 { - info.LastFailedAt = time.Time{} - } else { - info.LastFailedAt = time.Unix(msg.LastFailedAt, 0) - } - - if msg.Deadline == 0 { - info.Deadline = time.Time{} - } else { - info.Deadline = time.Unix(msg.Deadline, 0) - } - - if msg.CompletedAt == 0 { - info.CompletedAt = time.Time{} - } else { - info.CompletedAt = time.Unix(msg.CompletedAt, 0) + LastFailedAt: fromUnixTimeOrZero(msg.LastFailedAt), + CompletedAt: fromUnixTimeOrZero(msg.CompletedAt), } switch state { @@ -132,6 +127,8 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time info.State = TaskStateRetry case base.TaskStateArchived: info.State = TaskStateArchived + case base.TaskStateCompleted: + info.State = TaskStateCompleted default: panic(fmt.Sprintf("internal error: unknown state: %d", state)) } @@ -156,6 +153,10 @@ const ( // Indicates that the task is archived and stored for inspection purposes. TaskStateArchived + + // Indicates that the task is processed successfully and stored until the retention perioid specified + // by result_ttl expires. + TaskStateCompleted ) func (s TaskState) String() string { @@ -170,6 +171,8 @@ func (s TaskState) String() string { return "retry" case TaskStateArchived: return "archived" + case TaskStateCompleted: + return "completed" } panic("asynq: unknown task state") } diff --git a/inspector.go b/inspector.go index e3da671..8c9a263 100644 --- a/inspector.go +++ b/inspector.go @@ -381,6 +381,34 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Task return tasks, nil } +// ListCompletedTasks retrieves completed tasks from the specified queue. +// Tasks are sorted by expiration time (i.e. CompletedAt + ResultTTL) in descending order. +// +// By default, it retrieves the first 30 tasks. +func (i *Inspector) ListCompletedTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { + if err := base.ValidateQueueName(qname); err != nil { + return nil, fmt.Errorf("asynq: %v", err) + } + opt := composeListOptions(opts...) + pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} + zs, err := i.rdb.ListCompleted(qname, pgn) + switch { + case errors.IsQueueNotFound(err): + return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) + case err != nil: + return nil, fmt.Errorf("asynq: %v", err) + } + var tasks []*TaskInfo + for _, z := range zs { + tasks = append(tasks, newTaskInfo( + z.Message, + base.TaskStateCompleted, + time.Time{}, + )) + } + return tasks, nil +} + // DeleteAllPendingTasks deletes all pending tasks from the specified queue, // and reports the number tasks deleted. func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error) { diff --git a/inspector_test.go b/inspector_test.go index 88c13c7..a054ac8 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -952,6 +952,82 @@ func TestInspectorListArchivedTasks(t *testing.T) { } } +func newCompletedTaskMessage(typename, qname string, resultTTL time.Duration, completedAt time.Time) *base.TaskMessage { + msg := h.NewTaskMessageWithQueue(typename, nil, qname) + msg.ResultTTL = int64(resultTTL.Seconds()) + msg.CompletedAt = completedAt.Unix() + return msg +} + +func createCompletedTask(z base.Z) *TaskInfo { + return newTaskInfo( + z.Message, + base.TaskStateCompleted, + time.Time{}, // zero value for n/a + ) +} + +func TestInspectorListCompletedTasks(t *testing.T) { + r := setup(t) + defer r.Close() + now := time.Now() + m1 := newCompletedTaskMessage("task1", "default", 1*time.Hour, now.Add(-3*time.Minute)) // Expires in 57 mins + m2 := newCompletedTaskMessage("task2", "default", 30*time.Minute, now.Add(-10*time.Minute)) // Expires in 20 mins + m3 := newCompletedTaskMessage("task3", "default", 2*time.Hour, now.Add(-30*time.Minute)) // Expires in 90 mins + m4 := newCompletedTaskMessage("task4", "custom", 15*time.Minute, now.Add(-2*time.Minute)) // Expires in 13 mins + z1 := base.Z{Message: m1, Score: m1.CompletedAt + m1.ResultTTL} + z2 := base.Z{Message: m2, Score: m2.CompletedAt + m2.ResultTTL} + z3 := base.Z{Message: m3, Score: m3.CompletedAt + m3.ResultTTL} + z4 := base.Z{Message: m4, Score: m4.CompletedAt + m4.ResultTTL} + + inspector := NewInspector(getRedisConnOpt(t)) + + tests := []struct { + desc string + completed map[string][]base.Z + qname string + want []*TaskInfo + }{ + { + desc: "with a few completed tasks", + completed: map[string][]base.Z{ + "default": {z1, z2, z3}, + "custom": {z4}, + }, + qname: "default", + // Should be sorted by expiration time (CompletedAt + ResultTTL). + want: []*TaskInfo{ + createCompletedTask(z2), + createCompletedTask(z1), + createCompletedTask(z3), + }, + }, + { + desc: "with empty completed queue", + completed: map[string][]base.Z{ + "default": {}, + }, + qname: "default", + want: []*TaskInfo(nil), + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) + h.SeedAllCompletedQueues(t, r, tc.completed) + + got, err := inspector.ListCompletedTasks(tc.qname) + if err != nil { + t.Errorf("%s; ListCompletedTasks(%q) returned error: %v", tc.desc, tc.qname, err) + continue + } + if diff := cmp.Diff(tc.want, got, cmp.AllowUnexported(TaskInfo{})); diff != "" { + t.Errorf("%s; ListCompletedTasks(%q) = %v, want %v; (-want,+got)\n%s", + tc.desc, tc.qname, got, tc.want, diff) + } + } +} + func TestInspectorListPagination(t *testing.T) { // Create 100 tasks. var msgs []*base.TaskMessage @@ -1050,6 +1126,9 @@ func TestInspectorListTasksQueueNotFoundError(t *testing.T) { if _, err := inspector.ListArchivedTasks(tc.qname); !errors.Is(err, tc.wantErr) { t.Errorf("ListArchivedTasks(%q) returned error %v, want %v", tc.qname, err, tc.wantErr) } + if _, err := inspector.ListCompletedTasks(tc.qname); !errors.Is(err, tc.wantErr) { + t.Errorf("ListCompletedTasks(%q) returned error %v, want %v", tc.qname, err, tc.wantErr) + } } } diff --git a/internal/context/context_test.go b/internal/context/context_test.go index 7b7d569..b47ad40 100644 --- a/internal/context/context_test.go +++ b/internal/context/context_test.go @@ -108,7 +108,7 @@ func TestGetResultWriterFromContext(t *testing.T) { r := setup(t) defer r.Close() rdbClient := rdb.NewRDB(r) - const deadline = time.Now().Add(30 * time.Minute) + deadline := time.Now().Add(30 * time.Minute) tests := []struct { msg *base.TaskMessage @@ -120,7 +120,7 @@ func TestGetResultWriterFromContext(t *testing.T) { ctx, cancel := createContext(tc.msg, deadline, rdbClient) defer cancel() - w, ok := GetResultWriter(ctx) + _, ok := GetResultWriter(ctx) if !ok { t.Error("GetResultWriter returned ok == false") } diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 2641540..5c620db 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -103,7 +103,7 @@ return res`) // CurrentStats returns a current state of the queues. func (r *RDB) CurrentStats(qname string) (*Stats, error) { var op errors.Op = "rdb.CurrentStats" - exists, err := r.client.SIsMember(context.Background(), base.AllQueues, qname).Result() + exists, err := r.queueExists(qname) if err != nil { return nil, errors.E(op, errors.Unknown, err) } @@ -270,7 +270,7 @@ func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) { if n < 1 { return nil, errors.E(op, errors.FailedPrecondition, "the number of days must be positive") } - exists, err := r.client.SIsMember(context.Background(), base.AllQueues, qname).Result() + exists, err := r.queueExists(qname) if err != nil { return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err}) } @@ -347,7 +347,7 @@ func reverse(x []string) { // checkQueueExists verifies whether the queue exists. // It returns QueueNotFoundError if queue doesn't exist. func (r *RDB) checkQueueExists(qname string) error { - exists, err := r.client.SIsMember(context.Background(), base.AllQueues, qname).Result() + exists, err := r.queueExists(qname) if err != nil { return errors.E(errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err}) } @@ -462,7 +462,11 @@ func (p Pagination) stop() int64 { // ListPending returns pending tasks that are ready to be processed. func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error) { var op errors.Op = "rdb.ListPending" - if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() { + exists, err := r.queueExists(qname) + if err != nil { + return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err}) + } + if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } res, err := r.listMessages(base.PendingKey(qname), qname, pgn) @@ -475,7 +479,11 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er // ListActive returns all tasks that are currently being processed for the given queue. func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error) { var op errors.Op = "rdb.ListActive" - if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() { + exists, err := r.queueExists(qname) + if err != nil { + return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err}) + } + if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } res, err := r.listMessages(base.ActiveKey(qname), qname, pgn) @@ -531,7 +539,11 @@ func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessa // to be processed in the future. func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) { var op errors.Op = "rdb.ListScheduled" - if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() { + exists, err := r.queueExists(qname) + if err != nil { + return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err}) + } + if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } res, err := r.listZSetEntries(base.ScheduledKey(qname), qname, pgn) @@ -545,7 +557,11 @@ func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) { // and willl be retried in the future. func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) { var op errors.Op = "rdb.ListRetry" - if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() { + exists, err := r.queueExists(qname) + if err != nil { + return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err}) + } + if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } res, err := r.listZSetEntries(base.RetryKey(qname), qname, pgn) @@ -558,7 +574,11 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) { // ListArchived returns all tasks from the given queue that have exhausted its retry limit. func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) { var op errors.Op = "rdb.ListArchived" - if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() { + exists, err := r.queueExists(qname) + if err != nil { + return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err}) + } + if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } zs, err := r.listZSetEntries(base.ArchivedKey(qname), qname, pgn) @@ -568,6 +588,28 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) { return zs, nil } +// ListCompleted returns all tasks from the given queue that have completed successfully. +func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]base.Z, error) { + var op errors.Op = "rdb.ListCompleted" + exists, err := r.queueExists(qname) + if err != nil { + return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err}) + } + if !exists { + return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) + } + zs, err := r.listZSetEntries(base.CompletedKey(qname), qname, pgn) + if err != nil { + return nil, errors.E(op, errors.CanonicalCode(err), err) + } + return zs, nil +} + +// Reports whether a queue with the given name exists. +func (r *RDB) queueExists(qname string) (bool, error) { + return r.client.SIsMember(context.Background(), base.AllQueues, qname).Result() +} + // KEYS[1] -> key for ids set (e.g. asynq:{}:scheduled) // ARGV[1] -> min // ARGV[2] -> max @@ -1334,7 +1376,7 @@ return 1`) // the queue is empty. func (r *RDB) RemoveQueue(qname string, force bool) error { var op errors.Op = "rdb.RemoveQueue" - exists, err := r.client.SIsMember(context.Background(), base.AllQueues, qname).Result() + exists, err := r.queueExists(qname) if err != nil { return err } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 2d8b1c0..df864e2 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -1115,7 +1115,7 @@ func TestListArchived(t *testing.T) { h.SeedAllArchivedQueues(t, r.client, tc.archived) got, err := r.ListArchived(tc.qname, Pagination{Size: 20, Page: 0}) - op := fmt.Sprintf("r.ListDead(%q, Pagination{Size: 20, Page: 0})", tc.qname) + op := fmt.Sprintf("r.ListArchived(%q, Pagination{Size: 20, Page: 0})", tc.qname) if err != nil { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue @@ -1156,7 +1156,148 @@ func TestListArchivedPagination(t *testing.T) { for _, tc := range tests { got, err := r.ListArchived(tc.qname, Pagination{Size: tc.size, Page: tc.page}) - op := fmt.Sprintf("r.ListDead(Pagination{Size: %d, Page: %d})", + op := fmt.Sprintf("r.ListArchived(Pagination{Size: %d, Page: %d})", + tc.size, tc.page) + if err != nil { + t.Errorf("%s; %s returned error %v", tc.desc, op, err) + continue + } + + if len(got) != tc.wantSize { + t.Errorf("%s; %s returned list of size %d, want %d", + tc.desc, op, len(got), tc.wantSize) + continue + } + + if tc.wantSize == 0 { + continue + } + + first := got[0].Message + if first.Type != tc.wantFirst { + t.Errorf("%s; %s returned a list with first message %q, want %q", + tc.desc, op, first.Type, tc.wantFirst) + } + + last := got[len(got)-1].Message + if last.Type != tc.wantLast { + t.Errorf("%s; %s returned a list with the last message %q, want %q", + tc.desc, op, last.Type, tc.wantLast) + } + } +} + +func TestListCompleted(t *testing.T) { + r := setup(t) + defer r.Close() + msg1 := &base.TaskMessage{ + ID: uuid.NewString(), + Type: "foo", + Queue: "default", + CompletedAt: time.Now().Add(-2 * time.Hour).Unix(), + } + msg2 := &base.TaskMessage{ + ID: uuid.NewString(), + Type: "foo", + Queue: "default", + CompletedAt: time.Now().Add(-5 * time.Hour).Unix(), + } + msg3 := &base.TaskMessage{ + ID: uuid.NewString(), + Type: "foo", + Queue: "custom", + CompletedAt: time.Now().Add(-5 * time.Hour).Unix(), + } + expireAt1 := time.Now().Add(3 * time.Hour) + expireAt2 := time.Now().Add(4 * time.Hour) + expireAt3 := time.Now().Add(5 * time.Hour) + + tests := []struct { + completed map[string][]base.Z + qname string + want []base.Z + }{ + { + completed: map[string][]base.Z{ + "default": { + {Message: msg1, Score: expireAt1.Unix()}, + {Message: msg2, Score: expireAt2.Unix()}, + }, + "custom": { + {Message: msg3, Score: expireAt3.Unix()}, + }, + }, + qname: "default", + want: []base.Z{ + {Message: msg1, Score: expireAt1.Unix()}, + {Message: msg2, Score: expireAt2.Unix()}, + }, + }, + { + completed: map[string][]base.Z{ + "default": { + {Message: msg1, Score: expireAt1.Unix()}, + {Message: msg2, Score: expireAt2.Unix()}, + }, + "custom": { + {Message: msg3, Score: expireAt3.Unix()}, + }, + }, + qname: "custom", + want: []base.Z{ + {Message: msg3, Score: expireAt3.Unix()}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) // clean up db before each test case + h.SeedAllCompletedQueues(t, r.client, tc.completed) + + got, err := r.ListCompleted(tc.qname, Pagination{Size: 20, Page: 0}) + op := fmt.Sprintf("r.ListCompleted(%q, Pagination{Size: 20, Page: 0})", tc.qname) + if err != nil { + t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) + continue + } + if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" { + t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", + op, got, err, tc.want, diff) + continue + } + } + +} + +func TestListCompletedPagination(t *testing.T) { + r := setup(t) + defer r.Close() + var entries []base.Z + for i := 0; i < 100; i++ { + msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) + entries = append(entries, base.Z{Message: msg, Score: int64(i)}) + } + h.SeedCompletedQueue(t, r.client, entries, "default") + + tests := []struct { + desc string + qname string + page int + size int + wantSize int + wantFirst string + wantLast string + }{ + {"first page", "default", 0, 20, 20, "task 0", "task 19"}, + {"second page", "default", 1, 20, 20, "task 20", "task 39"}, + {"different page size", "default", 2, 30, 30, "task 60", "task 89"}, + {"last page", "default", 3, 30, 10, "task 90", "task 99"}, + {"out of range", "default", 4, 30, 0, "", ""}, + } + + for _, tc := range tests { + got, err := r.ListCompleted(tc.qname, Pagination{Size: tc.size, Page: tc.page}) + op := fmt.Sprintf("r.ListCompleted(Pagination{Size: %d, Page: %d})", tc.size, tc.page) if err != nil { t.Errorf("%s; %s returned error %v", tc.desc, op, err)