2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Update RDB.ListScheduled, RDB.ListRetry, RDB.ListArchived

This commit is contained in:
Ken Hibino 2021-02-25 22:05:51 -08:00
parent 127bd9190c
commit 6437ea4a82
2 changed files with 41 additions and 14 deletions

View File

@ -359,7 +359,7 @@ func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) {
if !r.client.SIsMember(base.AllQueues, qname).Val() { if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname) return nil, fmt.Errorf("queue %q does not exist", qname)
} }
return r.listZSetEntries(base.ScheduledKey(qname), pgn) return r.listZSetEntries(base.ScheduledKey(qname), qname, pgn)
} }
// ListRetry returns all tasks from the given queue that have failed before // ListRetry returns all tasks from the given queue that have failed before
@ -368,7 +368,7 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
if !r.client.SIsMember(base.AllQueues, qname).Val() { if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname) return nil, fmt.Errorf("queue %q does not exist", qname)
} }
return r.listZSetEntries(base.RetryKey(qname), pgn) return r.listZSetEntries(base.RetryKey(qname), qname, pgn)
} }
// ListArchived returns all tasks from the given queue that have exhausted its retry limit. // ListArchived returns all tasks from the given queue that have exhausted its retry limit.
@ -376,29 +376,56 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
if !r.client.SIsMember(base.AllQueues, qname).Val() { if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname) return nil, fmt.Errorf("queue %q does not exist", qname)
} }
return r.listZSetEntries(base.ArchivedKey(qname), pgn) return r.listZSetEntries(base.ArchivedKey(qname), qname, pgn)
} }
// KEYS[1] -> key for ids set (e.g. asynq:{<qname>}:scheduled)
// ARGV[1] -> min
// ARGV[2] -> max
// ARGV[3] -> task key prefix
//
// Returns an array populated with
// [msg1, score1, msg2, score2, ..., msgN, scoreN]
var listZSetEntriesCmd = redis.NewScript(`
local res = {}
local id_score_pairs = redis.call("ZRANGE", KEYS[1], ARGV[1], ARGV[2], "WITHSCORES")
for i = 1, table.getn(id_score_pairs), 2 do
local key = ARGV[3] .. id_score_pairs[i]
table.insert(res, redis.call("HGET", key, "msg"))
table.insert(res, id_score_pairs[i+1])
end
return res
`)
// listZSetEntries returns a list of message and score pairs in Redis sorted-set // listZSetEntries returns a list of message and score pairs in Redis sorted-set
// with the given key. // with the given key.
func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) { func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, error) {
data, err := r.client.ZRangeWithScores(key, pgn.start(), pgn.stop()).Result() res, err := listZSetEntriesCmd.Run(r.client, []string{key},
pgn.start(), pgn.stop(), base.TaskKeyPrefix(qname)).Result()
if err != nil { if err != nil {
return nil, err return nil, err
} }
var res []base.Z data, err := cast.ToSliceE(res)
for _, z := range data { if err != nil {
s, ok := z.Member.(string) return nil, err
if !ok { }
continue // bad data, ignore and continue var zs []base.Z
for i := 0; i < len(data); i += 2 {
s, err := cast.ToStringE(data[i])
if err != nil {
return nil, err
}
score, err := cast.ToInt64E(data[i+1])
if err != nil {
return nil, err
} }
msg, err := base.DecodeMessage(s) msg, err := base.DecodeMessage(s)
if err != nil { if err != nil {
continue // bad data, ignore and continue continue // bad data, ignore and continue
} }
res = append(res, base.Z{Message: msg, Score: int64(z.Score)}) zs = append(zs, base.Z{Message: msg, Score: score})
} }
return res, nil return zs, nil
} }
// RunArchivedTask finds an archived task that matches the given id and score from // RunArchivedTask finds an archived task that matches the given id and score from

View File

@ -841,7 +841,7 @@ func TestListRetryPagination(t *testing.T) {
} }
} }
func TestListDead(t *testing.T) { func TestListArchived(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
m1 := &base.TaskMessage{ m1 := &base.TaskMessage{
@ -932,7 +932,7 @@ func TestListDead(t *testing.T) {
} }
} }
func TestListDeadPagination(t *testing.T) { func TestListArchivedPagination(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
var entries []base.Z var entries []base.Z