2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

(rdb): Update ListScheduled, ListRetry, ListArchived, and ListCompleted

to return a list of TaskInfo
This commit is contained in:
Ken Hibino 2021-09-24 14:42:35 -07:00
parent 45eb0e08c7
commit 88457c7a35
2 changed files with 97 additions and 56 deletions

View File

@ -554,10 +554,14 @@ func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) (
if len(data[i+1]) > 0 {
res = []byte(data[i+1])
}
var nextProcessAt time.Time
if state == base.TaskStatePending {
nextProcessAt = time.Now()
}
infos = append(infos, &base.TaskInfo{
Message: m,
State: state,
NextProcessAt: time.Now(),
NextProcessAt: nextProcessAt,
Result: res,
})
}
@ -568,7 +572,7 @@ func (r *RDB) listMessages(qname string, state base.TaskState, pgn Pagination) (
// ListScheduled returns all tasks from the given queue that are scheduled
// to be processed in the future.
func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) {
func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
var op errors.Op = "rdb.ListScheduled"
exists, err := r.queueExists(qname)
if err != nil {
@ -577,7 +581,7 @@ func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) {
if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
res, err := r.listZSetEntries(base.ScheduledKey(qname), qname, pgn)
res, err := r.listZSetEntries(qname, base.TaskStateScheduled, pgn)
if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err)
}
@ -586,7 +590,7 @@ func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) {
// ListRetry returns all tasks from the given queue that have failed before
// and willl be retried in the future.
func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
func (r *RDB) ListRetry(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
var op errors.Op = "rdb.ListRetry"
exists, err := r.queueExists(qname)
if err != nil {
@ -595,7 +599,7 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
res, err := r.listZSetEntries(base.RetryKey(qname), qname, pgn)
res, err := r.listZSetEntries(qname, base.TaskStateRetry, pgn)
if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err)
}
@ -603,7 +607,7 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
}
// ListArchived returns all tasks from the given queue that have exhausted its retry limit.
func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
func (r *RDB) ListArchived(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
var op errors.Op = "rdb.ListArchived"
exists, err := r.queueExists(qname)
if err != nil {
@ -612,7 +616,7 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
zs, err := r.listZSetEntries(base.ArchivedKey(qname), qname, pgn)
zs, err := r.listZSetEntries(qname, base.TaskStateArchived, pgn)
if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err)
}
@ -620,7 +624,7 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
}
// ListCompleted returns all tasks from the given queue that have completed successfully.
func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]base.Z, error) {
func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]*base.TaskInfo, error) {
var op errors.Op = "rdb.ListCompleted"
exists, err := r.queueExists(qname)
if err != nil {
@ -629,7 +633,7 @@ func (r *RDB) ListCompleted(qname string, pgn Pagination) ([]base.Z, error) {
if !exists {
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
}
zs, err := r.listZSetEntries(base.CompletedKey(qname), qname, pgn)
zs, err := r.listZSetEntries(qname, base.TaskStateCompleted, pgn)
if err != nil {
return nil, errors.E(op, errors.CanonicalCode(err), err)
}
@ -647,21 +651,38 @@ func (r *RDB) queueExists(qname string) (bool, error) {
// ARGV[3] -> task key prefix
//
// Returns an array populated with
// [msg1, score1, msg2, score2, ..., msgN, scoreN]
// [msg1, score1, result1, msg2, score2, result2, ..., msgN, scoreN, resultN]
var listZSetEntriesCmd = redis.NewScript(`
local res = {}
local data = {}
local id_score_pairs = redis.call("ZRANGE", KEYS[1], ARGV[1], ARGV[2], "WITHSCORES")
for i = 1, table.getn(id_score_pairs), 2 do
local key = ARGV[3] .. id_score_pairs[i]
table.insert(res, redis.call("HGET", key, "msg"))
table.insert(res, id_score_pairs[i+1])
local id = id_score_pairs[i]
local score = id_score_pairs[i+1]
local key = ARGV[3] .. id
local msg, res = unpack(redis.call("HMGET", key, "msg", "result"))
table.insert(data, msg)
table.insert(data, score)
table.insert(data, res)
end
return res
return data
`)
// listZSetEntries returns a list of message and score pairs in Redis sorted-set
// with the given key.
func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, error) {
func (r *RDB) listZSetEntries(qname string, state base.TaskState, pgn Pagination) ([]*base.TaskInfo, error) {
var key string
switch state {
case base.TaskStateScheduled:
key = base.ScheduledKey(qname)
case base.TaskStateRetry:
key = base.RetryKey(qname)
case base.TaskStateArchived:
key = base.ArchivedKey(qname)
case base.TaskStateCompleted:
key = base.CompletedKey(qname)
default:
panic(fmt.Sprintf("unsupported task state: %v", state))
}
res, err := listZSetEntriesCmd.Run(context.Background(), r.client, []string{key},
pgn.start(), pgn.stop(), base.TaskKeyPrefix(qname)).Result()
if err != nil {
@ -671,8 +692,8 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro
if err != nil {
return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res))
}
var zs []base.Z
for i := 0; i < len(data); i += 2 {
var infos []*base.TaskInfo
for i := 0; i < len(data); i += 3 {
s, err := cast.ToStringE(data[i])
if err != nil {
return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res))
@ -681,13 +702,30 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro
if err != nil {
return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res))
}
resStr, err := cast.ToStringE(data[i+2])
if err != nil {
return nil, errors.E(errors.Internal, fmt.Errorf("cast error: Lua script returned unexpected value: %v", res))
}
msg, err := base.DecodeMessage([]byte(s))
if err != nil {
continue // bad data, ignore and continue
}
zs = append(zs, base.Z{Message: msg, Score: score})
var nextProcessAt time.Time
if state == base.TaskStateScheduled || state == base.TaskStateRetry {
nextProcessAt = time.Unix(score, 0)
}
var resBytes []byte
if len(resStr) > 0 {
resBytes = []byte(resStr)
}
infos = append(infos, &base.TaskInfo{
Message: msg,
State: state,
NextProcessAt: nextProcessAt,
Result: resBytes,
})
}
return zs, nil
return infos, nil
}
// RunAllScheduledTasks enqueues all scheduled tasks from the given queue

View File

@ -684,7 +684,7 @@ func TestListActive(t *testing.T) {
tests := []struct {
inProgress map[string][]*base.TaskMessage
qname string
want []*base.TaskMessage
want []*base.TaskInfo
}{
{
inProgress: map[string][]*base.TaskMessage{
@ -693,14 +693,17 @@ func TestListActive(t *testing.T) {
"low": {m4},
},
qname: "default",
want: []*base.TaskMessage{m1, m2},
want: []*base.TaskInfo{
{Message: m1, State: base.TaskStateActive, NextProcessAt: time.Time{}, Result: nil},
{Message: m2, State: base.TaskStateActive, NextProcessAt: time.Time{}, Result: nil},
},
},
{
inProgress: map[string][]*base.TaskMessage{
"default": {},
},
qname: "default",
want: []*base.TaskMessage(nil),
want: []*base.TaskInfo(nil),
},
}
@ -714,7 +717,7 @@ func TestListActive(t *testing.T) {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.inProgress)
continue
}
if diff := cmp.Diff(tc.want, got); diff != "" {
if diff := cmp.Diff(tc.want, got, cmpopts.EquateApproxTime(1*time.Second)); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
@ -793,7 +796,7 @@ func TestListScheduled(t *testing.T) {
tests := []struct {
scheduled map[string][]base.Z
qname string
want []base.Z
want []*base.TaskInfo
}{
{
scheduled: map[string][]base.Z{
@ -808,10 +811,10 @@ func TestListScheduled(t *testing.T) {
},
qname: "default",
// should be sorted by score in ascending order
want: []base.Z{
{Message: m3, Score: p3.Unix()},
{Message: m1, Score: p1.Unix()},
{Message: m2, Score: p2.Unix()},
want: []*base.TaskInfo{
{Message: m3, NextProcessAt: p3, State: base.TaskStateScheduled, Result: nil},
{Message: m1, NextProcessAt: p1, State: base.TaskStateScheduled, Result: nil},
{Message: m2, NextProcessAt: p2, State: base.TaskStateScheduled, Result: nil},
},
},
{
@ -826,8 +829,8 @@ func TestListScheduled(t *testing.T) {
},
},
qname: "custom",
want: []base.Z{
{Message: m4, Score: p4.Unix()},
want: []*base.TaskInfo{
{Message: m4, NextProcessAt: p4, State: base.TaskStateScheduled, Result: nil},
},
},
{
@ -835,7 +838,7 @@ func TestListScheduled(t *testing.T) {
"default": {},
},
qname: "default",
want: []base.Z(nil),
want: []*base.TaskInfo(nil),
},
}
@ -849,7 +852,7 @@ func TestListScheduled(t *testing.T) {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" {
if diff := cmp.Diff(tc.want, got, cmpopts.EquateApproxTime(1*time.Second)); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue
}
@ -951,7 +954,7 @@ func TestListRetry(t *testing.T) {
tests := []struct {
retry map[string][]base.Z
qname string
want []base.Z
want []*base.TaskInfo
}{
{
retry: map[string][]base.Z{
@ -964,9 +967,9 @@ func TestListRetry(t *testing.T) {
},
},
qname: "default",
want: []base.Z{
{Message: m1, Score: p1.Unix()},
{Message: m2, Score: p2.Unix()},
want: []*base.TaskInfo{
{Message: m1, NextProcessAt: p1, State: base.TaskStateRetry, Result: nil},
{Message: m2, NextProcessAt: p2, State: base.TaskStateRetry, Result: nil},
},
},
{
@ -980,8 +983,8 @@ func TestListRetry(t *testing.T) {
},
},
qname: "custom",
want: []base.Z{
{Message: m3, Score: p3.Unix()},
want: []*base.TaskInfo{
{Message: m3, NextProcessAt: p3, State: base.TaskStateRetry, Result: nil},
},
},
{
@ -989,7 +992,7 @@ func TestListRetry(t *testing.T) {
"default": {},
},
qname: "default",
want: []base.Z(nil),
want: []*base.TaskInfo(nil),
},
}
@ -1003,7 +1006,7 @@ func TestListRetry(t *testing.T) {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" {
if diff := cmp.Diff(tc.want, got, cmpopts.EquateApproxTime(1*time.Second)); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s",
op, got, err, tc.want, diff)
continue
@ -1104,7 +1107,7 @@ func TestListArchived(t *testing.T) {
tests := []struct {
archived map[string][]base.Z
qname string
want []base.Z
want []*base.TaskInfo
}{
{
archived: map[string][]base.Z{
@ -1117,9 +1120,9 @@ func TestListArchived(t *testing.T) {
},
},
qname: "default",
want: []base.Z{
{Message: m2, Score: f2.Unix()}, // FIXME: shouldn't be sorted in the other order?
{Message: m1, Score: f1.Unix()},
want: []*base.TaskInfo{
{Message: m2, NextProcessAt: time.Time{}, State: base.TaskStateArchived, Result: nil}, // FIXME: shouldn't be sorted in the other order?
{Message: m1, NextProcessAt: time.Time{}, State: base.TaskStateArchived, Result: nil},
},
},
{
@ -1133,8 +1136,8 @@ func TestListArchived(t *testing.T) {
},
},
qname: "custom",
want: []base.Z{
{Message: m3, Score: f3.Unix()},
want: []*base.TaskInfo{
{Message: m3, NextProcessAt: time.Time{}, State: base.TaskStateArchived, Result: nil},
},
},
{
@ -1142,7 +1145,7 @@ func TestListArchived(t *testing.T) {
"default": {},
},
qname: "default",
want: []base.Z(nil),
want: []*base.TaskInfo(nil),
},
}
@ -1156,7 +1159,7 @@ func TestListArchived(t *testing.T) {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" {
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s",
op, got, err, tc.want, diff)
continue
@ -1251,7 +1254,7 @@ func TestListCompleted(t *testing.T) {
tests := []struct {
completed map[string][]base.Z
qname string
want []base.Z
want []*base.TaskInfo
}{
{
completed: map[string][]base.Z{
@ -1264,9 +1267,9 @@ func TestListCompleted(t *testing.T) {
},
},
qname: "default",
want: []base.Z{
{Message: msg1, Score: expireAt1.Unix()},
{Message: msg2, Score: expireAt2.Unix()},
want: []*base.TaskInfo{
{Message: msg1, NextProcessAt: time.Time{}, State: base.TaskStateCompleted, Result: nil},
{Message: msg2, NextProcessAt: time.Time{}, State: base.TaskStateCompleted, Result: nil},
},
},
{
@ -1280,8 +1283,8 @@ func TestListCompleted(t *testing.T) {
},
},
qname: "custom",
want: []base.Z{
{Message: msg3, Score: expireAt3.Unix()},
want: []*base.TaskInfo{
{Message: msg3, NextProcessAt: time.Time{}, State: base.TaskStateCompleted, Result: nil},
},
},
}
@ -1296,7 +1299,7 @@ func TestListCompleted(t *testing.T) {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue
}
if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" {
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s",
op, got, err, tc.want, diff)
continue