diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 16d9bc9..294effe 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -42,6 +42,7 @@ type ScheduledTask struct { Type string Payload map[string]interface{} ProcessAt time.Time + Score int64 } // RetryTask is a task that's in retry queue because worker failed to process the task. @@ -55,6 +56,7 @@ type RetryTask struct { ErrorMsg string Retried int Retry int + Score int64 } // DeadTask is a task in that has exhausted all retries. @@ -65,6 +67,7 @@ type DeadTask struct { Payload map[string]interface{} LastFailedAt time.Time ErrorMsg string + Score int64 } // CurrentStats returns a current state of the queues. @@ -158,6 +161,7 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) { Type: msg.Type, Payload: msg.Payload, ProcessAt: processAt, + Score: int64(z.Score), }) } return tasks, nil @@ -190,6 +194,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) { Retry: msg.Retry, Retried: msg.Retried, ProcessAt: processAt, + Score: int64(z.Score), }) } return tasks, nil @@ -219,6 +224,7 @@ func (r *RDB) ListDead() ([]*DeadTask, error) { Payload: msg.Payload, ErrorMsg: msg.ErrorMsg, LastFailedAt: lastFailedAt, + Score: int64(z.Score), }) } return tasks, nil diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index a7a7e8a..9cf367a 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -249,8 +249,8 @@ func TestListScheduled(t *testing.T) { m2 := randomTask("reindex", "default", nil) p1 := time.Now().Add(30 * time.Minute) p2 := time.Now().Add(24 * time.Hour) - t1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: p1} - t2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: p2} + t1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: p1, Score: p1.Unix()} + t2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: p2, Score: p2.Unix()} type scheduledEntry struct { msg *TaskMessage @@ -330,9 +330,11 @@ func TestListRetry(t *testing.T) { p1 := time.Now().Add(5 * time.Minute) p2 := time.Now().Add(24 * time.Hour) t1 := &RetryTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, - ProcessAt: p1, ErrorMsg: m1.ErrorMsg, Retried: m1.Retried, Retry: m1.Retry} + ProcessAt: p1, ErrorMsg: m1.ErrorMsg, Retried: m1.Retried, + Retry: m1.Retry, Score: p1.Unix()} t2 := &RetryTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, - ProcessAt: p2, ErrorMsg: m2.ErrorMsg, Retried: m2.Retried, Retry: m2.Retry} + ProcessAt: p2, ErrorMsg: m2.ErrorMsg, Retried: m2.Retried, + Retry: m2.Retry, Score: p2.Unix()} type retryEntry struct { msg *TaskMessage @@ -407,8 +409,10 @@ func TestListDead(t *testing.T) { } f1 := time.Now().Add(-5 * time.Minute) f2 := time.Now().Add(-24 * time.Hour) - t1 := &DeadTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, LastFailedAt: f1, ErrorMsg: m1.ErrorMsg} - t2 := &DeadTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, LastFailedAt: f2, ErrorMsg: m2.ErrorMsg} + t1 := &DeadTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, + LastFailedAt: f1, ErrorMsg: m1.ErrorMsg, Score: f1.Unix()} + t2 := &DeadTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, + LastFailedAt: f2, ErrorMsg: m2.ErrorMsg, Score: f2.Unix()} type deadEntry struct { msg *TaskMessage diff --git a/tools/asynqmon/cmd/enq.go b/tools/asynqmon/cmd/enq.go new file mode 100644 index 0000000..bbf6d53 --- /dev/null +++ b/tools/asynqmon/cmd/enq.go @@ -0,0 +1,65 @@ +package cmd + +import ( + "fmt" + "log" + + "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq/internal/rdb" + "github.com/spf13/cobra" +) + +// enqCmd represents the enq command +var enqCmd = &cobra.Command{ + Use: "enq", + Short: "Enqueues a task given an identifier", + Long: `The enq command enqueues a task given an identifier. + +The target task should be in either scheduled, retry or dead queue. +Identifier for a task should be obtained by running "asynqmon ls" command. +The task enqueued by this command will be processed as soon as the task +gets dequeued by a processor. + +Example: asynqmon enq d:1575732274:b0415aa2-fd33-4b63-87c4-2f1a954ea4bf`, + Args: cobra.ExactArgs(1), + Run: enq, +} + +func init() { + rootCmd.AddCommand(enqCmd) + + // Here you will define your flags and configuration settings. + + // Cobra supports Persistent Flags which will work for this command + // and all subcommands, e.g.: + // enqCmd.PersistentFlags().String("foo", "", "A help for foo") + + // Cobra supports local flags which will only run when this command + // is called directly, e.g.: + // enqCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} + +func enq(cmd *cobra.Command, args []string) { + id, score, qtype, err := parseQueryID(args[0]) + if err != nil { + log.Fatalln(err) + } + r := rdb.NewRDB(redis.NewClient(&redis.Options{ + Addr: uri, + DB: db, + })) + switch qtype { + case "s": + err = r.ProcessNow(id.String(), float64(score)) + case "r": + err = r.RetryNow(id.String(), float64(score)) + case "d": + err = r.Rescue(id.String(), float64(score)) + default: + log.Fatalln("invalid argument") + } + if err != nil { + log.Fatalln(err) + } + fmt.Printf("Successfully enqueued %v\n", args[0]) +} diff --git a/tools/asynqmon/cmd/ls.go b/tools/asynqmon/cmd/ls.go index f9cfb55..7f1ecc2 100644 --- a/tools/asynqmon/cmd/ls.go +++ b/tools/asynqmon/cmd/ls.go @@ -5,11 +5,13 @@ import ( "io" "log" "os" + "strconv" "strings" "text/tabwriter" "time" "github.com/go-redis/redis/v7" + "github.com/google/uuid" "github.com/hibiken/asynq/internal/rdb" "github.com/spf13/cobra" ) @@ -19,7 +21,7 @@ var validArgs = []string{"enqueued", "inprogress", "scheduled", "retry", "dead"} // lsCmd represents the ls command var lsCmd = &cobra.Command{ Use: "ls", - Short: "lists queue contents", + Short: "Lists queue contents", Long: `The ls command lists all tasks from the specified queue in a table format. The command takes one argument which specifies the queue to inspect. The value @@ -69,10 +71,41 @@ func ls(cmd *cobra.Command, args []string) { } } +// queryID returns an identifier used for "enq" command. +// score is the zset score and queryType should be one +// of "s", "r" or "d" (scheduled, retry, dead respectively). +func queryID(id uuid.UUID, score int64, qtype string) string { + const format = "%v:%v:%v" + return fmt.Sprintf(format, qtype, score, id) +} + +// parseQueryID is a reverse operation of queryID function. +// It takes a queryID and return each part of id with proper +// type if valid, otherwise it reports an error. +func parseQueryID(queryID string) (id uuid.UUID, score float64, qtype string, err error) { + parts := strings.Split(queryID, ":") + if len(parts) != 3 { + return uuid.Nil, 0, "", fmt.Errorf("invalid id") + } + id, err = uuid.Parse(parts[2]) + if err != nil { + return uuid.Nil, 0, "", fmt.Errorf("invalid id") + } + score, err = strconv.ParseFloat(parts[1], 64) + if err != nil { + return uuid.Nil, 0, "", fmt.Errorf("invalid id") + } + qtype = parts[0] + if len(qtype) != 1 || !strings.Contains("srd", qtype) { + return uuid.Nil, 0, "", fmt.Errorf("invalid id") + } + return id, score, qtype, nil +} + func listEnqueued(r *rdb.RDB) { tasks, err := r.ListEnqueued() if err != nil { - log.Fatal(err) + log.Fatalln(err) } if len(tasks) == 0 { fmt.Println("No enqueued tasks") @@ -90,7 +123,7 @@ func listEnqueued(r *rdb.RDB) { func listInProgress(r *rdb.RDB) { tasks, err := r.ListInProgress() if err != nil { - log.Fatal(err) + log.Fatalln(err) } if len(tasks) == 0 { fmt.Println("No in-progress tasks") @@ -108,7 +141,7 @@ func listInProgress(r *rdb.RDB) { func listScheduled(r *rdb.RDB) { tasks, err := r.ListScheduled() if err != nil { - log.Fatal(err) + log.Fatalln(err) } if len(tasks) == 0 { fmt.Println("No scheduled tasks") @@ -118,7 +151,7 @@ func listScheduled(r *rdb.RDB) { printRows := func(w io.Writer, tmpl string) { for _, t := range tasks { processIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds()) - fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, processIn) + fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "s"), t.Type, t.Payload, processIn) } } printTable(cols, printRows) @@ -127,7 +160,7 @@ func listScheduled(r *rdb.RDB) { func listRetry(r *rdb.RDB) { tasks, err := r.ListRetry() if err != nil { - log.Fatal(err) + log.Fatalln(err) } if len(tasks) == 0 { fmt.Println("No retry tasks") @@ -137,7 +170,7 @@ func listRetry(r *rdb.RDB) { printRows := func(w io.Writer, tmpl string) { for _, t := range tasks { retryIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds()) - fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, retryIn, t.ErrorMsg, t.Retried, t.Retry) + fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "r"), t.Type, t.Payload, retryIn, t.ErrorMsg, t.Retried, t.Retry) } } printTable(cols, printRows) @@ -146,7 +179,7 @@ func listRetry(r *rdb.RDB) { func listDead(r *rdb.RDB) { tasks, err := r.ListDead() if err != nil { - log.Fatal(err) + log.Fatalln(err) } if len(tasks) == 0 { fmt.Println("No dead tasks") @@ -155,7 +188,7 @@ func listDead(r *rdb.RDB) { cols := []string{"ID", "Type", "Payload", "Last Failed", "Last Error"} printRows := func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg) + fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "d"), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg) } } printTable(cols, printRows) diff --git a/tools/asynqmon/cmd/root.go b/tools/asynqmon/cmd/root.go index 9bc08ac..596ddd0 100644 --- a/tools/asynqmon/cmd/root.go +++ b/tools/asynqmon/cmd/root.go @@ -21,9 +21,10 @@ var rootCmd = &cobra.Command{ Short: "A monitoring tool for asynq queues", Long: `Asynqmon is a CLI tool to inspect and monitor queues managed by asynq package. -Asynqmon has a few subcommands to show the current state of the queues, while others were -used to make manual changes to the queues. Monitoring commands can be used in conjunction -with the "watch" command to continuously run the command at a certain interval. +Asynqmon has a few subcommands to query and mutate the current state of the queues. + +Monitoring commands such as "stats" and "ls" can be used in conjunction with the +"watch" command to continuously run the command at a certain interval. Example: watch -n 5 asynqmon stats`, // Uncomment the following line if your bare application diff --git a/tools/asynqmon/cmd/stats.go b/tools/asynqmon/cmd/stats.go index 86fae4c..35bbf74 100644 --- a/tools/asynqmon/cmd/stats.go +++ b/tools/asynqmon/cmd/stats.go @@ -15,7 +15,7 @@ import ( // statsCmd represents the stats command var statsCmd = &cobra.Command{ Use: "stats", - Short: "shows current state of the queues", + Short: "Shows current state of the queues", Long: `The stats command shows the number of tasks in each queue at that instant. To monitor the queues continuously, it's recommended that you run this