2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 07:12:17 +08:00

Allow filtering results of asynqmon ls enqueued by providing queue

name
This commit is contained in:
Ken Hibino 2020-01-10 21:32:15 -08:00
parent cf78a12866
commit 2631672575
4 changed files with 101 additions and 22 deletions

View File

@ -6,6 +6,7 @@
package base package base
import ( import (
"strings"
"time" "time"
"github.com/rs/xid" "github.com/rs/xid"
@ -29,7 +30,7 @@ const (
// QueueKey returns a redis key string for the given queue name. // QueueKey returns a redis key string for the given queue name.
func QueueKey(qname string) string { func QueueKey(qname string) string {
return QueuePrefix + qname return QueuePrefix + strings.ToLower(qname)
} }
// ProcessedKey returns a redis key string for procesed count // ProcessedKey returns a redis key string for procesed count

View File

@ -235,8 +235,18 @@ func (r *RDB) RedisInfo() (map[string]string, error) {
return info, nil return info, nil
} }
// ListEnqueued returns all enqueued tasks that are ready to be processed. // ListEnqueued returns enqueued tasks that are ready to be processed.
func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) { //
// Queue names can be optionally passed to query only the specified queues.
// If none are passed, it will query all queues.
func (r *RDB) ListEnqueued(qnames ...string) ([]*EnqueuedTask, error) {
if len(qnames) == 0 {
return r.listAllEnqueued()
}
return r.listEnqueued(qnames...)
}
func (r *RDB) listAllEnqueued() ([]*EnqueuedTask, error) {
script := redis.NewScript(` script := redis.NewScript(`
local res = {} local res = {}
local queues = redis.call("SMEMBERS", KEYS[1]) local queues = redis.call("SMEMBERS", KEYS[1])
@ -256,6 +266,36 @@ func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return toEnqueuedTasks(data)
}
func (r *RDB) listEnqueued(qnames ...string) ([]*EnqueuedTask, error) {
script := redis.NewScript(`
local res = {}
for _, qkey in ipairs(KEYS) do
local msgs = redis.call("LRANGE", qkey, 0, -1)
for _, msg in ipairs(msgs) do
table.insert(res, msg)
end
end
return res
`)
var keys []string
for _, q := range qnames {
keys = append(keys, base.QueueKey(q))
}
res, err := script.Run(r.client, keys).Result()
if err != nil {
return nil, err
}
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
return toEnqueuedTasks(data)
}
func toEnqueuedTasks(data []string) ([]*EnqueuedTask, error) {
var tasks []*EnqueuedTask var tasks []*EnqueuedTask
for _, s := range data { for _, s := range data {
var msg base.TaskMessage var msg base.TaskMessage

View File

@ -234,18 +234,21 @@ func TestListEnqueued(t *testing.T) {
t4 := &EnqueuedTask{ID: m4.ID, Type: m4.Type, Payload: m4.Payload, Queue: m4.Queue} t4 := &EnqueuedTask{ID: m4.ID, Type: m4.Type, Payload: m4.Payload, Queue: m4.Queue}
tests := []struct { tests := []struct {
enqueued map[string][]*base.TaskMessage enqueued map[string][]*base.TaskMessage
qnames []string
want []*EnqueuedTask want []*EnqueuedTask
}{ }{
{ {
enqueued: map[string][]*base.TaskMessage{ enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m1, m2}, base.DefaultQueueName: {m1, m2},
}, },
qnames: []string{},
want: []*EnqueuedTask{t1, t2}, want: []*EnqueuedTask{t1, t2},
}, },
{ {
enqueued: map[string][]*base.TaskMessage{ enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {}, base.DefaultQueueName: {},
}, },
qnames: []string{},
want: []*EnqueuedTask{}, want: []*EnqueuedTask{},
}, },
{ {
@ -254,8 +257,27 @@ func TestListEnqueued(t *testing.T) {
"critical": {m3}, "critical": {m3},
"low": {m4}, "low": {m4},
}, },
qnames: []string{},
want: []*EnqueuedTask{t1, t2, t3, t4}, want: []*EnqueuedTask{t1, t2, t3, t4},
}, },
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m1, m2},
"critical": {m3},
"low": {m4},
},
qnames: []string{"critical"},
want: []*EnqueuedTask{t3},
},
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {m1, m2},
"critical": {m3},
"low": {m4},
},
qnames: []string{"critical", "low"},
want: []*EnqueuedTask{t3, t4},
},
} }
for _, tc := range tests { for _, tc := range tests {
@ -264,7 +286,7 @@ func TestListEnqueued(t *testing.T) {
h.SeedEnqueuedQueue(t, r.client, msgs, qname) h.SeedEnqueuedQueue(t, r.client, msgs, qname)
} }
got, err := r.ListEnqueued() got, err := r.ListEnqueued(tc.qnames...)
if err != nil { if err != nil {
t.Errorf("r.ListEnqueued() = %v, %v, want %v, nil", got, err, tc.want) t.Errorf("r.ListEnqueued() = %v, %v, want %v, nil", got, err, tc.want)
continue continue

View File

@ -23,16 +23,21 @@ var lsValidArgs = []string{"enqueued", "inprogress", "scheduled", "retry", "dead
// lsCmd represents the ls command // lsCmd represents the ls command
var lsCmd = &cobra.Command{ var lsCmd = &cobra.Command{
Use: "ls [queue name]", Use: "ls [task state]",
Short: "Lists queue contents", Short: "Lists tasks in the specified state",
Long: `Ls (asynqmon ls) will list all tasks from the specified queue in a table format. Long: `Ls (asynqmon ls) will list all tasks in the specified state in a table format.
The command takes one argument which specifies the queue to inspect. The value The command takes one argument which specifies the state of tasks.
of the argument should be one of "enqueued", "inprogress", "scheduled", The argument value should be one of "enqueued", "inprogress", "scheduled",
"retry", or "dead". "retry", or "dead".
Example: asynqmon ls dead`, Example:
ValidArgs: lsValidArgs, asynqmon ls dead -> Lists all tasks in dead state
Enqueued tasks can optionally be filtered by providing queue names after ":"
Example:
asynqmon ls enqueued:critical -> List tasks from critical queue only
`,
Args: cobra.ExactValidArgs(1), Args: cobra.ExactValidArgs(1),
Run: ls, Run: ls,
} }
@ -57,9 +62,10 @@ func ls(cmd *cobra.Command, args []string) {
DB: db, DB: db,
}) })
r := rdb.NewRDB(c) r := rdb.NewRDB(c)
switch args[0] { parts := strings.Split(args[0], ":")
switch parts[0] {
case "enqueued": case "enqueued":
listEnqueued(r) listEnqueued(r, parts[1:]...)
case "inprogress": case "inprogress":
listInProgress(r) listInProgress(r)
case "scheduled": case "scheduled":
@ -69,7 +75,7 @@ func ls(cmd *cobra.Command, args []string) {
case "dead": case "dead":
listDead(r) listDead(r)
default: default:
fmt.Printf("error: `asynqmon ls [queue name]` only accepts %v as the argument.\n", lsValidArgs) fmt.Printf("error: `asynqmon ls [task state]` only accepts %v as the argument.\n", lsValidArgs)
os.Exit(1) os.Exit(1)
} }
} }
@ -105,14 +111,24 @@ func parseQueryID(queryID string) (id xid.ID, score int64, qtype string, err err
return id, score, qtype, nil return id, score, qtype, nil
} }
func listEnqueued(r *rdb.RDB) { func listEnqueued(r *rdb.RDB, qnames ...string) {
tasks, err := r.ListEnqueued() tasks, err := r.ListEnqueued(qnames...)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)
} }
if len(tasks) == 0 { if len(tasks) == 0 {
fmt.Println("No enqueued tasks") msg := "No enqueued tasks"
if len(qnames) > 0 {
msg += " in"
for i, q := range qnames {
msg += fmt.Sprintf(" %q queue", q)
if i != len(qnames)-1 {
msg += ","
}
}
}
fmt.Println(msg)
return return
} }
cols := []string{"ID", "Type", "Payload", "Queue"} cols := []string{"ID", "Type", "Payload", "Queue"}