2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 07:12:17 +08:00

Add enq command to asynqmon

This commit is contained in:
Ken Hibino 2019-12-08 16:36:08 -08:00
parent 39c4904dae
commit 4179c72c05
6 changed files with 128 additions and 19 deletions

View File

@ -42,6 +42,7 @@ type ScheduledTask struct {
Type string
Payload map[string]interface{}
ProcessAt time.Time
Score int64
}
// RetryTask is a task that's in retry queue because worker failed to process the task.
@ -55,6 +56,7 @@ type RetryTask struct {
ErrorMsg string
Retried int
Retry int
Score int64
}
// DeadTask is a task in that has exhausted all retries.
@ -65,6 +67,7 @@ type DeadTask struct {
Payload map[string]interface{}
LastFailedAt time.Time
ErrorMsg string
Score int64
}
// CurrentStats returns a current state of the queues.
@ -158,6 +161,7 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
Type: msg.Type,
Payload: msg.Payload,
ProcessAt: processAt,
Score: int64(z.Score),
})
}
return tasks, nil
@ -190,6 +194,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) {
Retry: msg.Retry,
Retried: msg.Retried,
ProcessAt: processAt,
Score: int64(z.Score),
})
}
return tasks, nil
@ -219,6 +224,7 @@ func (r *RDB) ListDead() ([]*DeadTask, error) {
Payload: msg.Payload,
ErrorMsg: msg.ErrorMsg,
LastFailedAt: lastFailedAt,
Score: int64(z.Score),
})
}
return tasks, nil

View File

@ -249,8 +249,8 @@ func TestListScheduled(t *testing.T) {
m2 := randomTask("reindex", "default", nil)
p1 := time.Now().Add(30 * time.Minute)
p2 := time.Now().Add(24 * time.Hour)
t1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: p1}
t2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: p2}
t1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: p1, Score: p1.Unix()}
t2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: p2, Score: p2.Unix()}
type scheduledEntry struct {
msg *TaskMessage
@ -330,9 +330,11 @@ func TestListRetry(t *testing.T) {
p1 := time.Now().Add(5 * time.Minute)
p2 := time.Now().Add(24 * time.Hour)
t1 := &RetryTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload,
ProcessAt: p1, ErrorMsg: m1.ErrorMsg, Retried: m1.Retried, Retry: m1.Retry}
ProcessAt: p1, ErrorMsg: m1.ErrorMsg, Retried: m1.Retried,
Retry: m1.Retry, Score: p1.Unix()}
t2 := &RetryTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload,
ProcessAt: p2, ErrorMsg: m2.ErrorMsg, Retried: m2.Retried, Retry: m2.Retry}
ProcessAt: p2, ErrorMsg: m2.ErrorMsg, Retried: m2.Retried,
Retry: m2.Retry, Score: p2.Unix()}
type retryEntry struct {
msg *TaskMessage
@ -407,8 +409,10 @@ func TestListDead(t *testing.T) {
}
f1 := time.Now().Add(-5 * time.Minute)
f2 := time.Now().Add(-24 * time.Hour)
t1 := &DeadTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, LastFailedAt: f1, ErrorMsg: m1.ErrorMsg}
t2 := &DeadTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, LastFailedAt: f2, ErrorMsg: m2.ErrorMsg}
t1 := &DeadTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload,
LastFailedAt: f1, ErrorMsg: m1.ErrorMsg, Score: f1.Unix()}
t2 := &DeadTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload,
LastFailedAt: f2, ErrorMsg: m2.ErrorMsg, Score: f2.Unix()}
type deadEntry struct {
msg *TaskMessage

65
tools/asynqmon/cmd/enq.go Normal file
View File

@ -0,0 +1,65 @@
package cmd
import (
"fmt"
"log"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
)
// enqCmd represents the enq command
var enqCmd = &cobra.Command{
Use: "enq",
Short: "Enqueues a task given an identifier",
Long: `The enq command enqueues a task given an identifier.
The target task should be in either scheduled, retry or dead queue.
Identifier for a task should be obtained by running "asynqmon ls" command.
The task enqueued by this command will be processed as soon as the task
gets dequeued by a processor.
Example: asynqmon enq d:1575732274:b0415aa2-fd33-4b63-87c4-2f1a954ea4bf`,
Args: cobra.ExactArgs(1),
Run: enq,
}
func init() {
rootCmd.AddCommand(enqCmd)
// Here you will define your flags and configuration settings.
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// enqCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// enqCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}
func enq(cmd *cobra.Command, args []string) {
id, score, qtype, err := parseQueryID(args[0])
if err != nil {
log.Fatalln(err)
}
r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: uri,
DB: db,
}))
switch qtype {
case "s":
err = r.ProcessNow(id.String(), float64(score))
case "r":
err = r.RetryNow(id.String(), float64(score))
case "d":
err = r.Rescue(id.String(), float64(score))
default:
log.Fatalln("invalid argument")
}
if err != nil {
log.Fatalln(err)
}
fmt.Printf("Successfully enqueued %v\n", args[0])
}

