diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index c45bc68..e3f6144 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -554,10 +554,14 @@ func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) ( if len(data[i+1]) > 0 { res = []byte(data[i+1]) } + var nextProcessAt time.Time + if state == base.TaskStatePending { + nextProcessAt = time.Now() + } infos = append(infos, &base.TaskInfo{ Message: m, State: state, - NextProcessAt: time.Now(), + NextProcessAt: nextProcessAt, Result: res, }) } @@ -568,7 +572,7 @@ func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) ( // ListScheduled returns all tasks from the given queue that are scheduled // to be processed in the future. -func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) { +func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]*base.TaskInfo, error) { var op errors.Op = "rdb.ListScheduled" exists, err := r.queueExists(qname) if err != nil { @@ -577,7 +581,7 @@ func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) { if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - res, err := r.listZSetEntries(base.ScheduledKey(qname), qname, pgn) + res, err := r.listZSetEntries(qname, base.TaskStateScheduled, pgn) if err != nil { return nil, errors.E(op, errors.CanonicalCode(err), err) } @@ -586,7 +590,7 @@ func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) { // ListRetry returns all tasks from the given queue that have failed before // and willl be retried in the future. -func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) { +func (r *RDB) ListRetry(qname string, pgn Pagination) ([]*base.TaskInfo, error) { var op errors.Op = "rdb.ListRetry" exists, err := r.queueExists(qname) if err != nil { @@ -595,7 +599,7 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) { if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - res, err := r.listZSetEntries(base.RetryKey(qname), qname, pgn) + res, err := r.listZSetEntries(qname, base.TaskStateRetry, pgn) if err != nil { return nil, errors.E(op, errors.CanonicalCode(err), err) } @@ -603,7 +607,7 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) { } // ListArchived returns all tasks from the given queue that have exhausted its retry limit. -func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) { +func (r *RDB) ListArchived(qname string, pgn Pagination) ([]*base.TaskInfo, error) { var op errors.Op = "rdb.ListArchived" exists, err := r.queueExists(qname) if err != nil { @@ -612,7 +616,7 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) { if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - zs, err := r.listZSetEntries(base.ArchivedKey(qname), qname, pgn) + zs, err := r.listZSetEntries(qname, base.TaskStateArchived, pgn) if err != nil { return nil, errors.E(op, errors.CanonicalCode(err), err) } @@ -620,7 +624,7 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) { } // ListCompleted returns all tasks from the given queue that have completed successfully. -func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]base.Z, error) { +func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]*base.TaskInfo, error) { var op errors.Op = "rdb.ListCompleted" exists, err := r.queueExists(qname) if err != nil { @@ -629,7 +633,7 @@ func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]base.Z, error) { if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - zs, err := r.listZSetEntries(base.CompletedKey(qname), qname, pgn) + zs, err := r.listZSetEntries(qname, base.TaskStateCompleted, pgn) if err != nil { return nil, errors.E(op, errors.CanonicalCode(err), err) } @@ -647,21 +651,38 @@ func (r *RDB) queueExists(qname string) (bool, error) { // ARGV[3] -> task key prefix // // Returns an array populated with -// [msg1, score1, msg2, score2, ..., msgN, scoreN] +// [msg1, score1, result1, msg2, score2, result2, ..., msgN, scoreN, resultN] var listZSetEntriesCmd = redis.NewScript(` -local res = {} +local data = {} local id_score_pairs = redis.call("ZRANGE", KEYS[1], ARGV[1], ARGV[2], "WITHSCORES") for i = 1, table.getn(id_score_pairs), 2 do - local key = ARGV[3] .. id_score_pairs[i] - table.insert(res, redis.call("HGET", key, "msg")) - table.insert(res, id_score_pairs[i+1]) + local id = id_score_pairs[i] + local score = id_score_pairs[i+1] + local key = ARGV[3] .. id + local msg, res = unpack(redis.call("HMGET", key, "msg", "result")) + table.insert(data, msg) + table.insert(data, score) + table.insert(data, res) end -return res +return data `) // listZSetEntries returns a list of message and score pairs in Redis sorted-set // with the given key. -func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, error) { +func (r *RDB) listZSetEntries(qname string, state base.TaskState, pgn Pagination) ([]*base.TaskInfo, error) { + var key string + switch state { + case base.TaskStateScheduled: + key = base.ScheduledKey(qname) + case base.TaskStateRetry: + key = base.RetryKey(qname) + case base.TaskStateArchived: + key = base.ArchivedKey(qname) + case base.TaskStateCompleted: + key = base.CompletedKey(qname) + default: + panic(fmt.Sprintf("unsupported task state: %v", state)) + } res, err := listZSetEntriesCmd.Run(context.Background(), r.client, []string{key}, pgn.start(), pgn.stop(), base.TaskKeyPrefix(qname)).Result() if err != nil { @@ -671,8 +692,8 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro if err != nil { return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res)) } - var zs []base.Z - for i := 0; i < len(data); i += 2 { + var infos []*base.TaskInfo + for i := 0; i < len(data); i += 3 { s, err := cast.ToStringE(data[i]) if err != nil { return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res)) @@ -681,13 +702,30 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro if err != nil { return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res)) } + resStr, err := cast.ToStringE(data[i+2]) + if err != nil { + return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res)) + } msg, err := base.DecodeMessage([]byte(s)) if err != nil { continue // bad data, ignore and continue } - zs = append(zs, base.Z{Message: msg, Score: score}) + var nextProcessAt time.Time + if state == base.TaskStateScheduled || state == base.TaskStateRetry { + nextProcessAt = time.Unix(score, 0) + } + var resBytes []byte + if len(resStr) > 0 { + resBytes = []byte(resStr) + } + infos = append(infos, &base.TaskInfo{ + Message: msg, + State: state, + NextProcessAt: nextProcessAt, + Result: resBytes, + }) } - return zs, nil + return infos, nil } // RunAllScheduledTasks enqueues all scheduled tasks from the given queue diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index a22d3c1..207709d 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -684,7 +684,7 @@ func TestListActive(t *testing.T) { tests := []struct { inProgress map[string][]*base.TaskMessage qname string - want []*base.TaskMessage + want []*base.TaskInfo }{ { inProgress: map[string][]*base.TaskMessage{ @@ -693,14 +693,17 @@ func TestListActive(t *testing.T) { "low": {m4}, }, qname: "default", - want: []*base.TaskMessage{m1, m2}, + want: []*base.TaskInfo{ + {Message: m1, State: base.TaskStateActive, NextProcessAt: time.Time{}, Result: nil}, + {Message: m2, State: base.TaskStateActive, NextProcessAt: time.Time{}, Result: nil}, + }, }, { inProgress: map[string][]*base.TaskMessage{ "default": {}, }, qname: "default", - want: []*base.TaskMessage(nil), + want: []*base.TaskInfo(nil), }, } @@ -714,7 +717,7 @@ func TestListActive(t *testing.T) { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.inProgress) continue } - if diff := cmp.Diff(tc.want, got); diff != "" { + if diff := cmp.Diff(tc.want, got, cmpopts.EquateApproxTime(1*time.Second)); diff != "" { t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) continue } @@ -793,7 +796,7 @@ func TestListScheduled(t *testing.T) { tests := []struct { scheduled map[string][]base.Z qname string - want []base.Z + want []*base.TaskInfo }{ { scheduled: map[string][]base.Z{ @@ -808,10 +811,10 @@ func TestListScheduled(t *testing.T) { }, qname: "default", // should be sorted by score in ascending order - want: []base.Z{ - {Message: m3, Score: p3.Unix()}, - {Message: m1, Score: p1.Unix()}, - {Message: m2, Score: p2.Unix()}, + want: []*base.TaskInfo{ + {Message: m3, NextProcessAt: p3, State: base.TaskStateScheduled, Result: nil}, + {Message: m1, NextProcessAt: p1, State: base.TaskStateScheduled, Result: nil}, + {Message: m2, NextProcessAt: p2, State: base.TaskStateScheduled, Result: nil}, }, }, { @@ -826,8 +829,8 @@ func TestListScheduled(t *testing.T) { }, }, qname: "custom", - want: []base.Z{ - {Message: m4, Score: p4.Unix()}, + want: []*base.TaskInfo{ + {Message: m4, NextProcessAt: p4, State: base.TaskStateScheduled, Result: nil}, }, }, { @@ -835,7 +838,7 @@ func TestListScheduled(t *testing.T) { "default": {}, }, qname: "default", - want: []base.Z(nil), + want: []*base.TaskInfo(nil), }, } @@ -849,7 +852,7 @@ func TestListScheduled(t *testing.T) { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue } - if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, cmpopts.EquateApproxTime(1*time.Second)); diff != "" { t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) continue } @@ -951,7 +954,7 @@ func TestListRetry(t *testing.T) { tests := []struct { retry map[string][]base.Z qname string - want []base.Z + want []*base.TaskInfo }{ { retry: map[string][]base.Z{ @@ -964,9 +967,9 @@ func TestListRetry(t *testing.T) { }, }, qname: "default", - want: []base.Z{ - {Message: m1, Score: p1.Unix()}, - {Message: m2, Score: p2.Unix()}, + want: []*base.TaskInfo{ + {Message: m1, NextProcessAt: p1, State: base.TaskStateRetry, Result: nil}, + {Message: m2, NextProcessAt: p2, State: base.TaskStateRetry, Result: nil}, }, }, { @@ -980,8 +983,8 @@ func TestListRetry(t *testing.T) { }, }, qname: "custom", - want: []base.Z{ - {Message: m3, Score: p3.Unix()}, + want: []*base.TaskInfo{ + {Message: m3, NextProcessAt: p3, State: base.TaskStateRetry, Result: nil}, }, }, { @@ -989,7 +992,7 @@ func TestListRetry(t *testing.T) { "default": {}, }, qname: "default", - want: []base.Z(nil), + want: []*base.TaskInfo(nil), }, } @@ -1003,7 +1006,7 @@ func TestListRetry(t *testing.T) { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue } - if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, cmpopts.EquateApproxTime(1*time.Second)); diff != "" { t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) continue @@ -1104,7 +1107,7 @@ func TestListArchived(t *testing.T) { tests := []struct { archived map[string][]base.Z qname string - want []base.Z + want []*base.TaskInfo }{ { archived: map[string][]base.Z{ @@ -1117,9 +1120,9 @@ func TestListArchived(t *testing.T) { }, }, qname: "default", - want: []base.Z{ - {Message: m2, Score: f2.Unix()}, // FIXME: shouldn't be sorted in the other order? - {Message: m1, Score: f1.Unix()}, + want: []*base.TaskInfo{ + {Message: m2, NextProcessAt: time.Time{}, State: base.TaskStateArchived, Result: nil}, // FIXME: shouldn't be sorted in the other order? + {Message: m1, NextProcessAt: time.Time{}, State: base.TaskStateArchived, Result: nil}, }, }, { @@ -1133,8 +1136,8 @@ func TestListArchived(t *testing.T) { }, }, qname: "custom", - want: []base.Z{ - {Message: m3, Score: f3.Unix()}, + want: []*base.TaskInfo{ + {Message: m3, NextProcessAt: time.Time{}, State: base.TaskStateArchived, Result: nil}, }, }, { @@ -1142,7 +1145,7 @@ func TestListArchived(t *testing.T) { "default": {}, }, qname: "default", - want: []base.Z(nil), + want: []*base.TaskInfo(nil), }, } @@ -1156,7 +1159,7 @@ func TestListArchived(t *testing.T) { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue } - if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(tc.want, got); diff != "" { t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) continue @@ -1251,7 +1254,7 @@ func TestListCompleted(t *testing.T) { tests := []struct { completed map[string][]base.Z qname string - want []base.Z + want []*base.TaskInfo }{ { completed: map[string][]base.Z{ @@ -1264,9 +1267,9 @@ func TestListCompleted(t *testing.T) { }, }, qname: "default", - want: []base.Z{ - {Message: msg1, Score: expireAt1.Unix()}, - {Message: msg2, Score: expireAt2.Unix()}, + want: []*base.TaskInfo{ + {Message: msg1, NextProcessAt: time.Time{}, State: base.TaskStateCompleted, Result: nil}, + {Message: msg2, NextProcessAt: time.Time{}, State: base.TaskStateCompleted, Result: nil}, }, }, { @@ -1280,8 +1283,8 @@ func TestListCompleted(t *testing.T) { }, }, qname: "custom", - want: []base.Z{ - {Message: msg3, Score: expireAt3.Unix()}, + want: []*base.TaskInfo{ + {Message: msg3, NextProcessAt: time.Time{}, State: base.TaskStateCompleted, Result: nil}, }, }, } @@ -1296,7 +1299,7 @@ func TestListCompleted(t *testing.T) { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue } - if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(tc.want, got); diff != "" { t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) continue