diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 453b6e6..c45bc68 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -337,7 +337,8 @@ func parseInfo(infoStr string) (map[string]string, error) { return info, nil } -func reverse(x []string) { +// TODO: Use generics once available. +func reverse(x []*base.TaskInfo) { for i := len(x)/2 - 1; i >= 0; i-- { opp := len(x) - 1 - i x[i], x[opp] = x[opp], x[i] @@ -470,7 +471,7 @@ func (p Pagination) stop() int64 { } // ListPending returns pending tasks that are ready to be processed. -func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error) { +func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskInfo, error) { var op errors.Op = "rdb.ListPending" exists, err := r.queueExists(qname) if err != nil { @@ -479,7 +480,7 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - res, err := r.listMessages(base.PendingKey(qname), qname, pgn) + res, err := r.listMessages(qname, base.TaskStatePending, pgn) if err != nil { return nil, errors.E(op, errors.CanonicalCode(err), err) } @@ -487,7 +488,7 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er } // ListActive returns all tasks that are currently being processed for the given queue. -func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error) { +func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskInfo, error) { var op errors.Op = "rdb.ListActive" exists, err := r.queueExists(qname) if err != nil { @@ -496,7 +497,7 @@ func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, err if !exists { return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) } - res, err := r.listMessages(base.ActiveKey(qname), qname, pgn) + res, err := r.listMessages(qname, base.TaskStateActive, pgn) if err != nil { return nil, errors.E(op, errors.CanonicalCode(err), err) } @@ -509,16 +510,27 @@ func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, err // ARGV[3] -> task key prefix var listMessagesCmd = redis.NewScript(` local ids = redis.call("LRange", KEYS[1], ARGV[1], ARGV[2]) -local res = {} +local data = {} for _, id in ipairs(ids) do local key = ARGV[3] .. id - table.insert(res, redis.call("HGET", key, "msg")) + local msg, result = unpack(redis.call("HMGET", key, "msg","result")) + table.insert(data, msg) + table.insert(data, result) end -return res +return data `) -// listMessages returns a list of TaskMessage in Redis list with the given key. -func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessage, error) { +// listMessages returns a list of TaskInfo in Redis list with the given key. +func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) ([]*base.TaskInfo, error) { + var key string + switch state { + case base.TaskStateActive: + key = base.ActiveKey(qname) + case base.TaskStatePending: + key = base.PendingKey(qname) + default: + panic(fmt.Sprintf("unsupported task state: %v", state)) + } // Note: Because we use LPUSH to redis list, we need to calculate the // correct range and reverse the list to get the tasks with pagination. stop := -pgn.start() - 1 @@ -532,16 +544,25 @@ func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessa if err != nil { return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res)) } - reverse(data) - var msgs []*base.TaskMessage - for _, s := range data { - m, err := base.DecodeMessage([]byte(s)) + var infos []*base.TaskInfo + for i := 0; i < len(data); i += 2 { + m, err := base.DecodeMessage([]byte(data[i])) if err != nil { continue // bad data, ignore and continue } - msgs = append(msgs, m) + var res []byte + if len(data[i+1]) > 0 { + res = []byte(data[i+1]) + } + infos = append(infos, &base.TaskInfo{ + Message: m, + State: state, + NextProcessAt: time.Now(), + Result: res, + }) } - return msgs, nil + reverse(infos) + return infos, nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 385b74f..a22d3c1 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -544,21 +544,24 @@ func TestListPending(t *testing.T) { tests := []struct { pending map[string][]*base.TaskMessage qname string - want []*base.TaskMessage + want []*base.TaskInfo }{ { pending: map[string][]*base.TaskMessage{ base.DefaultQueueName: {m1, m2}, }, qname: base.DefaultQueueName, - want: []*base.TaskMessage{m1, m2}, + want: []*base.TaskInfo{ + {Message: m1, State: base.TaskStatePending, NextProcessAt: time.Now(), Result: nil}, + {Message: m2, State: base.TaskStatePending, NextProcessAt: time.Now(), Result: nil}, + }, }, { pending: map[string][]*base.TaskMessage{ base.DefaultQueueName: nil, }, qname: base.DefaultQueueName, - want: []*base.TaskMessage(nil), + want: []*base.TaskInfo(nil), }, { pending: map[string][]*base.TaskMessage{ @@ -567,7 +570,10 @@ func TestListPending(t *testing.T) { "low": {m4}, }, qname: base.DefaultQueueName, - want: []*base.TaskMessage{m1, m2}, + want: []*base.TaskInfo{ + {Message: m1, State: base.TaskStatePending, NextProcessAt: time.Now(), Result: nil}, + {Message: m2, State: base.TaskStatePending, NextProcessAt: time.Now(), Result: nil}, + }, }, { pending: map[string][]*base.TaskMessage{ @@ -576,7 +582,9 @@ func TestListPending(t *testing.T) { "low": {m4}, }, qname: "critical", - want: []*base.TaskMessage{m3}, + want: []*base.TaskInfo{ + {Message: m3, State: base.TaskStatePending, NextProcessAt: time.Now(), Result: nil}, + }, }, } @@ -590,7 +598,7 @@ func TestListPending(t *testing.T) { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue } - if diff := cmp.Diff(tc.want, got); diff != "" { + if diff := cmp.Diff(tc.want, got, cmpopts.EquateApproxTime(2*time.Second)); diff != "" { t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) continue } @@ -650,13 +658,13 @@ func TestListPendingPagination(t *testing.T) { continue } - first := got[0] + first := got[0].Message if first.Type != tc.wantFirst { t.Errorf("%s; %s returned a list with first message %q, want %q", tc.desc, op, first.Type, tc.wantFirst) } - last := got[len(got)-1] + last := got[len(got)-1].Message if last.Type != tc.wantLast { t.Errorf("%s; %s returned a list with the last message %q, want %q", tc.desc, op, last.Type, tc.wantLast) @@ -756,13 +764,13 @@ func TestListActivePagination(t *testing.T) { continue } - first := got[0] + first := got[0].Message if first.Type != tc.wantFirst { t.Errorf("%s; %s returned a list with first message %q, want %q", tc.desc, op, first.Type, tc.wantFirst) } - last := got[len(got)-1] + last := got[len(got)-1].Message if last.Type != tc.wantLast { t.Errorf("%s; %s returned a list with the last message %q, want %q", tc.desc, op, last.Type, tc.wantLast)