2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-23 01:00:17 +08:00

(rdb): Update ListPending and ListActive to return TaskInfo

This commit is contained in:
Ken Hibino 2021-09-24 13:54:37 -07:00
parent e8f140539d
commit 45eb0e08c7
2 changed files with 55 additions and 26 deletions

View File

@ -337,7 +337,8 @@ func parseInfo(infoStr string) (map[string]string, error) {
return info, nil return info, nil
} }
func reverse(x []string) { // TODO: Use generics once available.
func reverse(x []*base.TaskInfo) {
for i := len(x)/2 - 1; i >= 0; i-- { for i := len(x)/2 - 1; i >= 0; i-- {
opp := len(x) - 1 - i opp := len(x) - 1 - i
x[i], x[opp] = x[opp], x[i] x[i], x[opp] = x[opp], x[i]
@ -470,7 +471,7 @@ func (p Pagination) stop() int64 {
} }
// ListPending returns pending tasks that are ready to be processed. // ListPending returns pending tasks that are ready to be processed.
func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, error) { func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
var op errors.Op = "rdb.ListPending" var op errors.Op = "rdb.ListPending"
exists, err := r.queueExists(qname) exists, err := r.queueExists(qname)
if err != nil { if err != nil {
@ -479,7 +480,7 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er
if !exists { if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
} }
res, err := r.listMessages(base.PendingKey(qname), qname, pgn) res, err := r.listMessages(qname, base.TaskStatePending, pgn)
if err != nil { if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err) return nil, errors.E(op, errors.CanonicalCode(err), err)
} }
@ -487,7 +488,7 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er
} }
// ListActive returns all tasks that are currently being processed for the given queue. // ListActive returns all tasks that are currently being processed for the given queue.
func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error) { func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
var op errors.Op = "rdb.ListActive" var op errors.Op = "rdb.ListActive"
exists, err := r.queueExists(qname) exists, err := r.queueExists(qname)
if err != nil { if err != nil {
@ -496,7 +497,7 @@ func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, err
if !exists { if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname}) return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
} }
res, err := r.listMessages(base.ActiveKey(qname), qname, pgn) res, err := r.listMessages(qname, base.TaskStateActive, pgn)
if err != nil { if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err) return nil, errors.E(op, errors.CanonicalCode(err), err)
} }
@ -509,16 +510,27 @@ func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, err
// ARGV[3] -> task key prefix // ARGV[3] -> task key prefix
var listMessagesCmd = redis.NewScript(` var listMessagesCmd = redis.NewScript(`
local ids = redis.call("LRange", KEYS[1], ARGV[1], ARGV[2]) local ids = redis.call("LRange", KEYS[1], ARGV[1], ARGV[2])
local res = {} local data = {}
for _, id in ipairs(ids) do for _, id in ipairs(ids) do
local key = ARGV[3] .. id local key = ARGV[3] .. id
table.insert(res, redis.call("HGET", key, "msg")) local msg, result = unpack(redis.call("HMGET", key, "msg","result"))
table.insert(data, msg)
table.insert(data, result)
end end
return res return data
`) `)
// listMessages returns a list of TaskMessage in Redis list with the given key. // listMessages returns a list of TaskInfo in Redis list with the given key.
func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessage, error) { func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) ([]*base.TaskInfo, error) {
var key string
switch state {
case base.TaskStateActive:
key = base.ActiveKey(qname)
case base.TaskStatePending:
key = base.PendingKey(qname)
default:
panic(fmt.Sprintf("unsupported task state: %v", state))
}
// Note: Because we use LPUSH to redis list, we need to calculate the // Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination. // correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1 stop := -pgn.start() - 1
@ -532,16 +544,25 @@ func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessa
if err != nil { if err != nil {
return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res)) return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res))
} }
reverse(data) var infos []*base.TaskInfo
var msgs []*base.TaskMessage for i := 0; i < len(data); i += 2 {
for _, s := range data { m, err := base.DecodeMessage([]byte(data[i]))
m, err := base.DecodeMessage([]byte(s))
if err != nil { if err != nil {
continue // bad data, ignore and continue continue // bad data, ignore and continue
} }
msgs = append(msgs, m) var res []byte
if len(data[i+1]) > 0 {
res = []byte(data[i+1])
} }
return msgs, nil infos = append(infos, &base.TaskInfo{
Message: m,
State: state,
NextProcessAt: time.Now(),
Result: res,
})
}
reverse(infos)
return infos, nil
} }

