2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Update RDB.ListPending and RDB.ListActive

This commit is contained in:
Ken Hibino 2021-02-25 08:45:50 -08:00
parent 7c75abe334
commit 127bd9190c
2 changed files with 24 additions and 5 deletions

View File

@ -300,7 +300,7 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listMessages(base.PendingKey(qname), pgn)
return r.listMessages(base.PendingKey(qname), qname, pgn)
}
// ListActive returns all tasks that are currently being processed for the given queue.
@ -308,16 +308,35 @@ func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, err
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listMessages(base.ActiveKey(qname), pgn)
return r.listMessages(base.ActiveKey(qname), qname, pgn)
}
// KEYS[1] -> key for id list (e.g. asynq:{<qname>}:pending)
// ARGV[1] -> start offset
// ARGV[2] -> stop offset
// ARGV[3] -> task key prefix
var listMessagesCmd = redis.NewScript(`
local ids = redis.call("LRange", KEYS[1], ARGV[1], ARGV[2])
local res = {}
for _, id in ipairs(ids) do
local key = ARGV[3] .. id
table.insert(res, redis.call("HGET", key, "msg"))
end
return res
`)
// listMessages returns a list of TaskMessage in Redis list with the given key.
func (r *RDB) listMessages(key string, pgn Pagination) ([]*base.TaskMessage, error) {
func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessage, error) {
// Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
data, err := r.client.LRange(key, start, stop).Result()
res, err := listMessagesCmd.Run(r.client,
[]string{key}, start, stop, base.TaskKeyPrefix(qname)).Result()
if err != nil {
return nil, err
}
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}

View File

@ -386,7 +386,7 @@ func TestListPendingPagination(t *testing.T) {
msgs = []*base.TaskMessage(nil) // empty list
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("custom %d", i), nil)
msg := h.NewTaskMessageWithQueue(fmt.Sprintf("custom %d", i), nil, "custom")
msgs = append(msgs, msg)
}
// create 100 tasks in custom queue