diff --git a/asynq.go b/asynq.go index aa586c4..bed372e 100644 --- a/asynq.go +++ b/asynq.go @@ -4,7 +4,7 @@ import "github.com/go-redis/redis/v7" /* TODOs: -- [P0] command to retry tasks from "retry", "dead" queue +- [P0] enqall command to enq all tasks from "scheduled" "retry", "dead" queue - [P0] Go docs + CONTRIBUTION.md + Github issue template - [P1] Add Support for multiple queues and priority - [P1] User defined max-retry count @@ -24,6 +24,7 @@ type Task struct { } // RedisConfig specifies redis configurations. +// TODO(hibiken): Support more configuration. type RedisConfig struct { Addr string Password string @@ -32,10 +33,10 @@ type RedisConfig struct { DB int } -func newRedisClient(config *RedisConfig) *redis.Client { +func newRedisClient(cfg *RedisConfig) *redis.Client { return redis.NewClient(&redis.Options{ - Addr: config.Addr, - Password: config.Password, - DB: config.DB, + Addr: cfg.Addr, + Password: cfg.Password, + DB: cfg.DB, }) } diff --git a/background.go b/background.go index a8e9398..42731fa 100644 --- a/background.go +++ b/background.go @@ -33,8 +33,8 @@ type Background struct { // NewBackground returns a new Background with the specified number of workers // given a redis configuration . -func NewBackground(numWorkers int, config *RedisConfig) *Background { - r := rdb.NewRDB(newRedisClient(config)) +func NewBackground(numWorkers int, cfg *RedisConfig) *Background { + r := rdb.NewRDB(newRedisClient(cfg)) poller := newPoller(r, 5*time.Second) processor := newProcessor(r, numWorkers, nil) return &Background{ diff --git a/client.go b/client.go index 0ad6224..c3ae3ea 100644 --- a/client.go +++ b/client.go @@ -9,7 +9,7 @@ import ( // A Client is responsible for scheduling tasks. // -// A Client is used to register task that should be processed +// A Client is used to register tasks that should be processed // immediately or some time in the future. // // Clients are safe for concurrent use by multiple goroutines. @@ -18,14 +18,14 @@ type Client struct { } // NewClient and returns a new Client given a redis configuration. -func NewClient(config *RedisConfig) *Client { - r := rdb.NewRDB(newRedisClient(config)) +func NewClient(cfg *RedisConfig) *Client { + r := rdb.NewRDB(newRedisClient(cfg)) return &Client{r} } // Process registers a task to be processed at the specified time. // -// Process returns nil if the task was registered successfully, +// Process returns nil if the task is registered successfully, // otherwise returns non-nil error. func (c *Client) Process(task *Task, processAt time.Time) error { msg := &rdb.TaskMessage{ diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 16d9bc9..38984be 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -42,6 +42,7 @@ type ScheduledTask struct { Type string Payload map[string]interface{} ProcessAt time.Time + Score int64 } // RetryTask is a task that's in retry queue because worker failed to process the task. @@ -55,6 +56,7 @@ type RetryTask struct { ErrorMsg string Retried int Retry int + Score int64 } // DeadTask is a task in that has exhausted all retries. @@ -65,6 +67,7 @@ type DeadTask struct { Payload map[string]interface{} LastFailedAt time.Time ErrorMsg string + Score int64 } // CurrentStats returns a current state of the queues. @@ -158,6 +161,7 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) { Type: msg.Type, Payload: msg.Payload, ProcessAt: processAt, + Score: int64(z.Score), }) } return tasks, nil @@ -190,6 +194,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) { Retry: msg.Retry, Retried: msg.Retried, ProcessAt: processAt, + Score: int64(z.Score), }) } return tasks, nil @@ -219,16 +224,17 @@ func (r *RDB) ListDead() ([]*DeadTask, error) { Payload: msg.Payload, ErrorMsg: msg.ErrorMsg, LastFailedAt: lastFailedAt, + Score: int64(z.Score), }) } return tasks, nil } -// Rescue finds a task that matches the given id and score from dead queue +// EnqueueDeadTask finds a task that matches the given id and score from dead queue // and enqueues it for processing. If a task that matches the id and score // does not exist, it returns ErrTaskNotFound. -func (r *RDB) Rescue(id string, score float64) error { - n, err := r.removeAndEnqueue(deadQ, id, score) +func (r *RDB) EnqueueDeadTask(id uuid.UUID, score int64) error { + n, err := r.removeAndEnqueue(deadQ, id.String(), float64(score)) if err != nil { return err } @@ -238,11 +244,11 @@ func (r *RDB) Rescue(id string, score float64) error { return nil } -// RetryNow finds a task that matches the given id and score from retry queue +// EnqueueRetryTask finds a task that matches the given id and score from retry queue // and enqueues it for processing. If a task that matches the id and score // does not exist, it returns ErrTaskNotFound. -func (r *RDB) RetryNow(id string, score float64) error { - n, err := r.removeAndEnqueue(retryQ, id, score) +func (r *RDB) EnqueueRetryTask(id uuid.UUID, score int64) error { + n, err := r.removeAndEnqueue(retryQ, id.String(), float64(score)) if err != nil { return err } @@ -252,11 +258,11 @@ func (r *RDB) RetryNow(id string, score float64) error { return nil } -// ProcessNow finds a task that matches the given id and score from scheduled queue +// EnqueueScheduledTask finds a task that matches the given id and score from scheduled queue // and enqueues it for processing. If a task that matches the id and score does not // exist, it returns ErrTaskNotFound. -func (r *RDB) ProcessNow(id string, score float64) error { - n, err := r.removeAndEnqueue(scheduledQ, id, score) +func (r *RDB) EnqueueScheduledTask(id uuid.UUID, score int64) error { + n, err := r.removeAndEnqueue(scheduledQ, id.String(), float64(score)) if err != nil { return err } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index a7a7e8a..33b6a39 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -249,8 +249,8 @@ func TestListScheduled(t *testing.T) { m2 := randomTask("reindex", "default", nil) p1 := time.Now().Add(30 * time.Minute) p2 := time.Now().Add(24 * time.Hour) - t1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: p1} - t2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: p2} + t1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: p1, Score: p1.Unix()} + t2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: p2, Score: p2.Unix()} type scheduledEntry struct { msg *TaskMessage @@ -330,9 +330,11 @@ func TestListRetry(t *testing.T) { p1 := time.Now().Add(5 * time.Minute) p2 := time.Now().Add(24 * time.Hour) t1 := &RetryTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, - ProcessAt: p1, ErrorMsg: m1.ErrorMsg, Retried: m1.Retried, Retry: m1.Retry} + ProcessAt: p1, ErrorMsg: m1.ErrorMsg, Retried: m1.Retried, + Retry: m1.Retry, Score: p1.Unix()} t2 := &RetryTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, - ProcessAt: p2, ErrorMsg: m2.ErrorMsg, Retried: m2.Retried, Retry: m2.Retry} + ProcessAt: p2, ErrorMsg: m2.ErrorMsg, Retried: m2.Retried, + Retry: m2.Retry, Score: p2.Unix()} type retryEntry struct { msg *TaskMessage @@ -407,8 +409,10 @@ func TestListDead(t *testing.T) { } f1 := time.Now().Add(-5 * time.Minute) f2 := time.Now().Add(-24 * time.Hour) - t1 := &DeadTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, LastFailedAt: f1, ErrorMsg: m1.ErrorMsg} - t2 := &DeadTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, LastFailedAt: f2, ErrorMsg: m2.ErrorMsg} + t1 := &DeadTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, + LastFailedAt: f1, ErrorMsg: m1.ErrorMsg, Score: f1.Unix()} + t2 := &DeadTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, + LastFailedAt: f2, ErrorMsg: m2.ErrorMsg, Score: f2.Unix()} type deadEntry struct { msg *TaskMessage @@ -467,22 +471,22 @@ func TestListDead(t *testing.T) { var timeCmpOpt = EquateApproxTime(time.Second) -func TestRescue(t *testing.T) { +func TestEnqueueDeadTask(t *testing.T) { r := setup(t) t1 := randomTask("send_email", "default", nil) t2 := randomTask("gen_thumbnail", "default", nil) - s1 := float64(time.Now().Add(-5 * time.Minute).Unix()) - s2 := float64(time.Now().Add(-time.Hour).Unix()) + s1 := time.Now().Add(-5 * time.Minute).Unix() + s2 := time.Now().Add(-time.Hour).Unix() type deadEntry struct { msg *TaskMessage - score float64 + score int64 } tests := []struct { dead []deadEntry - score float64 - id string - want error // expected return value from calling Rescue + score int64 + id uuid.UUID + want error // expected return value from calling EnqueueDeadTask wantDead []*TaskMessage wantEnqueued []*TaskMessage }{ @@ -492,7 +496,7 @@ func TestRescue(t *testing.T) { {t2, s2}, }, score: s2, - id: t2.ID.String(), + id: t2.ID, want: nil, wantDead: []*TaskMessage{t1}, wantEnqueued: []*TaskMessage{t2}, @@ -502,8 +506,8 @@ func TestRescue(t *testing.T) { {t1, s1}, {t2, s2}, }, - score: 123.0, - id: t2.ID.String(), + score: 123, + id: t2.ID, want: ErrTaskNotFound, wantDead: []*TaskMessage{t1, t2}, wantEnqueued: []*TaskMessage{}, @@ -517,15 +521,15 @@ func TestRescue(t *testing.T) { } // initialize dead queue for _, d := range tc.dead { - err := r.client.ZAdd(deadQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: d.score}).Err() + err := r.client.ZAdd(deadQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: float64(d.score)}).Err() if err != nil { t.Fatal(err) } } - got := r.Rescue(tc.id, tc.score) + got := r.EnqueueDeadTask(tc.id, tc.score) if got != tc.want { - t.Errorf("r.Rescue(%s, %0.f) = %v, want %v", tc.id, tc.score, got, tc.want) + t.Errorf("r.EnqueueDeadTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want) continue } @@ -543,22 +547,22 @@ func TestRescue(t *testing.T) { } } -func TestRetryNow(t *testing.T) { +func TestEnqueueRetryTask(t *testing.T) { r := setup(t) t1 := randomTask("send_email", "default", nil) t2 := randomTask("gen_thumbnail", "default", nil) - s1 := float64(time.Now().Add(-5 * time.Minute).Unix()) - s2 := float64(time.Now().Add(-time.Hour).Unix()) + s1 := time.Now().Add(-5 * time.Minute).Unix() + s2 := time.Now().Add(-time.Hour).Unix() type retryEntry struct { msg *TaskMessage - score float64 + score int64 } tests := []struct { dead []retryEntry - score float64 - id string - want error // expected return value from calling RetryNow + score int64 + id uuid.UUID + want error // expected return value from calling EnqueueRetryTask wantRetry []*TaskMessage wantEnqueued []*TaskMessage }{ @@ -568,7 +572,7 @@ func TestRetryNow(t *testing.T) { {t2, s2}, }, score: s2, - id: t2.ID.String(), + id: t2.ID, want: nil, wantRetry: []*TaskMessage{t1}, wantEnqueued: []*TaskMessage{t2}, @@ -578,8 +582,8 @@ func TestRetryNow(t *testing.T) { {t1, s1}, {t2, s2}, }, - score: 123.0, - id: t2.ID.String(), + score: 123, + id: t2.ID, want: ErrTaskNotFound, wantRetry: []*TaskMessage{t1, t2}, wantEnqueued: []*TaskMessage{}, @@ -593,15 +597,15 @@ func TestRetryNow(t *testing.T) { } // initialize retry queue for _, d := range tc.dead { - err := r.client.ZAdd(retryQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: d.score}).Err() + err := r.client.ZAdd(retryQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: float64(d.score)}).Err() if err != nil { t.Fatal(err) } } - got := r.RetryNow(tc.id, tc.score) + got := r.EnqueueRetryTask(tc.id, tc.score) if got != tc.want { - t.Errorf("r.RetryNow(%s, %0.f) = %v, want %v", tc.id, tc.score, got, tc.want) + t.Errorf("r.EnqueueRetryTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want) continue } @@ -619,22 +623,22 @@ func TestRetryNow(t *testing.T) { } } -func TestProcessNow(t *testing.T) { +func TestEnqueueScheduledTask(t *testing.T) { r := setup(t) t1 := randomTask("send_email", "default", nil) t2 := randomTask("gen_thumbnail", "default", nil) - s1 := float64(time.Now().Add(-5 * time.Minute).Unix()) - s2 := float64(time.Now().Add(-time.Hour).Unix()) + s1 := time.Now().Add(-5 * time.Minute).Unix() + s2 := time.Now().Add(-time.Hour).Unix() type scheduledEntry struct { msg *TaskMessage - score float64 + score int64 } tests := []struct { dead []scheduledEntry - score float64 - id string - want error // expected return value from calling ProcessNow + score int64 + id uuid.UUID + want error // expected return value from calling EnqueueScheduledTask wantScheduled []*TaskMessage wantEnqueued []*TaskMessage }{ @@ -644,7 +648,7 @@ func TestProcessNow(t *testing.T) { {t2, s2}, }, score: s2, - id: t2.ID.String(), + id: t2.ID, want: nil, wantScheduled: []*TaskMessage{t1}, wantEnqueued: []*TaskMessage{t2}, @@ -654,8 +658,8 @@ func TestProcessNow(t *testing.T) { {t1, s1}, {t2, s2}, }, - score: 123.0, - id: t2.ID.String(), + score: 123, + id: t2.ID, want: ErrTaskNotFound, wantScheduled: []*TaskMessage{t1, t2}, wantEnqueued: []*TaskMessage{}, @@ -669,15 +673,15 @@ func TestProcessNow(t *testing.T) { } // initialize scheduled queue for _, d := range tc.dead { - err := r.client.ZAdd(scheduledQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: d.score}).Err() + err := r.client.ZAdd(scheduledQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: float64(d.score)}).Err() if err != nil { t.Fatal(err) } } - got := r.ProcessNow(tc.id, tc.score) + got := r.EnqueueScheduledTask(tc.id, tc.score) if got != tc.want { - t.Errorf("r.RetryNow(%s, %0.f) = %v, want %v", tc.id, tc.score, got, tc.want) + t.Errorf("r.EnqueueRetryTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want) continue } diff --git a/tools/asynqmon/cmd/enq.go b/tools/asynqmon/cmd/enq.go new file mode 100644 index 0000000..9e81fbe --- /dev/null +++ b/tools/asynqmon/cmd/enq.go @@ -0,0 +1,70 @@ +package cmd + +import ( + "fmt" + "os" + + "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq/internal/rdb" + "github.com/spf13/cobra" +) + +// enqCmd represents the enq command +var enqCmd = &cobra.Command{ + Use: "enq", + Short: "Enqueues a task given an identifier", + Long: `The enq command enqueues a task given an identifier. + +The command takes one argument which specifies the task to enqueue. +The task should be in either scheduled, retry or dead queue. +Identifier for a task should be obtained by running "asynqmon ls" command. + +The task enqueued by this command will be processed as soon as the task +gets dequeued by a processor. + +Example: asynqmon enq d:1575732274:b0415aa2-fd33-4b63-87c4-2f1a954ea4bf`, + Args: cobra.ExactArgs(1), + Run: enq, +} + +func init() { + rootCmd.AddCommand(enqCmd) + + // Here you will define your flags and configuration settings. + + // Cobra supports Persistent Flags which will work for this command + // and all subcommands, e.g.: + // enqCmd.PersistentFlags().String("foo", "", "A help for foo") + + // Cobra supports local flags which will only run when this command + // is called directly, e.g.: + // enqCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} + +func enq(cmd *cobra.Command, args []string) { + id, score, qtype, err := parseQueryID(args[0]) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + r := rdb.NewRDB(redis.NewClient(&redis.Options{ + Addr: uri, + DB: db, + })) + switch qtype { + case "s": + err = r.EnqueueScheduledTask(id, score) + case "r": + err = r.EnqueueRetryTask(id, score) + case "d": + err = r.EnqueueDeadTask(id, score) + default: + fmt.Println("invalid argument") + os.Exit(1) + } + if err != nil { + fmt.Println(err) + os.Exit(1) + } + fmt.Printf("Successfully enqueued %v\n", args[0]) +} diff --git a/tools/asynqmon/cmd/ls.go b/tools/asynqmon/cmd/ls.go index f9cfb55..9a456f6 100644 --- a/tools/asynqmon/cmd/ls.go +++ b/tools/asynqmon/cmd/ls.go @@ -3,13 +3,14 @@ package cmd import ( "fmt" "io" - "log" "os" + "strconv" "strings" "text/tabwriter" "time" "github.com/go-redis/redis/v7" + "github.com/google/uuid" "github.com/hibiken/asynq/internal/rdb" "github.com/spf13/cobra" ) @@ -19,7 +20,7 @@ var validArgs = []string{"enqueued", "inprogress", "scheduled", "retry", "dead"} // lsCmd represents the ls command var lsCmd = &cobra.Command{ Use: "ls", - Short: "lists queue contents", + Short: "Lists queue contents", Long: `The ls command lists all tasks from the specified queue in a table format. The command takes one argument which specifies the queue to inspect. The value @@ -69,10 +70,42 @@ func ls(cmd *cobra.Command, args []string) { } } +// queryID returns an identifier used for "enq" command. +// score is the zset score and queryType should be one +// of "s", "r" or "d" (scheduled, retry, dead respectively). +func queryID(id uuid.UUID, score int64, qtype string) string { + const format = "%v:%v:%v" + return fmt.Sprintf(format, qtype, score, id) +} + +// parseQueryID is a reverse operation of queryID function. +// It takes a queryID and return each part of id with proper +// type if valid, otherwise it reports an error. +func parseQueryID(queryID string) (id uuid.UUID, score int64, qtype string, err error) { + parts := strings.Split(queryID, ":") + if len(parts) != 3 { + return uuid.Nil, 0, "", fmt.Errorf("invalid id") + } + id, err = uuid.Parse(parts[2]) + if err != nil { + return uuid.Nil, 0, "", fmt.Errorf("invalid id") + } + score, err = strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return uuid.Nil, 0, "", fmt.Errorf("invalid id") + } + qtype = parts[0] + if len(qtype) != 1 || !strings.Contains("srd", qtype) { + return uuid.Nil, 0, "", fmt.Errorf("invalid id") + } + return id, score, qtype, nil +} + func listEnqueued(r *rdb.RDB) { tasks, err := r.ListEnqueued() if err != nil { - log.Fatal(err) + fmt.Println(err) + os.Exit(1) } if len(tasks) == 0 { fmt.Println("No enqueued tasks") @@ -90,7 +123,8 @@ func listEnqueued(r *rdb.RDB) { func listInProgress(r *rdb.RDB) { tasks, err := r.ListInProgress() if err != nil { - log.Fatal(err) + fmt.Println(err) + os.Exit(1) } if len(tasks) == 0 { fmt.Println("No in-progress tasks") @@ -108,7 +142,8 @@ func listInProgress(r *rdb.RDB) { func listScheduled(r *rdb.RDB) { tasks, err := r.ListScheduled() if err != nil { - log.Fatal(err) + fmt.Println(err) + os.Exit(1) } if len(tasks) == 0 { fmt.Println("No scheduled tasks") @@ -118,7 +153,7 @@ func listScheduled(r *rdb.RDB) { printRows := func(w io.Writer, tmpl string) { for _, t := range tasks { processIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds()) - fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, processIn) + fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "s"), t.Type, t.Payload, processIn) } } printTable(cols, printRows) @@ -127,7 +162,8 @@ func listScheduled(r *rdb.RDB) { func listRetry(r *rdb.RDB) { tasks, err := r.ListRetry() if err != nil { - log.Fatal(err) + fmt.Println(err) + os.Exit(1) } if len(tasks) == 0 { fmt.Println("No retry tasks") @@ -137,7 +173,7 @@ func listRetry(r *rdb.RDB) { printRows := func(w io.Writer, tmpl string) { for _, t := range tasks { retryIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds()) - fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, retryIn, t.ErrorMsg, t.Retried, t.Retry) + fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "r"), t.Type, t.Payload, retryIn, t.ErrorMsg, t.Retried, t.Retry) } } printTable(cols, printRows) @@ -146,7 +182,8 @@ func listRetry(r *rdb.RDB) { func listDead(r *rdb.RDB) { tasks, err := r.ListDead() if err != nil { - log.Fatal(err) + fmt.Println(err) + os.Exit(1) } if len(tasks) == 0 { fmt.Println("No dead tasks") @@ -155,7 +192,7 @@ func listDead(r *rdb.RDB) { cols := []string{"ID", "Type", "Payload", "Last Failed", "Last Error"} printRows := func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg) + fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "d"), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg) } } printTable(cols, printRows) diff --git a/tools/asynqmon/cmd/root.go b/tools/asynqmon/cmd/root.go index 9bc08ac..596ddd0 100644 --- a/tools/asynqmon/cmd/root.go +++ b/tools/asynqmon/cmd/root.go @@ -21,9 +21,10 @@ var rootCmd = &cobra.Command{ Short: "A monitoring tool for asynq queues", Long: `Asynqmon is a CLI tool to inspect and monitor queues managed by asynq package. -Asynqmon has a few subcommands to show the current state of the queues, while others were -used to make manual changes to the queues. Monitoring commands can be used in conjunction -with the "watch" command to continuously run the command at a certain interval. +Asynqmon has a few subcommands to query and mutate the current state of the queues. + +Monitoring commands such as "stats" and "ls" can be used in conjunction with the +"watch" command to continuously run the command at a certain interval. Example: watch -n 5 asynqmon stats`, // Uncomment the following line if your bare application diff --git a/tools/asynqmon/cmd/stats.go b/tools/asynqmon/cmd/stats.go index 86fae4c..59d49c5 100644 --- a/tools/asynqmon/cmd/stats.go +++ b/tools/asynqmon/cmd/stats.go @@ -2,7 +2,6 @@ package cmd import ( "fmt" - "log" "os" "strings" "text/tabwriter" @@ -15,7 +14,7 @@ import ( // statsCmd represents the stats command var statsCmd = &cobra.Command{ Use: "stats", - Short: "shows current state of the queues", + Short: "Shows current state of the queues", Long: `The stats command shows the number of tasks in each queue at that instant. To monitor the queues continuously, it's recommended that you run this @@ -49,7 +48,8 @@ func stats(cmd *cobra.Command, args []string) { stats, err := r.CurrentStats() if err != nil { - log.Fatal(err) + fmt.Println(err) + os.Exit(1) } printStats(stats) fmt.Println()