View File

@ -544,21 +544,24 @@ func TestListPending(t *testing.T) {
tests := []struct { tests := []struct {
pending map[string][]*base.TaskMessage pending map[string][]*base.TaskMessage
qname string qname string
want []*base.TaskMessage want []*base.TaskInfo
}{ }{
{ {
pending: map[string][]*base.TaskMessage{ pending: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m1, m2}, base.DefaultQueueName: {m1, m2},
}, },
qname: base.DefaultQueueName, qname: base.DefaultQueueName,
want: []*base.TaskMessage{m1, m2}, want: []*base.TaskInfo{
{Message: m1, State: base.TaskStatePending, NextProcessAt: time.Now(), Result: nil},
{Message: m2, State: base.TaskStatePending, NextProcessAt: time.Now(), Result: nil},
},
}, },
{ {
pending: map[string][]*base.TaskMessage{ pending: map[string][]*base.TaskMessage{
base.DefaultQueueName: nil, base.DefaultQueueName: nil,
}, },
qname: base.DefaultQueueName, qname: base.DefaultQueueName,
want: []*base.TaskMessage(nil), want: []*base.TaskInfo(nil),
}, },
{ {
pending: map[string][]*base.TaskMessage{ pending: map[string][]*base.TaskMessage{
@ -567,7 +570,10 @@ func TestListPending(t *testing.T) {
"low": {m4}, "low": {m4},
}, },
qname: base.DefaultQueueName, qname: base.DefaultQueueName,
want: []*base.TaskMessage{m1, m2}, want: []*base.TaskInfo{
{Message: m1, State: base.TaskStatePending, NextProcessAt: time.Now(), Result: nil},
{Message: m2, State: base.TaskStatePending, NextProcessAt: time.Now(), Result: nil},
},
}, },
{ {
pending: map[string][]*base.TaskMessage{ pending: map[string][]*base.TaskMessage{
@ -576,7 +582,9 @@ func TestListPending(t *testing.T) {
"low": {m4}, "low": {m4},
}, },
qname: "critical", qname: "critical",
want: []*base.TaskMessage{m3}, want: []*base.TaskInfo{
{Message: m3, State: base.TaskStatePending, NextProcessAt: time.Now(), Result: nil},
},
}, },
} }
@ -590,7 +598,7 @@ func TestListPending(t *testing.T) {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue continue
} }
if diff := cmp.Diff(tc.want, got); diff != "" { if diff := cmp.Diff(tc.want, got, cmpopts.EquateApproxTime(2*time.Second)); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue continue
} }
@ -650,13 +658,13 @@ func TestListPendingPagination(t *testing.T) {
continue continue
} }
first := got[0] first := got[0].Message
if first.Type != tc.wantFirst { if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q", t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst) tc.desc, op, first.Type, tc.wantFirst)
} }
last := got[len(got)-1] last := got[len(got)-1].Message
if last.Type != tc.wantLast { if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q", t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast) tc.desc, op, last.Type, tc.wantLast)
@ -756,13 +764,13 @@ func TestListActivePagination(t *testing.T) {
continue continue
} }
first := got[0] first := got[0].Message
if first.Type != tc.wantFirst { if first.Type != tc.wantFirst {
t.Errorf("%s; %s returned a list with first message %q, want %q", t.Errorf("%s; %s returned a list with first message %q, want %q",
tc.desc, op, first.Type, tc.wantFirst) tc.desc, op, first.Type, tc.wantFirst)
} }
last := got[len(got)-1] last := got[len(got)-1].Message
if last.Type != tc.wantLast { if last.Type != tc.wantLast {
t.Errorf("%s; %s returned a list with the last message %q, want %q", t.Errorf("%s; %s returned a list with the last message %q, want %q",
tc.desc, op, last.Type, tc.wantLast) tc.desc, op, last.Type, tc.wantLast)