Add ListCompletedTasks to Inspector

This commit is contained in:
Ken Hibino
2021-09-23 17:02:21 -07:00
parent f06404a9a8
commit b8b23e8b57
6 changed files with 323 additions and 30 deletions

View File

@@ -108,7 +108,7 @@ func TestGetResultWriterFromContext(t *testing.T) {
r := setup(t)
defer r.Close()
rdbClient := rdb.NewRDB(r)
const deadline = time.Now().Add(30 * time.Minute)
deadline := time.Now().Add(30 * time.Minute)
tests := []struct {
msg *base.TaskMessage
@@ -120,7 +120,7 @@ func TestGetResultWriterFromContext(t *testing.T) {
ctx, cancel := createContext(tc.msg, deadline, rdbClient)
defer cancel()
w, ok := GetResultWriter(ctx)
_, ok := GetResultWriter(ctx)
if !ok {
t.Error("GetResultWriter returned ok == false")
}

View File

@@ -103,7 +103,7 @@ return res`)
// CurrentStats returns a current state of the queues.
func (r *RDB) CurrentStats(qname string) (*Stats, error) {
var op errors.Op = "rdb.CurrentStats"
exists, err := r.client.SIsMember(context.Background(), base.AllQueues, qname).Result()
exists, err := r.queueExists(qname)
if err != nil {
return nil, errors.E(op, errors.Unknown, err)
}
@@ -270,7 +270,7 @@ func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) {
if n < 1 {
return nil, errors.E(op, errors.FailedPrecondition, "the number of days must be positive")
}
exists, err := r.client.SIsMember(context.Background(), base.AllQueues, qname).Result()
exists, err := r.queueExists(qname)
if err != nil {
return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
}
@@ -347,7 +347,7 @@ func reverse(x []string) {
// checkQueueExists verifies whether the queue exists.
// It returns QueueNotFoundError if queue doesn't exist.
func (r *RDB) checkQueueExists(qname string) error {
exists, err := r.client.SIsMember(context.Background(), base.AllQueues, qname).Result()
exists, err := r.queueExists(qname)
if err != nil {
return errors.E(errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
}
@@ -462,7 +462,11 @@ func (p Pagination) stop() int64 {
// ListPending returns pending tasks that are ready to be processed.
func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
var op errors.Op = "rdb.ListPending"
if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() {
exists, err := r.queueExists(qname)
if err != nil {
return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
}
if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
res, err := r.listMessages(base.PendingKey(qname), qname, pgn)
@@ -475,7 +479,11 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er
// ListActive returns all tasks that are currently being processed for the given queue.
func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
var op errors.Op = "rdb.ListActive"
if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() {
exists, err := r.queueExists(qname)
if err != nil {
return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
}
if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
res, err := r.listMessages(base.ActiveKey(qname), qname, pgn)
@@ -531,7 +539,11 @@ func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessa
// to be processed in the future.
func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) {
var op errors.Op = "rdb.ListScheduled"
if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() {
exists, err := r.queueExists(qname)
if err != nil {
return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
}
if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
res, err := r.listZSetEntries(base.ScheduledKey(qname), qname, pgn)
@@ -545,7 +557,11 @@ func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) {
// and willl be retried in the future.
func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
var op errors.Op = "rdb.ListRetry"
if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() {
exists, err := r.queueExists(qname)
if err != nil {
return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
}
if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
res, err := r.listZSetEntries(base.RetryKey(qname), qname, pgn)
@@ -558,7 +574,11 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
// ListArchived returns all tasks from the given queue that have exhausted its retry limit.
func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
var op errors.Op = "rdb.ListArchived"
if !r.client.SIsMember(context.Background(), base.AllQueues, qname).Val() {
exists, err := r.queueExists(qname)
if err != nil {
return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
}
if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
zs, err := r.listZSetEntries(base.ArchivedKey(qname), qname, pgn)
@@ -568,6 +588,28 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
return zs, nil
}
// ListCompleted returns all tasks from the given queue that have completed successfully.
func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]base.Z, error) {
var op errors.Op = "rdb.ListCompleted"
exists, err := r.queueExists(qname)
if err != nil {
return nil, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
}
if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
zs, err := r.listZSetEntries(base.CompletedKey(qname), qname, pgn)
if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err)
}
return zs, nil
}
// Reports whether a queue with the given name exists.
func (r *RDB) queueExists(qname string) (bool, error) {
return r.client.SIsMember(context.Background(), base.AllQueues, qname).Result()
}
// KEYS[1] -> key for ids set (e.g. asynq:{<qname>}:scheduled)
// ARGV[1] -> min
// ARGV[2] -> max
@@ -1334,7 +1376,7 @@ return 1`)
// the queue is empty.
func (r *RDB) RemoveQueue(qname string, force bool) error {
var op errors.Op = "rdb.RemoveQueue"
exists, err := r.client.SIsMember(context.Background(), base.AllQueues, qname).Result()
exists, err := r.queueExists(qname)
if err != nil {
return err
}

View File

@@ -1115,7 +1115,7 @@ func TestListArchived(t *testing.T) {
h.SeedAllArchivedQueues(t, r.client, tc.archived)
got, err := r.ListArchived(tc.qname, Pagination{Size: 20, Page: 0})
op := fmt.Sprintf("r.ListDead(%q, Pagination{Size: 20, Page: 0})", tc.qname)
op := fmt.Sprintf("r.ListArchived(%q, Pagination{Size: 20, Page: 0})", tc.qname)
if err != nil {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
@@ -1156,7 +1156,148 @@ func TestListArchivedPagination(t *testing.T) {
for _, tc := range tests {
got, err := r.ListArchived(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListDead(Pagination{Size: %d, Page: %d})",
op := fmt.Sprintf("r.ListArchived(Pagination{Size: %d, Page: %d})",
tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
}
if len(got) != tc.wantSize {
t.Errorf("%s; %s returned list of size %d, want %d",
tc.desc, op, len(got), tc.wantSize)
continue
}
if tc.wantSize == 0 {
continue
}
first := got[0].Message
if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst)
}
last := got[len(got)-1].Message
if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast)
}
}
}
func TestListCompleted(t *testing.T) {
r := setup(t)
defer r.Close()
msg1 := &base.TaskMessage{
ID: uuid.NewString(),
Type: "foo",
Queue: "default",
CompletedAt: time.Now().Add(-2 * time.Hour).Unix(),
}
msg2 := &base.TaskMessage{
ID: uuid.NewString(),
Type: "foo",
Queue: "default",
CompletedAt: time.Now().Add(-5 * time.Hour).Unix(),
}
msg3 := &base.TaskMessage{
ID: uuid.NewString(),
Type: "foo",
Queue: "custom",
CompletedAt: time.Now().Add(-5 * time.Hour).Unix(),
}
expireAt1 := time.Now().Add(3 * time.Hour)
expireAt2 := time.Now().Add(4 * time.Hour)
expireAt3 := time.Now().Add(5 * time.Hour)
tests := []struct {
completed map[string][]base.Z
qname string
want []base.Z
}{
{
completed: map[string][]base.Z{
"default": {
{Message: msg1, Score: expireAt1.Unix()},
{Message: msg2, Score: expireAt2.Unix()},
},
"custom": {
{Message: msg3, Score: expireAt3.Unix()},
},
},
qname: "default",
want: []base.Z{
{Message: msg1, Score: expireAt1.Unix()},
{Message: msg2, Score: expireAt2.Unix()},
},
},
{
completed: map[string][]base.Z{
"default": {
{Message: msg1, Score: expireAt1.Unix()},
{Message: msg2, Score: expireAt2.Unix()},
},
"custom": {
{Message: msg3, Score: expireAt3.Unix()},
},
},
qname: "custom",
want: []base.Z{
{Message: msg3, Score: expireAt3.Unix()},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllCompletedQueues(t, r.client, tc.completed)
got, err := r.ListCompleted(tc.qname, Pagination{Size: 20, Page: 0})
op := fmt.Sprintf("r.ListCompleted(%q, Pagination{Size: 20, Page: 0})", tc.qname)
if err != nil {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s",
op, got, err, tc.want, diff)
continue
}
}
}
func TestListCompletedPagination(t *testing.T) {
r := setup(t)
defer r.Close()
var entries []base.Z
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
entries = append(entries, base.Z{Message: msg, Score: int64(i)})
}
h.SeedCompletedQueue(t, r.client, entries, "default")
tests := []struct {
desc string
qname string
page int
size int
wantSize int
wantFirst string
wantLast string
}{
{"first page", "default", 0, 20, 20, "task 0", "task 19"},
{"second page", "default", 1, 20, 20, "task 20", "task 39"},
{"different page size", "default", 2, 30, 30, "task 60", "task 89"},
{"last page", "default", 3, 30, 10, "task 90", "task 99"},
{"out of range", "default", 4, 30, 0, "", ""},
}
for _, tc := range tests {
got, err := r.ListCompleted(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListCompleted(Pagination{Size: %d, Page: %d})",
tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)