From f38f94b94704fe647961865dfd9467903f0f809a Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 21 Aug 2020 06:00:49 -0700 Subject: [PATCH] Restructure CLI commands with subcommands --- inspector.go | 12 +- inspector_test.go | 8 +- internal/asynqtest/asynqtest.go | 5 + internal/base/base.go | 2 +- internal/rdb/inspect.go | 48 +- internal/rdb/inspect_test.go | 5 +- tools/asynq/cmd/cancel.go | 53 --- tools/asynq/cmd/del.go | 57 --- tools/asynq/cmd/delall.go | 72 --- tools/asynq/cmd/enq.go | 60 --- tools/asynq/cmd/enqall.go | 75 ---- tools/asynq/cmd/history.go | 69 --- tools/asynq/cmd/kill.go | 58 --- tools/asynq/cmd/killall.go | 70 --- tools/asynq/cmd/ls.go | 190 -------- tools/asynq/cmd/migrate.go | 212 --------- tools/asynq/cmd/pause.go | 47 -- tools/asynq/cmd/queue.go | 244 ++++++++++ tools/asynq/cmd/rmq.go | 54 --- tools/asynq/cmd/{servers.go => server.go} | 42 +- tools/asynq/cmd/stats.go | 63 ++- tools/asynq/cmd/task.go | 515 ++++++++++++++++++++++ tools/asynq/cmd/unpause.go | 46 -- tools/asynq/cmd/workers.go | 75 ---- 24 files changed, 897 insertions(+), 1185 deletions(-) delete mode 100644 tools/asynq/cmd/cancel.go delete mode 100644 tools/asynq/cmd/del.go delete mode 100644 tools/asynq/cmd/delall.go delete mode 100644 tools/asynq/cmd/enq.go delete mode 100644 tools/asynq/cmd/enqall.go delete mode 100644 tools/asynq/cmd/history.go delete mode 100644 tools/asynq/cmd/kill.go delete mode 100644 tools/asynq/cmd/killall.go delete mode 100644 tools/asynq/cmd/ls.go delete mode 100644 tools/asynq/cmd/migrate.go delete mode 100644 tools/asynq/cmd/pause.go create mode 100644 tools/asynq/cmd/queue.go delete mode 100644 tools/asynq/cmd/rmq.go rename tools/asynq/cmd/{servers.go => server.go} (85%) create mode 100644 tools/asynq/cmd/task.go delete mode 100644 tools/asynq/cmd/unpause.go delete mode 100644 tools/asynq/cmd/workers.go diff --git a/inspector.go b/inspector.go index 8630606..76aabc5 100644 --- a/inspector.go +++ b/inspector.go @@ -32,10 +32,13 @@ func (i *Inspector) Queues() ([]string, error) { return i.rdb.AllQueues() } -// Stats represents a state of queues at a certain time. -type Stats struct { +// QueueStats represents a state of queues at a certain time. +type QueueStats struct { // Name of the queue. Queue string + // Size is the total number of tasks in the queue. + // The value is the sum of Enqueued, InProgress, Scheduled, Retry, and Dead. + Size int // Number of enqueued tasks. Enqueued int // Number of in-progress tasks. @@ -59,13 +62,14 @@ type Stats struct { } // CurrentStats returns a current stats of the given queue. -func (i *Inspector) CurrentStats(qname string) (*Stats, error) { +func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) { stats, err := i.rdb.CurrentStats(qname) if err != nil { return nil, err } - return &Stats{ + return &QueueStats{ Queue: stats.Queue, + Size: stats.Size, Enqueued: stats.Enqueued, InProgress: stats.InProgress, Scheduled: stats.Scheduled, diff --git a/inspector_test.go b/inspector_test.go index fce3251..401b4b2 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -77,7 +77,7 @@ func TestInspectorCurrentStats(t *testing.T) { processed map[string]int failed map[string]int qname string - want *Stats + want *QueueStats }{ { enqueued: map[string][]*base.TaskMessage{ @@ -119,8 +119,9 @@ func TestInspectorCurrentStats(t *testing.T) { "low": 5, }, qname: "default", - want: &Stats{ + want: &QueueStats{ Queue: "default", + Size: 4, Enqueued: 1, InProgress: 1, Scheduled: 2, @@ -181,12 +182,13 @@ func TestInspectorHistory(t *testing.T) { }{ {"default", 90}, {"custom", 7}, - {"default", 0}, + {"default", 1}, } for _, tc := range tests { asynqtest.FlushDB(t, r) + r.SAdd(base.AllQueues, tc.qname) // populate last n days data for i := 0; i < tc.n; i++ { ts := now.Add(-time.Duration(i) * 24 * time.Hour) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 8e1bb2f..99270f3 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -162,30 +162,35 @@ func SeedEnqueuedQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage, // SeedInProgressQueue initializes the in-progress queue with the given messages. func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage, qname string) { tb.Helper() + r.SAdd(base.AllQueues, qname) seedRedisList(tb, r, base.InProgressKey(qname), msgs) } // SeedScheduledQueue initializes the scheduled queue with the given messages. func SeedScheduledQueue(tb testing.TB, r *redis.Client, entries []base.Z, qname string) { tb.Helper() + r.SAdd(base.AllQueues, qname) seedRedisZSet(tb, r, base.ScheduledKey(qname), entries) } // SeedRetryQueue initializes the retry queue with the given messages. func SeedRetryQueue(tb testing.TB, r *redis.Client, entries []base.Z, qname string) { tb.Helper() + r.SAdd(base.AllQueues, qname) seedRedisZSet(tb, r, base.RetryKey(qname), entries) } // SeedDeadQueue initializes the dead queue with the given messages. func SeedDeadQueue(tb testing.TB, r *redis.Client, entries []base.Z, qname string) { tb.Helper() + r.SAdd(base.AllQueues, qname) seedRedisZSet(tb, r, base.DeadKey(qname), entries) } // SeedDeadlines initializes the deadlines set with the given entries. func SeedDeadlines(tb testing.TB, r *redis.Client, entries []base.Z, qname string) { tb.Helper() + r.SAdd(base.AllQueues, qname) seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries) } diff --git a/internal/base/base.go b/internal/base/base.go index b4393e7..271f49e 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -19,7 +19,7 @@ import ( ) // Version of asynq library and CLI. -const Version = "0.10.0" +const Version = "0.12.0" // DefaultQueueName is the queue name used if none are specified by user. const DefaultQueueName = "default" diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index ddc9c50..9bc1268 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -28,6 +28,8 @@ type Stats struct { // Paused indicates whether the queue is paused. // If true, tasks in the queue should not be processed. Paused bool + // Size is the total number of tasks in the queue. + Size int // Number of tasks in each state. Enqueued int InProgress int @@ -125,20 +127,26 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { Queue: qname, Timestamp: now, } + size := 0 for i := 0; i < len(data); i += 2 { key := cast.ToString(data[i]) val := cast.ToInt(data[i+1]) switch key { case base.QueueKey(qname): stats.Enqueued = val + size += val case base.InProgressKey(qname): stats.InProgress = val + size += val case base.ScheduledKey(qname): stats.Scheduled = val + size += val case base.RetryKey(qname): stats.Retry = val + size += val case base.DeadKey(qname): stats.Dead = val + size += val case base.ProcessedKey(qname, now): stats.Processed = val case base.FailedKey(qname, now): @@ -151,6 +159,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { } } } + stats.Size = size return stats, nil } @@ -168,7 +177,14 @@ return res`) // HistoricalStats returns a list of stats from the last n days for the given queue. func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) { if n < 1 { - return []*DailyStats{}, nil + return nil, fmt.Errorf("the number of days must be positive") + } + exists, err := r.client.SIsMember(base.AllQueues, qname).Result() + if err != nil { + return nil, err + } + if !exists { + return nil, &ErrQueueNotFound{qname} } const day = 24 * time.Hour now := time.Now().UTC() @@ -252,6 +268,9 @@ func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*base.TaskMessage, e // ListInProgress returns all tasks that are currently being processed for the given queue. func (r *RDB) ListInProgress(qname string, pgn Pagination) ([]*base.TaskMessage, error) { + if !r.client.SIsMember(base.AllQueues, qname).Val() { + return nil, fmt.Errorf("queue %q does not exist", qname) + } return r.listMessages(base.InProgressKey(qname), pgn) } @@ -281,17 +300,26 @@ func (r *RDB) listMessages(key string, pgn Pagination) ([]*base.TaskMessage, err // ListScheduled returns all tasks from the given queue that are scheduled // to be processed in the future. func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) { + if !r.client.SIsMember(base.AllQueues, qname).Val() { + return nil, fmt.Errorf("queue %q does not exist", qname) + } return r.listZSetEntries(base.ScheduledKey(qname), pgn) } // ListRetry returns all tasks from the given queue that have failed before // and willl be retried in the future. func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) { + if !r.client.SIsMember(base.AllQueues, qname).Val() { + return nil, fmt.Errorf("queue %q does not exist", qname) + } return r.listZSetEntries(base.RetryKey(qname), pgn) } // ListDead returns all tasks from the given queue that have exhausted its retry limit. func (r *RDB) ListDead(qname string, pgn Pagination) ([]base.Z, error) { + if !r.client.SIsMember(base.AllQueues, qname).Val() { + return nil, fmt.Errorf("queue %q does not exist", qname) + } return r.listZSetEntries(base.DeadKey(qname), pgn) } @@ -615,6 +643,15 @@ func (e *ErrQueueNotFound) Error() string { return fmt.Sprintf("queue %q does not exist", e.qname) } +// ErrQueueNotEmpty indicates specified queue is not empty. +type ErrQueueNotEmpty struct { + qname string +} + +func (e *ErrQueueNotEmpty) Error() string { + return fmt.Sprintf("queue %q is not empty", e.qname) +} + // Only check whether in-progress queue is empty before removing. // KEYS[1] -> asynq:{} // KEYS[2] -> asynq:{}:in_progress @@ -650,7 +687,7 @@ local retry = redis.call("SCARD", KEYS[4]) local dead = redis.call("SCARD", KEYS[5]) local total = enqueued + inprogress + scheduled + retry + dead if total > 0 then - return redis.error_reply("Queue is not empty") + return redis.error_reply("QUEUE NOT EMPTY") end redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[2]) @@ -689,8 +726,13 @@ func (r *RDB) RemoveQueue(qname string, force bool) error { base.DeadlinesKey(qname), } if err := script.Run(r.client, keys).Err(); err != nil { - return err + if err.Error() == "QUEUE NOT EMPTY" { + return &ErrQueueNotEmpty{qname} + } else { + return err + } } + return r.client.SRem(base.AllQueues, qname).Err() } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 50985e6..cedb287 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -112,6 +112,7 @@ func TestCurrentStats(t *testing.T) { want: &Stats{ Queue: "default", Paused: false, + Size: 4, Enqueued: 1, InProgress: 1, Scheduled: 2, @@ -166,6 +167,7 @@ func TestCurrentStats(t *testing.T) { want: &Stats{ Queue: "critical", Paused: true, + Size: 1, Enqueued: 1, InProgress: 0, Scheduled: 0, @@ -232,12 +234,13 @@ func TestHistoricalStats(t *testing.T) { }{ {"default", 90}, {"custom", 7}, - {"default", 0}, + {"default", 1}, } for _, tc := range tests { h.FlushDB(t, r.client) + r.client.SAdd(base.AllQueues, tc.qname) // populate last n days data for i := 0; i < tc.n; i++ { ts := now.Add(-time.Duration(i) * 24 * time.Hour) diff --git a/tools/asynq/cmd/cancel.go b/tools/asynq/cmd/cancel.go deleted file mode 100644 index 56aa122..0000000 --- a/tools/asynq/cmd/cancel.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "fmt" - "os" - - "github.com/go-redis/redis/v7" - "github.com/hibiken/asynq/internal/rdb" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -// cancelCmd represents the cancel command -var cancelCmd = &cobra.Command{ - Use: "cancel [task id]", - Short: "Sends a cancelation signal to the goroutine processing the specified task", - Long: `Cancel (asynq cancel) will send a cancelation signal to the goroutine processing -the specified task. - -The command takes one argument which specifies the task to cancel. -The task should be in in-progress state. -Identifier for a task should be obtained by running "asynq ls" command. - -Handler implementation needs to be context aware for cancelation signal to -actually cancel the processing. - -Example: asynq cancel bnogo8gt6toe23vhef0g`, - Args: cobra.ExactArgs(1), - Run: cancel, -} - -func init() { - rootCmd.AddCommand(cancelCmd) -} - -func cancel(cmd *cobra.Command, args []string) { - r := rdb.NewRDB(redis.NewClient(&redis.Options{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - })) - - err := r.PublishCancelation(args[0]) - if err != nil { - fmt.Printf("could not send cancelation signal: %v\n", err) - os.Exit(1) - } - fmt.Printf("Successfully sent cancelation siganl for task %s\n", args[0]) -} diff --git a/tools/asynq/cmd/del.go b/tools/asynq/cmd/del.go deleted file mode 100644 index adc336a..0000000 --- a/tools/asynq/cmd/del.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "fmt" - "os" - - "github.com/hibiken/asynq" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -// delCmd represents the del command -var delCmd = &cobra.Command{ - Use: "del [task key]", - Short: "Deletes a task given an identifier", - Long: `Del (asynq del) will delete a task given an identifier. - -The command takes one argument which specifies the task to delete. -The task should be in either scheduled, retry or dead state. -Identifier for a task should be obtained by running "asynq ls" command. - -Example: asynq enq d:1575732274:bnogo8gt6toe23vhef0g`, - Args: cobra.ExactArgs(1), - Run: del, -} - -func init() { - rootCmd.AddCommand(delCmd) - - // Here you will define your flags and configuration settings. - - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // delCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // delCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") -} - -func del(cmd *cobra.Command, args []string) { - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - err := i.DeleteTaskByKey(args[0]) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - fmt.Printf("Successfully deleted %v\n", args[0]) -} diff --git a/tools/asynq/cmd/delall.go b/tools/asynq/cmd/delall.go deleted file mode 100644 index 33ae9b6..0000000 --- a/tools/asynq/cmd/delall.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "fmt" - "os" - - "github.com/hibiken/asynq" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -var delallValidArgs = []string{"scheduled", "retry", "dead"} - -// delallCmd represents the delall command -var delallCmd = &cobra.Command{ - Use: "delall [state]", - Short: "Deletes all tasks in the specified state", - Long: `Delall (asynq delall) will delete all tasks in the specified state. - -The argument should be one of "scheduled", "retry", or "dead". - -Example: asynq delall dead -> Deletes all dead tasks`, - ValidArgs: delallValidArgs, - Args: cobra.ExactValidArgs(1), - Run: delall, -} - -func init() { - rootCmd.AddCommand(delallCmd) - - // Here you will define your flags and configuration settings. - - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // delallCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // delallCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") -} - -func delall(cmd *cobra.Command, args []string) { - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - var ( - n int - err error - ) - switch args[0] { - case "scheduled": - n, err = i.DeleteAllScheduledTasks() - case "retry": - n, err = i.DeleteAllRetryTasks() - case "dead": - n, err = i.DeleteAllDeadTasks() - default: - fmt.Printf("error: `asynq delall [state]` only accepts %v as the argument.\n", delallValidArgs) - os.Exit(1) - } - if err != nil { - fmt.Println(err) - os.Exit(1) - } - fmt.Printf("Deleted all %d tasks in %q state\n", n, args[0]) -} diff --git a/tools/asynq/cmd/enq.go b/tools/asynq/cmd/enq.go deleted file mode 100644 index de64a9d..0000000 --- a/tools/asynq/cmd/enq.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "fmt" - "os" - - "github.com/hibiken/asynq" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -// enqCmd represents the enq command -var enqCmd = &cobra.Command{ - Use: "enq [task key]", - Short: "Enqueues a task given an identifier", - Long: `Enq (asynq enq) will enqueue a task given an identifier. - -The command takes one argument which specifies the task to enqueue. -The task should be in either scheduled, retry or dead state. -Identifier for a task should be obtained by running "asynq ls" command. - -The task enqueued by this command will be processed as soon as the task -gets dequeued by a processor. - -Example: asynq enq d:1575732274:bnogo8gt6toe23vhef0g`, - Args: cobra.ExactArgs(1), - Run: enq, -} - -func init() { - rootCmd.AddCommand(enqCmd) - - // Here you will define your flags and configuration settings. - - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // enqCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // enqCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") -} - -func enq(cmd *cobra.Command, args []string) { - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - err := i.EnqueueTaskByKey(args[0]) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - fmt.Printf("Successfully enqueued %v\n", args[0]) -} diff --git a/tools/asynq/cmd/enqall.go b/tools/asynq/cmd/enqall.go deleted file mode 100644 index dd1f4d1..0000000 --- a/tools/asynq/cmd/enqall.go +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "fmt" - "os" - - "github.com/hibiken/asynq" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -var enqallValidArgs = []string{"scheduled", "retry", "dead"} - -// enqallCmd represents the enqall command -var enqallCmd = &cobra.Command{ - Use: "enqall [state]", - Short: "Enqueues all tasks in the specified state", - Long: `Enqall (asynq enqall) will enqueue all tasks in the specified state. - -The argument should be one of "scheduled", "retry", or "dead". - -The tasks enqueued by this command will be processed as soon as it -gets dequeued by a processor. - -Example: asynq enqall dead -> Enqueues all dead tasks`, - ValidArgs: enqallValidArgs, - Args: cobra.ExactValidArgs(1), - Run: enqall, -} - -func init() { - rootCmd.AddCommand(enqallCmd) - - // Here you will define your flags and configuration settings. - - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // enqallCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // enqallCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") -} - -func enqall(cmd *cobra.Command, args []string) { - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - var ( - n int - err error - ) - switch args[0] { - case "scheduled": - n, err = i.EnqueueAllScheduledTasks() - case "retry": - n, err = i.EnqueueAllRetryTasks() - case "dead": - n, err = i.EnqueueAllDeadTasks() - default: - fmt.Printf("error: `asynq enqall [state]` only accepts %v as the argument.\n", enqallValidArgs) - os.Exit(1) - } - if err != nil { - fmt.Println(err) - os.Exit(1) - } - fmt.Printf("Enqueued %d tasks in %q state\n", n, args[0]) -} diff --git a/tools/asynq/cmd/history.go b/tools/asynq/cmd/history.go deleted file mode 100644 index 16e1b6e..0000000 --- a/tools/asynq/cmd/history.go +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "fmt" - "os" - "strings" - "text/tabwriter" - - "github.com/hibiken/asynq" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -var days int - -// historyCmd represents the history command -var historyCmd = &cobra.Command{ - Use: "history", - Short: "Shows historical aggregate data", - Long: `History (asynq history) will show the number of processed and failed tasks -from the last x days. - -By default, it will show the data from the last 10 days. - -Example: asynq history -x=30 -> Shows stats from the last 30 days`, - Args: cobra.NoArgs, - Run: history, -} - -func init() { - rootCmd.AddCommand(historyCmd) - historyCmd.Flags().IntVarP(&days, "days", "x", 10, "show data from last x days") -} - -func history(cmd *cobra.Command, args []string) { - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - - stats, err := i.History(days) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - printDailyStats(stats) -} - -func printDailyStats(stats []*asynq.DailyStats) { - format := strings.Repeat("%v\t", 4) + "\n" - tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) - fmt.Fprintf(tw, format, "Date (UTC)", "Processed", "Failed", "Error Rate") - fmt.Fprintf(tw, format, "----------", "---------", "------", "----------") - for _, s := range stats { - var errrate string - if s.Processed == 0 { - errrate = "N/A" - } else { - errrate = fmt.Sprintf("%.2f%%", float64(s.Failed)/float64(s.Processed)*100) - } - fmt.Fprintf(tw, format, s.Date.Format("2006-01-02"), s.Processed, s.Failed, errrate) - } - tw.Flush() -} diff --git a/tools/asynq/cmd/kill.go b/tools/asynq/cmd/kill.go deleted file mode 100644 index 0ca4a03..0000000 --- a/tools/asynq/cmd/kill.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "fmt" - "os" - - "github.com/hibiken/asynq" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -// killCmd represents the kill command -var killCmd = &cobra.Command{ - Use: "kill [task key]", - Short: "Kills a task given an identifier", - Long: `Kill (asynq kill) will put a task in dead state given an identifier. - -The command takes one argument which specifies the task to kill. -The task should be in either scheduled or retry state. -Identifier for a task should be obtained by running "asynq ls" command. - -Example: asynq kill r:1575732274:bnogo8gt6toe23vhef0g`, - Args: cobra.ExactArgs(1), - Run: kill, -} - -func init() { - rootCmd.AddCommand(killCmd) - - // Here you will define your flags and configuration settings. - - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // killCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // killCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") -} - -func kill(cmd *cobra.Command, args []string) { - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - err := i.KillTaskByKey(args[0]) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - fmt.Printf("Successfully killed %v\n", args[0]) - -} diff --git a/tools/asynq/cmd/killall.go b/tools/asynq/cmd/killall.go deleted file mode 100644 index bff4740..0000000 --- a/tools/asynq/cmd/killall.go +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "fmt" - "os" - - "github.com/hibiken/asynq" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -var killallValidArgs = []string{"scheduled", "retry"} - -// killallCmd represents the killall command -var killallCmd = &cobra.Command{ - Use: "killall [state]", - Short: "Kills all tasks in the specified state", - Long: `Killall (asynq killall) will update all tasks from the specified state to dead state. - -The argument should be either "scheduled" or "retry". - -Example: asynq killall retry -> Update all retry tasks to dead tasks`, - ValidArgs: killallValidArgs, - Args: cobra.ExactValidArgs(1), - Run: killall, -} - -func init() { - rootCmd.AddCommand(killallCmd) - - // Here you will define your flags and configuration settings. - - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // killallCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // killallCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") -} - -func killall(cmd *cobra.Command, args []string) { - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - var ( - n int - err error - ) - switch args[0] { - case "scheduled": - n, err = i.KillAllScheduledTasks() - case "retry": - n, err = i.KillAllRetryTasks() - default: - fmt.Printf("error: `asynq killall [state]` only accepts %v as the argument.\n", killallValidArgs) - os.Exit(1) - } - if err != nil { - fmt.Println(err) - os.Exit(1) - } - fmt.Printf("Successfully updated %d tasks to \"dead\" state\n", n) -} diff --git a/tools/asynq/cmd/ls.go b/tools/asynq/cmd/ls.go deleted file mode 100644 index 7ec296b..0000000 --- a/tools/asynq/cmd/ls.go +++ /dev/null @@ -1,190 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "fmt" - "io" - "os" - "strings" - "time" - - "github.com/hibiken/asynq" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -var lsValidArgs = []string{"enqueued", "inprogress", "scheduled", "retry", "dead"} - -// lsCmd represents the ls command -var lsCmd = &cobra.Command{ - Use: "ls [state]", - Short: "Lists tasks in the specified state", - Long: `Ls (asynq ls) will list all tasks in the specified state in a table format. - -The command takes one argument which specifies the state of tasks. -The argument value should be one of "enqueued", "inprogress", "scheduled", -"retry", or "dead". - -Example: -asynq ls dead -> Lists all tasks in dead state - -Enqueued tasks requires a queue name after ":" -Example: -asynq ls enqueued:default -> List tasks from default queue -asynq ls enqueued:critical -> List tasks from critical queue -`, - Args: cobra.ExactValidArgs(1), - Run: ls, -} - -// Flags -var pageSize int -var pageNum int - -func init() { - rootCmd.AddCommand(lsCmd) - lsCmd.Flags().IntVar(&pageSize, "size", 30, "page size") - lsCmd.Flags().IntVar(&pageNum, "page", 0, "page number - zero indexed (default 0)") -} - -func ls(cmd *cobra.Command, args []string) { - if pageSize < 0 { - fmt.Println("page size cannot be negative.") - os.Exit(1) - } - if pageNum < 0 { - fmt.Println("page number cannot be negative.") - os.Exit(1) - } - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - parts := strings.Split(args[0], ":") - switch parts[0] { - case "enqueued": - if len(parts) != 2 { - fmt.Printf("error: Missing queue name\n`asynq ls enqueued:[queue name]`\n") - os.Exit(1) - } - listEnqueued(i, parts[1]) - case "inprogress": - listInProgress(i) - case "scheduled": - listScheduled(i) - case "retry": - listRetry(i) - case "dead": - listDead(i) - default: - fmt.Printf("error: `asynq ls [state]`\nonly accepts %v as the argument.\n", lsValidArgs) - os.Exit(1) - } -} - -func listEnqueued(i *asynq.Inspector, qname string) { - tasks, err := i.ListEnqueuedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - if len(tasks) == 0 { - fmt.Printf("No enqueued tasks in %q queue\n", qname) - return - } - cols := []string{"ID", "Type", "Payload", "Queue"} - printTable(cols, func(w io.Writer, tmpl string) { - for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, t.Queue) - } - }) - fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum) -} - -func listInProgress(i *asynq.Inspector) { - tasks, err := i.ListInProgressTasks(asynq.PageSize(pageSize), asynq.Page(pageNum)) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - if len(tasks) == 0 { - fmt.Println("No in-progress tasks") - return - } - cols := []string{"ID", "Type", "Payload"} - printTable(cols, func(w io.Writer, tmpl string) { - for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload) - } - }) - fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum) -} - -func listScheduled(i *asynq.Inspector) { - tasks, err := i.ListScheduledTasks(asynq.PageSize(pageSize), asynq.Page(pageNum)) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - if len(tasks) == 0 { - fmt.Println("No scheduled tasks") - return - } - cols := []string{"Key", "Type", "Payload", "Process In", "Queue"} - printTable(cols, func(w io.Writer, tmpl string) { - for _, t := range tasks { - processIn := fmt.Sprintf("%.0f seconds", - t.NextEnqueueAt.Sub(time.Now()).Seconds()) - fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, processIn, t.Queue) - } - }) - fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum) -} - -func listRetry(i *asynq.Inspector) { - tasks, err := i.ListRetryTasks(asynq.PageSize(pageSize), asynq.Page(pageNum)) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - if len(tasks) == 0 { - fmt.Println("No retry tasks") - return - } - cols := []string{"Key", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry", "Queue"} - printTable(cols, func(w io.Writer, tmpl string) { - for _, t := range tasks { - var nextRetry string - if d := t.NextEnqueueAt.Sub(time.Now()); d > 0 { - nextRetry = fmt.Sprintf("in %v", d.Round(time.Second)) - } else { - nextRetry = "right now" - } - fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, nextRetry, t.ErrorMsg, t.Retried, t.MaxRetry, t.Queue) - } - }) - fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum) -} - -func listDead(i *asynq.Inspector) { - tasks, err := i.ListDeadTasks(asynq.PageSize(pageSize), asynq.Page(pageNum)) - if err != nil { - fmt.Println(err) - os.Exit(1) - } - if len(tasks) == 0 { - fmt.Println("No dead tasks") - return - } - cols := []string{"Key", "Type", "Payload", "Last Failed", "Last Error", "Queue"} - printTable(cols, func(w io.Writer, tmpl string) { - for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg, t.Queue) - } - }) - fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum) -} diff --git a/tools/asynq/cmd/migrate.go b/tools/asynq/cmd/migrate.go deleted file mode 100644 index 7ed04c0..0000000 --- a/tools/asynq/cmd/migrate.go +++ /dev/null @@ -1,212 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "encoding/json" - "fmt" - "os" - "strings" - "time" - - "github.com/go-redis/redis/v7" - "github.com/google/uuid" - "github.com/hibiken/asynq/internal/base" - "github.com/spf13/cast" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -// migrateCmd represents the migrate command -var migrateCmd = &cobra.Command{ - Use: "migrate", - Short: fmt.Sprintf("Migrate all tasks to be compatible with asynq@%s", base.Version), - Long: fmt.Sprintf("Migrate (asynq migrate) will convert all tasks in redis to be compatible with asynq@%s.", base.Version), - Run: migrate, -} - -func init() { - rootCmd.AddCommand(migrateCmd) -} - -func migrate(cmd *cobra.Command, args []string) { - c := redis.NewClient(&redis.Options{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - - lists := []string{base.InProgressQueue} - allQueues, err := c.SMembers(base.AllQueues).Result() - if err != nil { - fmt.Printf("error: could not read all queues: %v", err) - os.Exit(1) - } - lists = append(lists, allQueues...) - for _, key := range lists { - if err := migrateList(c, key); err != nil { - fmt.Printf("error: %v", err) - os.Exit(1) - } - } - - zsets := []string{base.ScheduledQueue, base.RetryQueue, base.DeadQueue} - for _, key := range zsets { - if err := migrateZSet(c, key); err != nil { - fmt.Printf("error: %v", err) - os.Exit(1) - } - } -} - -type oldTaskMessage struct { - // Unchanged - Type string - Payload map[string]interface{} - ID uuid.UUID - Queue string - Retry int - Retried int - ErrorMsg string - UniqueKey string - - // Following fields have changed. - - // Deadline specifies the deadline for the task. - // Task won't be processed if it exceeded its deadline. - // The string shoulbe be in RFC3339 format. - // - // time.Time's zero value means no deadline. - Timeout string - - // Deadline specifies the deadline for the task. - // Task won't be processed if it exceeded its deadline. - // The string shoulbe be in RFC3339 format. - // - // time.Time's zero value means no deadline. - Deadline string -} - -var defaultTimeout = 30 * time.Minute - -func convertMessage(old *oldTaskMessage) (*base.TaskMessage, error) { - timeout, err := time.ParseDuration(old.Timeout) - if err != nil { - return nil, fmt.Errorf("could not parse Timeout field of %+v", old) - } - deadline, err := time.Parse(time.RFC3339, old.Deadline) - if err != nil { - return nil, fmt.Errorf("could not parse Deadline field of %+v", old) - } - if timeout == 0 && deadline.IsZero() { - timeout = defaultTimeout - } - if deadline.IsZero() { - // Zero value used to be time.Time{}, - // in the new schema zero value is represented by - // zero in Unix time. - deadline = time.Unix(0, 0) - } - return &base.TaskMessage{ - Type: old.Type, - Payload: old.Payload, - ID: uuid.New(), - Queue: old.Queue, - Retry: old.Retry, - Retried: old.Retried, - ErrorMsg: old.ErrorMsg, - UniqueKey: old.UniqueKey, - Timeout: int64(timeout.Seconds()), - Deadline: deadline.Unix(), - }, nil -} - -func deserialize(s string) (*base.TaskMessage, error) { - // Try deserializing as old message. - d := json.NewDecoder(strings.NewReader(s)) - d.UseNumber() - var old *oldTaskMessage - if err := d.Decode(&old); err != nil { - // Try deserializing as new message. - d = json.NewDecoder(strings.NewReader(s)) - d.UseNumber() - var msg *base.TaskMessage - if err := d.Decode(&msg); err != nil { - return nil, fmt.Errorf("could not deserialize %s into task message: %v", s, err) - } - return msg, nil - } - return convertMessage(old) -} - -func migrateZSet(c *redis.Client, key string) error { - if c.Exists(key).Val() == 0 { - // skip if key doesn't exist. - return nil - } - res, err := c.ZRangeWithScores(key, 0, -1).Result() - if err != nil { - return err - } - var msgs []*redis.Z - for _, z := range res { - s, err := cast.ToStringE(z.Member) - if err != nil { - return fmt.Errorf("could not cast to string: %v", err) - } - msg, err := deserialize(s) - if err != nil { - return err - } - encoded, err := base.EncodeMessage(msg) - if err != nil { - return fmt.Errorf("could not encode message from %q: %v", key, err) - } - msgs = append(msgs, &redis.Z{Score: z.Score, Member: encoded}) - } - if err := c.Rename(key, key+":backup").Err(); err != nil { - return fmt.Errorf("could not rename key %q: %v", key, err) - } - if err := c.ZAdd(key, msgs...).Err(); err != nil { - return fmt.Errorf("could not write new messages to %q: %v", key, err) - } - if err := c.Del(key + ":backup").Err(); err != nil { - return fmt.Errorf("could not delete back up key %q: %v", key+":backup", err) - } - return nil -} - -func migrateList(c *redis.Client, key string) error { - if c.Exists(key).Val() == 0 { - // skip if key doesn't exist. - return nil - } - res, err := c.LRange(key, 0, -1).Result() - if err != nil { - return err - } - var msgs []interface{} - for _, s := range res { - msg, err := deserialize(s) - if err != nil { - return err - } - encoded, err := base.EncodeMessage(msg) - if err != nil { - return fmt.Errorf("could not encode message from %q: %v", key, err) - } - msgs = append(msgs, encoded) - } - if err := c.Rename(key, key+":backup").Err(); err != nil { - return fmt.Errorf("could not rename key %q: %v", key, err) - } - if err := c.LPush(key, msgs...).Err(); err != nil { - return fmt.Errorf("could not write new messages to %q: %v", key, err) - } - if err := c.Del(key + ":backup").Err(); err != nil { - return fmt.Errorf("could not delete back up key %q: %v", key+":backup", err) - } - return nil -} diff --git a/tools/asynq/cmd/pause.go b/tools/asynq/cmd/pause.go deleted file mode 100644 index 091783b..0000000 --- a/tools/asynq/cmd/pause.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "fmt" - "os" - - "github.com/go-redis/redis/v7" - "github.com/hibiken/asynq/internal/rdb" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -// pauseCmd represents the pause command -var pauseCmd = &cobra.Command{ - Use: "pause [queue name]", - Short: "Pauses the specified queue", - Long: `Pause (asynq pause) will pause the specified queue. -Asynq servers will not process tasks from paused queues. -Use the "unpause" command to resume a paused queue. - -Example: asynq pause default -> Pause the "default" queue`, - Args: cobra.ExactValidArgs(1), - Run: pause, -} - -func init() { - rootCmd.AddCommand(pauseCmd) -} - -func pause(cmd *cobra.Command, args []string) { - c := redis.NewClient(&redis.Options{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - r := rdb.NewRDB(c) - err := r.Pause(args[0]) - if err != nil { - fmt.Printf("error: %v\n", err) - os.Exit(1) - } - fmt.Printf("Successfully paused queue %q\n", args[0]) -} diff --git a/tools/asynq/cmd/queue.go b/tools/asynq/cmd/queue.go new file mode 100644 index 0000000..6f9400e --- /dev/null +++ b/tools/asynq/cmd/queue.go @@ -0,0 +1,244 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package cmd + +import ( + "fmt" + "io" + "os" + + "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq" + "github.com/hibiken/asynq/internal/rdb" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +const separator = "=================================================" + +func init() { + rootCmd.AddCommand(queueCmd) + queueCmd.AddCommand(queueListCmd) + queueCmd.AddCommand(queueInspectCmd) + queueCmd.AddCommand(queueHistoryCmd) + queueHistoryCmd.Flags().IntP("days", "x", 10, "show data from last x days") + + queueCmd.AddCommand(queuePauseCmd) + queueCmd.AddCommand(queueUnpauseCmd) + queueCmd.AddCommand(queueRemoveCmd) + queueRemoveCmd.Flags().BoolP("force", "f", false, "remove the queue regardless of its size") +} + +var queueCmd = &cobra.Command{ + Use: "queue", + Short: "Manage queues", +} + +var queueListCmd = &cobra.Command{ + Use: "ls", + Short: "List queues", + // TODO: Use RunE instead? + Run: queueList, +} + +var queueInspectCmd = &cobra.Command{ + Use: "inspect QUEUE [QUEUE...]", + Short: "Display detailed information on one or more queues", + Args: cobra.MinimumNArgs(1), + // TODO: Use RunE instead? + Run: queueInspect, +} + +var queueHistoryCmd = &cobra.Command{ + Use: "history QUEUE [QUEUE...]", + Short: "Display historical aggregate data from one or more queues", + Args: cobra.MinimumNArgs(1), + Run: queueHistory, +} + +var queuePauseCmd = &cobra.Command{ + Use: "pause QUEUE [QUEUE...]", + Short: "Pause one or more queues", + Args: cobra.MinimumNArgs(1), + Run: queuePause, +} + +var queueUnpauseCmd = &cobra.Command{ + Use: "unpause QUEUE [QUEUE...]", + Short: "Unpause one or more queues", + Args: cobra.MinimumNArgs(1), + Run: queueUnpause, +} + +var queueRemoveCmd = &cobra.Command{ + Use: "rm QUEUE [QUEUE...]", + Short: "Remove one or more queues", + Args: cobra.MinimumNArgs(1), + Run: queueRemove, +} + +func queueList(cmd *cobra.Command, args []string) { + i := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + queues, err := i.Queues() + if err != nil { + fmt.Printf("error: Could not fetch list of queues: %v\n", err) + os.Exit(1) + } + for _, qname := range queues { + fmt.Println(qname) + } +} + +func queueInspect(cmd *cobra.Command, args []string) { + inspector := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + for i, qname := range args { + if i > 0 { + fmt.Printf("\n%s\n", separator) + } + fmt.Printf("\nQueue: %s\n\n", qname) + stats, err := inspector.CurrentStats(qname) + if err != nil { + fmt.Printf("error: %v\n", err) + continue + } + printQueueStats(stats) + } +} + +func printQueueStats(s *asynq.QueueStats) { + fmt.Printf("Size: %d\n", s.Size) + fmt.Printf("Paused: %t\n\n", s.Paused) + fmt.Println("Task Breakdown:") + printTable( + []string{"InProgress", "Enqueued", "Scheduled", "Retry", "Dead"}, + func(w io.Writer, tmpl string) { + fmt.Fprintf(w, tmpl, s.InProgress, s.Enqueued, s.Scheduled, s.Retry, s.Dead) + }, + ) + fmt.Println() + fmt.Printf("%s Stats:\n", s.Timestamp.UTC().Format("2006-01-02")) + printTable( + []string{"Processed", "Failed", "Error Rate"}, + func(w io.Writer, tmpl string) { + var errRate string + if s.Processed == 0 { + errRate = "N/A" + } else { + errRate = fmt.Sprintf("%.2f%%", float64(s.Failed)/float64(s.Processed)*100) + } + fmt.Fprintf(w, tmpl, s.Processed, s.Failed, errRate) + }, + ) +} + +func queueHistory(cmd *cobra.Command, args []string) { + days, err := cmd.Flags().GetInt("days") + if err != nil { + fmt.Printf("error: Internal error: %v\n", err) + os.Exit(1) + } + inspector := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + for i, qname := range args { + if i > 0 { + fmt.Printf("\n%s\n", separator) + } + fmt.Printf("\nQueue: %s\n\n", qname) + stats, err := inspector.History(qname, days) + if err != nil { + fmt.Printf("error: %v\n", err) + continue + } + printDailyStats(stats) + } +} + +func printDailyStats(stats []*asynq.DailyStats) { + printTable( + []string{"Date (UTC)", "Processed", "Failed", "Error Rate"}, + func(w io.Writer, tmpl string) { + for _, s := range stats { + var errRate string + if s.Processed == 0 { + errRate = "N/A" + } else { + errRate = fmt.Sprintf("%.2f%%", float64(s.Failed)/float64(s.Processed)*100) + } + fmt.Fprintf(w, tmpl, s.Date.Format("2006-01-02"), s.Processed, s.Failed, errRate) + } + }, + ) +} + +func queuePause(cmd *cobra.Command, args []string) { + inspector := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + for _, qname := range args { + err := inspector.PauseQueue(qname) + if err != nil { + fmt.Println(err) + continue + } + fmt.Printf("Successfully paused queue %q\n", qname) + } +} + +func queueUnpause(cmd *cobra.Command, args []string) { + inspector := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + for _, qname := range args { + err := inspector.UnpauseQueue(qname) + if err != nil { + fmt.Println(err) + continue + } + fmt.Printf("Successfully unpaused queue %q\n", qname) + } +} + +func queueRemove(cmd *cobra.Command, args []string) { + // TODO: Use inspector once RemoveQueue become public API. + force, err := cmd.Flags().GetBool("force") + if err != nil { + fmt.Printf("error: Internal error: %v\n", err) + os.Exit(1) + } + + c := redis.NewClient(&redis.Options{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + r := rdb.NewRDB(c) + for _, qname := range args { + err = r.RemoveQueue(qname, force) + if err != nil { + if _, ok := err.(*rdb.ErrQueueNotEmpty); ok { + fmt.Printf("error: %v\nIf you are sure you want to delete it, run 'asynq queue rm --force %s'\n", err, qname) + continue + } + fmt.Printf("error: %v\n", err) + continue + } + fmt.Printf("Successfully removed queue %q\n", qname) + } +} diff --git a/tools/asynq/cmd/rmq.go b/tools/asynq/cmd/rmq.go deleted file mode 100644 index e62a883..0000000 --- a/tools/asynq/cmd/rmq.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "fmt" - "os" - - "github.com/go-redis/redis/v7" - "github.com/hibiken/asynq/internal/rdb" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -// rmqCmd represents the rmq command -var rmqCmd = &cobra.Command{ - Use: "rmq [queue name]", - Short: "Removes the specified queue", - Long: `Rmq (asynq rmq) will remove the specified queue. -By default, it will remove the queue only if it's empty. -Use --force option to override this behavior. - -Example: asynq rmq low -> Removes "low" queue`, - Args: cobra.ExactValidArgs(1), - Run: rmq, -} - -var rmqForce bool - -func init() { - rootCmd.AddCommand(rmqCmd) - rmqCmd.Flags().BoolVarP(&rmqForce, "force", "f", false, "remove the queue regardless of its size") -} - -func rmq(cmd *cobra.Command, args []string) { - c := redis.NewClient(&redis.Options{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - r := rdb.NewRDB(c) - err := r.RemoveQueue(args[0], rmqForce) - if err != nil { - if _, ok := err.(*rdb.ErrQueueNotEmpty); ok { - fmt.Printf("error: %v\nIf you are sure you want to delete it, run 'asynq rmq --force %s'\n", err, args[0]) - os.Exit(1) - } - fmt.Printf("error: %v", err) - os.Exit(1) - } - fmt.Printf("Successfully removed queue %q\n", args[0]) -} diff --git a/tools/asynq/cmd/servers.go b/tools/asynq/cmd/server.go similarity index 85% rename from tools/asynq/cmd/servers.go rename to tools/asynq/cmd/server.go index 9431b41..5b32277 100644 --- a/tools/asynq/cmd/servers.go +++ b/tools/asynq/cmd/server.go @@ -18,12 +18,21 @@ import ( "github.com/spf13/viper" ) -// serversCmd represents the servers command -var serversCmd = &cobra.Command{ - Use: "servers", - Short: "Shows all running worker servers", - Long: `Servers (asynq servers) will show all running worker servers -pulling tasks from the specified redis instance. +func init() { + rootCmd.AddCommand(serverCmd) + serverCmd.AddCommand(serverListCmd) +} + +var serverCmd = &cobra.Command{ + Use: "server", + Short: "Manage servers", +} + +var serverListCmd = &cobra.Command{ + Use: "ls", + Short: "List servers", + Long: `Server list (asynq server ls) shows all running worker servers +pulling tasks from the given redis instance. The command shows the following for each server: * Host and PID of the process in which the server is running @@ -34,15 +43,10 @@ The command shows the following for each server: A "running" server is pulling tasks from queues and processing them. A "quiet" server is no longer pulling new tasks from queues`, - Args: cobra.NoArgs, - Run: servers, + Run: serverList, } -func init() { - rootCmd.AddCommand(serversCmd) -} - -func servers(cmd *cobra.Command, args []string) { +func serverList(cmd *cobra.Command, args []string) { r := rdb.NewRDB(redis.NewClient(&redis.Options{ Addr: viper.GetString("uri"), DB: viper.GetInt("db"), @@ -81,12 +85,6 @@ func servers(cmd *cobra.Command, args []string) { printTable(cols, printRows) } -// timeAgo takes a time and returns a string of the format " ago". -func timeAgo(since time.Time) string { - d := time.Since(since).Round(time.Second) - return fmt.Sprintf("%v ago", d) -} - func formatQueues(qmap map[string]int) string { // sort queues by priority and name type queue struct { @@ -116,3 +114,9 @@ func formatQueues(qmap map[string]int) string { } return b.String() } + +// timeAgo takes a time and returns a string of the format " ago". +func timeAgo(since time.Time) string { + d := time.Since(since).Round(time.Second) + return fmt.Sprintf("%v ago", d) +} diff --git a/tools/asynq/cmd/stats.go b/tools/asynq/cmd/stats.go index 8db492e..c55ff3d 100644 --- a/tools/asynq/cmd/stats.go +++ b/tools/asynq/cmd/stats.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "text/tabwriter" + "time" "github.com/go-redis/redis/v7" "github.com/hibiken/asynq/internal/rdb" @@ -51,6 +52,17 @@ func init() { // statsCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") } +type AggregateStats struct { + InProgress int + Enqueued int + Scheduled int + Retry int + Dead int + Processed int + Failed int + Timestamp time.Time +} + func stats(cmd *cobra.Command, args []string) { c := redis.NewClient(&redis.Options{ Addr: viper.GetString("uri"), @@ -59,26 +71,45 @@ func stats(cmd *cobra.Command, args []string) { }) r := rdb.NewRDB(c) - stats, err := r.CurrentStats() + queues, err := r.AllQueues() if err != nil { fmt.Println(err) os.Exit(1) } + + var aggStats AggregateStats + var stats []*rdb.Stats + for _, qname := range queues { + s, err := r.CurrentStats(qname) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + aggStats.InProgress += s.InProgress + aggStats.Enqueued += s.Enqueued + aggStats.Scheduled += s.Scheduled + aggStats.Retry += s.Retry + aggStats.Dead += s.Dead + aggStats.Processed += s.Processed + aggStats.Failed += s.Failed + aggStats.Timestamp = s.Timestamp + stats = append(stats, s) + } info, err := r.RedisInfo() if err != nil { fmt.Println(err) os.Exit(1) } - fmt.Println("STATES") - printStates(stats) + fmt.Println("BY STATES") + printStatsByState(&aggStats) fmt.Println() - fmt.Println("QUEUES") - printQueues(stats.Queues) + fmt.Println("BY QUEUES") + printStatsByQueue(stats) fmt.Println() - fmt.Printf("STATS FOR %s UTC\n", stats.Timestamp.UTC().Format("2006-01-02")) - printStats(stats) + fmt.Printf("STATS FOR %s UTC\n", aggStats.Timestamp.UTC().Format("2006-01-02")) + printSuccessFailureStats(&aggStats) fmt.Println() fmt.Println("REDIS INFO") @@ -86,7 +117,7 @@ func stats(cmd *cobra.Command, args []string) { fmt.Println() } -func printStates(s *rdb.Stats) { +func printStatsByState(s *AggregateStats) { format := strings.Repeat("%v\t", 5) + "\n" tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) fmt.Fprintf(tw, format, "InProgress", "Enqueued", "Scheduled", "Retry", "Dead") @@ -95,13 +126,13 @@ func printStates(s *rdb.Stats) { tw.Flush() } -func printQueues(queues []*rdb.Queue) { +func printStatsByQueue(stats []*rdb.Stats) { var headers, seps, counts []string - for _, q := range queues { - title := queueTitle(q) + for _, s := range stats { + title := queueTitle(s) headers = append(headers, title) seps = append(seps, strings.Repeat("-", len(title))) - counts = append(counts, strconv.Itoa(q.Size)) + counts = append(counts, strconv.Itoa(s.Size)) } format := strings.Repeat("%v\t", len(headers)) + "\n" tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) @@ -111,16 +142,16 @@ func printQueues(queues []*rdb.Queue) { tw.Flush() } -func queueTitle(q *rdb.Queue) string { +func queueTitle(s *rdb.Stats) string { var b strings.Builder - b.WriteString(strings.Title(q.Name)) - if q.Paused { + b.WriteString(strings.Title(s.Queue)) + if s.Paused { b.WriteString(" (Paused)") } return b.String() } -func printStats(s *rdb.Stats) { +func printSuccessFailureStats(s *AggregateStats) { format := strings.Repeat("%v\t", 3) + "\n" tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) fmt.Fprintf(tw, format, "Processed", "Failed", "Error Rate") diff --git a/tools/asynq/cmd/task.go b/tools/asynq/cmd/task.go new file mode 100644 index 0000000..34283d3 --- /dev/null +++ b/tools/asynq/cmd/task.go @@ -0,0 +1,515 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package cmd + +import ( + "fmt" + "io" + "os" + "time" + + "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq" + "github.com/hibiken/asynq/internal/rdb" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +func init() { + rootCmd.AddCommand(taskCmd) + taskCmd.AddCommand(taskListCmd) + taskListCmd.Flags().StringP("queue", "q", "", "queue to inspect") + taskListCmd.Flags().StringP("state", "s", "", "state of the tasks to inspect") + taskListCmd.Flags().Int("page", 1, "page number") + taskListCmd.Flags().Int("size", 30, "page size") + taskListCmd.MarkFlagRequired("queue") + taskListCmd.MarkFlagRequired("state") + + taskCmd.AddCommand(taskCancelCmd) + + taskCmd.AddCommand(taskKillCmd) + taskKillCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs") + taskKillCmd.Flags().StringP("key", "k", "", "key of the task") + taskKillCmd.MarkFlagRequired("queue") + taskKillCmd.MarkFlagRequired("key") + + taskCmd.AddCommand(taskDeleteCmd) + taskDeleteCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs") + taskDeleteCmd.Flags().StringP("key", "k", "", "key of the task") + taskDeleteCmd.MarkFlagRequired("queue") + taskDeleteCmd.MarkFlagRequired("key") + + taskCmd.AddCommand(taskRunCmd) + taskRunCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs") + taskRunCmd.Flags().StringP("key", "k", "", "key of the task") + taskRunCmd.MarkFlagRequired("queue") + taskRunCmd.MarkFlagRequired("key") + + taskCmd.AddCommand(taskKillAllCmd) + taskKillAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong") + taskKillAllCmd.Flags().StringP("state", "s", "", "state of the tasks") + taskKillAllCmd.MarkFlagRequired("queue") + taskKillAllCmd.MarkFlagRequired("state") + + taskCmd.AddCommand(taskDeleteAllCmd) + taskDeleteAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong") + taskDeleteAllCmd.Flags().StringP("state", "s", "", "state of the tasks") + taskDeleteAllCmd.MarkFlagRequired("queue") + taskDeleteAllCmd.MarkFlagRequired("state") + + taskCmd.AddCommand(taskRunAllCmd) + taskRunAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong") + taskRunAllCmd.Flags().StringP("state", "s", "", "state of the tasks") + taskRunAllCmd.MarkFlagRequired("queue") + taskRunAllCmd.MarkFlagRequired("state") +} + +var taskCmd = &cobra.Command{ + Use: "task", + Short: "Manage tasks", +} + +var taskListCmd = &cobra.Command{ + Use: "ls --queue=QUEUE --state=STATE", + Short: "List tasks", + Long: `List tasks of the given state from the specified queue. + +The value for the state flag should be one of: +- in-progress +- enqueued +- scheduled +- retry +- dead + +List opeartion paginates the result set. +By default, the command fetches the first 30 tasks. +Use --page and --size flags to specify the page number and size. + +Example: +To list enqueued tasks from "default" queue, run + asynq task ls --queue=default --state=enqueued + +To list the tasks from the second page, run + asynq task ls --queue=default --state=enqueued --page=1`, + Run: taskList, +} + +var taskCancelCmd = &cobra.Command{ + Use: "cancel TASK_ID [TASK_ID...]", + Short: "Cancel one or more in-progress tasks", + Args: cobra.MinimumNArgs(1), + Run: taskCancel, +} + +var taskKillCmd = &cobra.Command{ + Use: "kill --queue=QUEUE --key=KEY", + Short: "Kill a task with the given key", + Args: cobra.NoArgs, + Run: taskKill, +} + +var taskDeleteCmd = &cobra.Command{ + Use: "delete --queue=QUEUE --key=KEY", + Short: "Delete a task with the given key", + Args: cobra.NoArgs, + Run: taskDelete, +} + +var taskRunCmd = &cobra.Command{ + Use: "run --queue=QUEUE --key=KEY", + Short: "Run a task with the given key", + Args: cobra.NoArgs, + Run: taskRun, +} + +var taskKillAllCmd = &cobra.Command{ + Use: "kill-all --queue=QUEUE --state=STATE", + Short: "Kill all tasks in the given state", + Args: cobra.NoArgs, + Run: taskKillAll, +} + +var taskDeleteAllCmd = &cobra.Command{ + Use: "delete-all --queue=QUEUE --key=KEY", + Short: "Delete all tasks in the given state", + Args: cobra.NoArgs, + Run: taskDeleteAll, +} + +var taskRunAllCmd = &cobra.Command{ + Use: "run-all --queue=QUEUE --key=KEY", + Short: "Run all tasks in the given state", + Args: cobra.NoArgs, + Run: taskRunAll, +} + +func taskList(cmd *cobra.Command, args []string) { + qname, err := cmd.Flags().GetString("queue") + if err != nil { + fmt.Println(err) + os.Exit(1) + } + state, err := cmd.Flags().GetString("state") + if err != nil { + fmt.Println(err) + os.Exit(1) + } + pageNum, err := cmd.Flags().GetInt("page") + if err != nil { + fmt.Println(err) + os.Exit(1) + } + pageSize, err := cmd.Flags().GetInt("size") + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + switch state { + case "in-progress": + listInProgressTasks(qname, pageNum, pageSize) + case "enqueued": + listEnqueuedTasks(qname, pageNum, pageSize) + case "scheduled": + listScheduledTasks(qname, pageNum, pageSize) + case "retry": + listRetryTasks(qname, pageNum, pageSize) + case "dead": + listDeadTasks(qname, pageNum, pageSize) + default: + fmt.Printf("error: state=%q is not supported\n", state) + os.Exit(1) + } +} + +func listInProgressTasks(qname string, pageNum, pageSize int) { + i := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + tasks, err := i.ListInProgressTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + if len(tasks) == 0 { + fmt.Printf("No in-progress tasks in %q queue\n", qname) + return + } + printTable( + []string{"ID", "Type", "Payload"}, + func(w io.Writer, tmpl string) { + for _, t := range tasks { + fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload) + } + }, + ) +} + +func listEnqueuedTasks(qname string, pageNum, pageSize int) { + i := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + tasks, err := i.ListEnqueuedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + if len(tasks) == 0 { + fmt.Printf("No enqueued tasks in %q queue\n", qname) + return + } + printTable( + []string{"ID", "Type", "Payload"}, + func(w io.Writer, tmpl string) { + for _, t := range tasks { + fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload) + } + }, + ) +} + +func listScheduledTasks(qname string, pageNum, pageSize int) { + i := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + tasks, err := i.ListScheduledTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + if len(tasks) == 0 { + fmt.Printf("No scheduled tasks in %q queue\n", qname) + return + } + printTable( + []string{"Key", "Type", "Payload", "Process In"}, + func(w io.Writer, tmpl string) { + for _, t := range tasks { + processIn := fmt.Sprintf("%.0f seconds", + t.NextEnqueueAt.Sub(time.Now()).Seconds()) + fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, processIn) + } + }, + ) +} + +func listRetryTasks(qname string, pageNum, pageSize int) { + i := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + tasks, err := i.ListRetryTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + if len(tasks) == 0 { + fmt.Printf("No retry tasks in %q queue\n", qname) + return + } + printTable( + []string{"Key", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry"}, + func(w io.Writer, tmpl string) { + for _, t := range tasks { + var nextRetry string + if d := t.NextEnqueueAt.Sub(time.Now()); d > 0 { + nextRetry = fmt.Sprintf("in %v", d.Round(time.Second)) + } else { + nextRetry = "right now" + } + fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, nextRetry, t.ErrorMsg, t.Retried, t.MaxRetry) + } + }, + ) +} + +func listDeadTasks(qname string, pageNum, pageSize int) { + i := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + tasks, err := i.ListDeadTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + if len(tasks) == 0 { + fmt.Printf("No dead tasks in %q queue\n", qname) + return + } + printTable( + []string{"Key", "Type", "Payload", "Last Failed", "Last Error"}, + func(w io.Writer, tmpl string) { + for _, t := range tasks { + fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg) + } + }) +} + +func taskCancel(cmd *cobra.Command, args []string) { + r := rdb.NewRDB(redis.NewClient(&redis.Options{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + })) + + for _, id := range args { + err := r.PublishCancelation(id) + if err != nil { + fmt.Printf("error: could not send cancelation signal: %v\n", err) + continue + } + fmt.Printf("Sent cancelation signal for task %s\n", id) + } +} + +func taskKill(cmd *cobra.Command, args []string) { + qname, err := cmd.Flags().GetString("queue") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + key, err := cmd.Flags().GetString("key") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + + i := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + err = i.KillTaskByKey(qname, key) + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + fmt.Println("task transitioned to dead state") +} + +func taskDelete(cmd *cobra.Command, args []string) { + qname, err := cmd.Flags().GetString("queue") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + key, err := cmd.Flags().GetString("key") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + + i := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + err = i.DeleteTaskByKey(qname, key) + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + fmt.Println("task deleted") +} + +func taskRun(cmd *cobra.Command, args []string) { + qname, err := cmd.Flags().GetString("queue") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + key, err := cmd.Flags().GetString("key") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + + i := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + err = i.EnqueueTaskByKey(qname, key) + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + fmt.Println("task transitioned to pending state") +} + +func taskKillAll(cmd *cobra.Command, args []string) { + qname, err := cmd.Flags().GetString("queue") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + state, err := cmd.Flags().GetString("state") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + + i := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + var n int + switch state { + case "scheduled": + n, err = i.KillAllScheduledTasks(qname) + case "retry": + n, err = i.KillAllRetryTasks(qname) + default: + fmt.Printf("error: unsupported state %q\n", state) + os.Exit(1) + } + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + fmt.Printf("%d tasks transitioned to dead state\n", n) +} + +func taskDeleteAll(cmd *cobra.Command, args []string) { + qname, err := cmd.Flags().GetString("queue") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + state, err := cmd.Flags().GetString("state") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + + i := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + var n int + switch state { + case "scheduled": + n, err = i.DeleteAllScheduledTasks(qname) + case "retry": + n, err = i.DeleteAllRetryTasks(qname) + case "dead": + n, err = i.DeleteAllDeadTasks(qname) + default: + fmt.Printf("error: unsupported state %q\n", state) + os.Exit(1) + } + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + fmt.Printf("%d tasks deleted\n", n) +} + +func taskRunAll(cmd *cobra.Command, args []string) { + qname, err := cmd.Flags().GetString("queue") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + state, err := cmd.Flags().GetString("state") + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + + i := asynq.NewInspector(asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + var n int + switch state { + case "scheduled": + n, err = i.EnqueueAllScheduledTasks(qname) + case "retry": + n, err = i.EnqueueAllRetryTasks(qname) + case "dead": + n, err = i.EnqueueAllDeadTasks(qname) + default: + fmt.Printf("error: unsupported state %q\n", state) + os.Exit(1) + } + if err != nil { + fmt.Printf("error: %v\n", err) + os.Exit(1) + } + fmt.Printf("%d tasks transitioned to pending state\n", n) +} diff --git a/tools/asynq/cmd/unpause.go b/tools/asynq/cmd/unpause.go deleted file mode 100644 index 79e0764..0000000 --- a/tools/asynq/cmd/unpause.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "fmt" - "os" - - "github.com/go-redis/redis/v7" - "github.com/hibiken/asynq/internal/rdb" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -// unpauseCmd represents the unpause command -var unpauseCmd = &cobra.Command{ - Use: "unpause [queue name]", - Short: "Unpauses the specified queue", - Long: `Unpause (asynq unpause) will unpause the specified queue. -Asynq servers will process tasks from unpaused/resumed queues. - -Example: asynq unpause default -> Resume the "default" queue`, - Args: cobra.ExactValidArgs(1), - Run: unpause, -} - -func init() { - rootCmd.AddCommand(unpauseCmd) -} - -func unpause(cmd *cobra.Command, args []string) { - c := redis.NewClient(&redis.Options{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - r := rdb.NewRDB(c) - err := r.Unpause(args[0]) - if err != nil { - fmt.Printf("error: %v\n", err) - os.Exit(1) - } - fmt.Printf("Successfully resumed queue %q\n", args[0]) -} diff --git a/tools/asynq/cmd/workers.go b/tools/asynq/cmd/workers.go deleted file mode 100644 index b395ef4..0000000 --- a/tools/asynq/cmd/workers.go +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "fmt" - "io" - "os" - "sort" - - "github.com/go-redis/redis/v7" - "github.com/hibiken/asynq/internal/rdb" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -// workersCmd represents the workers command -var workersCmd = &cobra.Command{ - Use: "workers", - Short: "Shows all running workers information", - Long: `Workers (asynq workers) will show all running workers information. - -The command shows the following for each worker: -* Process in which the worker is running -* ID of the task worker is processing -* Type of the task worker is processing -* Payload of the task worker is processing -* Queue that the task was pulled from. -* Time the worker started processing the task`, - Args: cobra.NoArgs, - Run: workers, -} - -func init() { - rootCmd.AddCommand(workersCmd) -} - -func workers(cmd *cobra.Command, args []string) { - r := rdb.NewRDB(redis.NewClient(&redis.Options{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - })) - - workers, err := r.ListWorkers() - if err != nil { - fmt.Println(err) - os.Exit(1) - } - - if len(workers) == 0 { - fmt.Println("No workers") - return - } - - // sort by started timestamp or ID. - sort.Slice(workers, func(i, j int) bool { - x, y := workers[i], workers[j] - if x.Started != y.Started { - return x.Started.Before(y.Started) - } - return x.ID < y.ID - }) - - cols := []string{"Process", "ID", "Type", "Payload", "Queue", "Started"} - printRows := func(w io.Writer, tmpl string) { - for _, wk := range workers { - fmt.Fprintf(w, tmpl, - fmt.Sprintf("%s:%d", wk.Host, wk.PID), wk.ID, wk.Type, wk.Payload, wk.Queue, timeAgo(wk.Started)) - } - } - printTable(cols, printRows) -}