View File

@ -5,11 +5,13 @@ import (
"io"
"log"
"os"
"strconv"
"strings"
"text/tabwriter"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
)
@ -19,7 +21,7 @@ var validArgs = []string{"enqueued", "inprogress", "scheduled", "retry", "dead"}
// lsCmd represents the ls command
var lsCmd = &cobra.Command{
Use: "ls",
Short: "lists queue contents",
Short: "Lists queue contents",
Long: `The ls command lists all tasks from the specified queue in a table format.
The command takes one argument which specifies the queue to inspect. The value
@ -69,10 +71,41 @@ func ls(cmd *cobra.Command, args []string) {
}
}
// queryID returns an identifier used for "enq" command.
// score is the zset score and queryType should be one
// of "s", "r" or "d" (scheduled, retry, dead respectively).
func queryID(id uuid.UUID, score int64, qtype string) string {
const format = "%v:%v:%v"
return fmt.Sprintf(format, qtype, score, id)
}
// parseQueryID is a reverse operation of queryID function.
// It takes a queryID and return each part of id with proper
// type if valid, otherwise it reports an error.
func parseQueryID(queryID string) (id uuid.UUID, score float64, qtype string, err error) {
parts := strings.Split(queryID, ":")
if len(parts) != 3 {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
id, err = uuid.Parse(parts[2])
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
score, err = strconv.ParseFloat(parts[1], 64)
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
qtype = parts[0]
if len(qtype) != 1 || !strings.Contains("srd", qtype) {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
return id, score, qtype, nil
}
func listEnqueued(r *rdb.RDB) {
tasks, err := r.ListEnqueued()
if err != nil {
log.Fatal(err)
log.Fatalln(err)
}
if len(tasks) == 0 {
fmt.Println("No enqueued tasks")
@ -90,7 +123,7 @@ func listEnqueued(r *rdb.RDB) {
func listInProgress(r *rdb.RDB) {
tasks, err := r.ListInProgress()
if err != nil {
log.Fatal(err)
log.Fatalln(err)
}
if len(tasks) == 0 {
fmt.Println("No in-progress tasks")
@ -108,7 +141,7 @@ func listInProgress(r *rdb.RDB) {
func listScheduled(r *rdb.RDB) {
tasks, err := r.ListScheduled()
if err != nil {
log.Fatal(err)
log.Fatalln(err)
}
if len(tasks) == 0 {
fmt.Println("No scheduled tasks")
@ -118,7 +151,7 @@ func listScheduled(r *rdb.RDB) {
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
processIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds())
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, processIn)
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "s"), t.Type, t.Payload, processIn)
}
}
printTable(cols, printRows)
@ -127,7 +160,7 @@ func listScheduled(r *rdb.RDB) {
func listRetry(r *rdb.RDB) {
tasks, err := r.ListRetry()
if err != nil {
log.Fatal(err)
log.Fatalln(err)
}
if len(tasks) == 0 {
fmt.Println("No retry tasks")
@ -137,7 +170,7 @@ func listRetry(r *rdb.RDB) {
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
retryIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds())
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, retryIn, t.ErrorMsg, t.Retried, t.Retry)
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "r"), t.Type, t.Payload, retryIn, t.ErrorMsg, t.Retried, t.Retry)
}
}
printTable(cols, printRows)
@ -146,7 +179,7 @@ func listRetry(r *rdb.RDB) {
func listDead(r *rdb.RDB) {
tasks, err := r.ListDead()
if err != nil {
log.Fatal(err)
log.Fatalln(err)
}
if len(tasks) == 0 {
fmt.Println("No dead tasks")
@ -155,7 +188,7 @@ func listDead(r *rdb.RDB) {
cols := []string{"ID", "Type", "Payload", "Last Failed", "Last Error"}
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg)
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "d"), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg)
}
}
printTable(cols, printRows)

View File

@ -21,9 +21,10 @@ var rootCmd = &cobra.Command{
Short: "A monitoring tool for asynq queues",
Long: `Asynqmon is a CLI tool to inspect and monitor queues managed by asynq package.
Asynqmon has a few subcommands to show the current state of the queues, while others were
used to make manual changes to the queues. Monitoring commands can be used in conjunction
with the "watch" command to continuously run the command at a certain interval.
Asynqmon has a few subcommands to query and mutate the current state of the queues.
Monitoring commands such as "stats" and "ls" can be used in conjunction with the
"watch" command to continuously run the command at a certain interval.
Example: watch -n 5 asynqmon stats`,
// Uncomment the following line if your bare application

View File

@ -15,7 +15,7 @@ import (
// statsCmd represents the stats command
var statsCmd = &cobra.Command{
Use: "stats",
Short: "shows current state of the queues",
Short: "Shows current state of the queues",
Long: `The stats command shows the number of tasks in each queue at that instant.
To monitor the queues continuously, it's recommended that you run this