Paginate tasks with asynqmon ls command

Changes:
* Added --page and --size flags to ls command
* By default, the command will show first 30 tasks from the specified
queue
This commit is contained in:
Ken Hibino
2020-01-24 07:19:58 -08:00
parent 3ed155b45b
commit 31123fd42a
4 changed files with 397 additions and 129 deletions

View File

@@ -235,67 +235,46 @@ func (r *RDB) RedisInfo() (map[string]string, error) {
return info, nil
}
func reverse(x []string) {
for i := len(x)/2 - 1; i >= 0; i-- {
opp := len(x) - 1 - i
x[i], x[opp] = x[opp], x[i]
}
}
// Pagination specifies the page size and page number
// for the list operation.
type Pagination struct {
// Number of items in the page.
Size uint
// Page number starting from zero.
Page uint
}
func (p Pagination) start() int64 {
return int64(p.Size * p.Page)
}
func (p Pagination) stop() int64 {
return int64(p.Size*p.Page + p.Size - 1)
}
// ListEnqueued returns enqueued tasks that are ready to be processed.
//
// Queue names can be optionally passed to query only the specified queues.
// If none are passed, it will query all queues.
func (r *RDB) ListEnqueued(qnames ...string) ([]*EnqueuedTask, error) {
if len(qnames) == 0 {
return r.listAllEnqueued()
func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*EnqueuedTask, error) {
qkey := base.QueueKey(qname)
if !r.client.SIsMember(base.AllQueues, qkey).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listEnqueued(qnames...)
}
func (r *RDB) listAllEnqueued() ([]*EnqueuedTask, error) {
script := redis.NewScript(`
local res = {}
local queues = redis.call("SMEMBERS", KEYS[1])
for _, qkey in ipairs(queues) do
local msgs = redis.call("LRANGE", qkey, 0, -1)
for _, msg in ipairs(msgs) do
table.insert(res, msg)
end
end
return res
`)
res, err := script.Run(r.client, []string{base.AllQueues}).Result()
// Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
data, err := r.client.LRange(qkey, start, stop).Result()
if err != nil {
return nil, err
}
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
return toEnqueuedTasks(data)
}
func (r *RDB) listEnqueued(qnames ...string) ([]*EnqueuedTask, error) {
script := redis.NewScript(`
local res = {}
for _, qkey in ipairs(KEYS) do
local msgs = redis.call("LRANGE", qkey, 0, -1)
for _, msg in ipairs(msgs) do
table.insert(res, msg)
end
end
return res
`)
var keys []string
for _, q := range qnames {
keys = append(keys, base.QueueKey(q))
}
res, err := script.Run(r.client, keys).Result()
if err != nil {
return nil, err
}
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
return toEnqueuedTasks(data)
}
func toEnqueuedTasks(data []string) ([]*EnqueuedTask, error) {
reverse(data)
var tasks []*EnqueuedTask
for _, s := range data {
var msg base.TaskMessage
@@ -314,11 +293,16 @@ func toEnqueuedTasks(data []string) ([]*EnqueuedTask, error) {
}
// ListInProgress returns all tasks that are currently being processed.
func (r *RDB) ListInProgress() ([]*InProgressTask, error) {
data, err := r.client.LRange(base.InProgressQueue, 0, -1).Result()
func (r *RDB) ListInProgress(pgn Pagination) ([]*InProgressTask, error) {
// Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
data, err := r.client.LRange(base.InProgressQueue, start, stop).Result()
if err != nil {
return nil, err
}
reverse(data)
var tasks []*InProgressTask
for _, s := range data {
var msg base.TaskMessage
@@ -337,8 +321,8 @@ func (r *RDB) ListInProgress() ([]*InProgressTask, error) {
// ListScheduled returns all tasks that are scheduled to be processed
// in the future.
func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
data, err := r.client.ZRangeWithScores(base.ScheduledQueue, 0, -1).Result()
func (r *RDB) ListScheduled(pgn Pagination) ([]*ScheduledTask, error) {
data, err := r.client.ZRangeWithScores(base.ScheduledQueue, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}
@@ -368,8 +352,8 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
// ListRetry returns all tasks that have failed before and willl be retried
// in the future.
func (r *RDB) ListRetry() ([]*RetryTask, error) {
data, err := r.client.ZRangeWithScores(base.RetryQueue, 0, -1).Result()
func (r *RDB) ListRetry(pgn Pagination) ([]*RetryTask, error) {
data, err := r.client.ZRangeWithScores(base.RetryQueue, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}
@@ -401,8 +385,8 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) {
}
// ListDead returns all tasks that have exhausted its retry limit.
func (r *RDB) ListDead() ([]*DeadTask, error) {
data, err := r.client.ZRangeWithScores(base.DeadQueue, 0, -1).Result()
func (r *RDB) ListDead(pgn Pagination) ([]*DeadTask, error) {
data, err := r.client.ZRangeWithScores(base.DeadQueue, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}