diff --git a/internal/base/base.go b/internal/base/base.go index 2fae65c..9735b68 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -6,6 +6,7 @@ package base import ( + "strings" "time" "github.com/rs/xid" @@ -29,7 +30,7 @@ const ( // QueueKey returns a redis key string for the given queue name. func QueueKey(qname string) string { - return QueuePrefix + qname + return QueuePrefix + strings.ToLower(qname) } // ProcessedKey returns a redis key string for procesed count diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 29cb80b..f279254 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -235,8 +235,18 @@ func (r *RDB) RedisInfo() (map[string]string, error) { return info, nil } -// ListEnqueued returns all enqueued tasks that are ready to be processed. -func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) { +// ListEnqueued returns enqueued tasks that are ready to be processed. +// +// Queue names can be optionally passed to query only the specified queues. +// If none are passed, it will query all queues. +func (r *RDB) ListEnqueued(qnames ...string) ([]*EnqueuedTask, error) { + if len(qnames) == 0 { + return r.listAllEnqueued() + } + return r.listEnqueued(qnames...) +} + +func (r *RDB) listAllEnqueued() ([]*EnqueuedTask, error) { script := redis.NewScript(` local res = {} local queues = redis.call("SMEMBERS", KEYS[1]) @@ -256,6 +266,36 @@ func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) { if err != nil { return nil, err } + return toEnqueuedTasks(data) +} + +func (r *RDB) listEnqueued(qnames ...string) ([]*EnqueuedTask, error) { + script := redis.NewScript(` + local res = {} + for _, qkey in ipairs(KEYS) do + local msgs = redis.call("LRANGE", qkey, 0, -1) + for _, msg in ipairs(msgs) do + table.insert(res, msg) + end + end + return res + `) + var keys []string + for _, q := range qnames { + keys = append(keys, base.QueueKey(q)) + } + res, err := script.Run(r.client, keys).Result() + if err != nil { + return nil, err + } + data, err := cast.ToStringSliceE(res) + if err != nil { + return nil, err + } + return toEnqueuedTasks(data) +} + +func toEnqueuedTasks(data []string) ([]*EnqueuedTask, error) { var tasks []*EnqueuedTask for _, s := range data { var msg base.TaskMessage diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 0a6c104..53cf84d 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -234,19 +234,22 @@ func TestListEnqueued(t *testing.T) { t4 := &EnqueuedTask{ID: m4.ID, Type: m4.Type, Payload: m4.Payload, Queue: m4.Queue} tests := []struct { enqueued map[string][]*base.TaskMessage + qnames []string want []*EnqueuedTask }{ { enqueued: map[string][]*base.TaskMessage{ base.DefaultQueueName: {m1, m2}, }, - want: []*EnqueuedTask{t1, t2}, + qnames: []string{}, + want: []*EnqueuedTask{t1, t2}, }, { enqueued: map[string][]*base.TaskMessage{ base.DefaultQueueName: {}, }, - want: []*EnqueuedTask{}, + qnames: []string{}, + want: []*EnqueuedTask{}, }, { enqueued: map[string][]*base.TaskMessage{ @@ -254,7 +257,26 @@ func TestListEnqueued(t *testing.T) { "critical": {m3}, "low": {m4}, }, - want: []*EnqueuedTask{t1, t2, t3, t4}, + qnames: []string{}, + want: []*EnqueuedTask{t1, t2, t3, t4}, + }, + { + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {m1, m2}, + "critical": {m3}, + "low": {m4}, + }, + qnames: []string{"critical"}, + want: []*EnqueuedTask{t3}, + }, + { + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {m1, m2}, + "critical": {m3}, + "low": {m4}, + }, + qnames: []string{"critical", "low"}, + want: []*EnqueuedTask{t3, t4}, }, } @@ -264,7 +286,7 @@ func TestListEnqueued(t *testing.T) { h.SeedEnqueuedQueue(t, r.client, msgs, qname) } - got, err := r.ListEnqueued() + got, err := r.ListEnqueued(tc.qnames...) if err != nil { t.Errorf("r.ListEnqueued() = %v, %v, want %v, nil", got, err, tc.want) continue diff --git a/tools/asynqmon/cmd/ls.go b/tools/asynqmon/cmd/ls.go index 2014003..1440824 100644 --- a/tools/asynqmon/cmd/ls.go +++ b/tools/asynqmon/cmd/ls.go @@ -23,18 +23,23 @@ var lsValidArgs = []string{"enqueued", "inprogress", "scheduled", "retry", "dead // lsCmd represents the ls command var lsCmd = &cobra.Command{ - Use: "ls [queue name]", - Short: "Lists queue contents", - Long: `Ls (asynqmon ls) will list all tasks from the specified queue in a table format. + Use: "ls [task state]", + Short: "Lists tasks in the specified state", + Long: `Ls (asynqmon ls) will list all tasks in the specified state in a table format. -The command takes one argument which specifies the queue to inspect. The value -of the argument should be one of "enqueued", "inprogress", "scheduled", +The command takes one argument which specifies the state of tasks. +The argument value should be one of "enqueued", "inprogress", "scheduled", "retry", or "dead". -Example: asynqmon ls dead`, - ValidArgs: lsValidArgs, - Args: cobra.ExactValidArgs(1), - Run: ls, +Example: +asynqmon ls dead -> Lists all tasks in dead state + +Enqueued tasks can optionally be filtered by providing queue names after ":" +Example: +asynqmon ls enqueued:critical -> List tasks from critical queue only +`, + Args: cobra.ExactValidArgs(1), + Run: ls, } func init() { @@ -57,9 +62,10 @@ func ls(cmd *cobra.Command, args []string) { DB: db, }) r := rdb.NewRDB(c) - switch args[0] { + parts := strings.Split(args[0], ":") + switch parts[0] { case "enqueued": - listEnqueued(r) + listEnqueued(r, parts[1:]...) case "inprogress": listInProgress(r) case "scheduled": @@ -69,7 +75,7 @@ func ls(cmd *cobra.Command, args []string) { case "dead": listDead(r) default: - fmt.Printf("error: `asynqmon ls [queue name]` only accepts %v as the argument.\n", lsValidArgs) + fmt.Printf("error: `asynqmon ls [task state]` only accepts %v as the argument.\n", lsValidArgs) os.Exit(1) } } @@ -105,14 +111,24 @@ func parseQueryID(queryID string) (id xid.ID, score int64, qtype string, err err return id, score, qtype, nil } -func listEnqueued(r *rdb.RDB) { - tasks, err := r.ListEnqueued() +func listEnqueued(r *rdb.RDB, qnames ...string) { + tasks, err := r.ListEnqueued(qnames...) if err != nil { fmt.Println(err) os.Exit(1) } if len(tasks) == 0 { - fmt.Println("No enqueued tasks") + msg := "No enqueued tasks" + if len(qnames) > 0 { + msg += " in" + for i, q := range qnames { + msg += fmt.Sprintf(" %q queue", q) + if i != len(qnames)-1 { + msg += "," + } + } + } + fmt.Println(msg) return } cols := []string{"ID", "Type", "Payload", "Queue"}