diff --git a/tools/asynq/cmd/queue.go b/tools/asynq/cmd/queue.go index 6f9400e..2c09d02 100644 --- a/tools/asynq/cmd/queue.go +++ b/tools/asynq/cmd/queue.go @@ -9,11 +9,9 @@ import ( "io" "os" - "github.com/go-redis/redis/v7" "github.com/hibiken/asynq" "github.com/hibiken/asynq/internal/rdb" "github.com/spf13/cobra" - "github.com/spf13/viper" ) const separator = "=================================================" @@ -80,12 +78,8 @@ var queueRemoveCmd = &cobra.Command{ } func queueList(cmd *cobra.Command, args []string) { - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - queues, err := i.Queues() + inspector := createInspector() + queues, err := inspector.Queues() if err != nil { fmt.Printf("error: Could not fetch list of queues: %v\n", err) os.Exit(1) @@ -96,11 +90,7 @@ func queueList(cmd *cobra.Command, args []string) { } func queueInspect(cmd *cobra.Command, args []string) { - inspector := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + inspector := createInspector() for i, qname := range args { if i > 0 { fmt.Printf("\n%s\n", separator) @@ -147,11 +137,7 @@ func queueHistory(cmd *cobra.Command, args []string) { fmt.Printf("error: Internal error: %v\n", err) os.Exit(1) } - inspector := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + inspector := createInspector() for i, qname := range args { if i > 0 { fmt.Printf("\n%s\n", separator) @@ -184,11 +170,7 @@ func printDailyStats(stats []*asynq.DailyStats) { } func queuePause(cmd *cobra.Command, args []string) { - inspector := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + inspector := createInspector() for _, qname := range args { err := inspector.PauseQueue(qname) if err != nil { @@ -200,11 +182,7 @@ func queuePause(cmd *cobra.Command, args []string) { } func queueUnpause(cmd *cobra.Command, args []string) { - inspector := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + inspector := createInspector() for _, qname := range args { err := inspector.UnpauseQueue(qname) if err != nil { @@ -223,12 +201,7 @@ func queueRemove(cmd *cobra.Command, args []string) { os.Exit(1) } - c := redis.NewClient(&redis.Options{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - r := rdb.NewRDB(c) + r := createRDB() for _, qname := range args { err = r.RemoveQueue(qname, force) if err != nil { diff --git a/tools/asynq/cmd/root.go b/tools/asynq/cmd/root.go index 12ceaa4..175e935 100644 --- a/tools/asynq/cmd/root.go +++ b/tools/asynq/cmd/root.go @@ -11,7 +11,10 @@ import ( "strings" "text/tabwriter" + "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/rdb" "github.com/spf13/cobra" homedir "github.com/mitchellh/go-homedir" @@ -20,10 +23,15 @@ import ( var cfgFile string -// Flags -var uri string -var db int -var password string +// Global flag variables +var ( + uri string + db int + password string + + useRedisCluster bool + clusterAddrs string +) // rootCmd represents the base command when called without any subcommands var rootCmd = &cobra.Command{ @@ -62,9 +70,16 @@ func init() { rootCmd.PersistentFlags().StringVarP(&uri, "uri", "u", "127.0.0.1:6379", "redis server URI") rootCmd.PersistentFlags().IntVarP(&db, "db", "n", 0, "redis database number (default is 0)") rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "password to use when connecting to redis server") + rootCmd.PersistentFlags().BoolVar(&useRedisCluster, "cluster", false, "connect to redis cluster") + rootCmd.PersistentFlags().StringVar(&clusterAddrs, "cluster_addrs", + "127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005", + "list of comma-separated redis server addresses") + // Bind flags with config. viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri")) viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db")) viper.BindPFlag("password", rootCmd.PersistentFlags().Lookup("password")) + viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster")) + viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs")) } // initConfig reads in config file and ENV variables if set. @@ -93,6 +108,44 @@ func initConfig() { } } +// createRDB creates a RDB instance using flag values and returns it. +func createRDB() *rdb.RDB { + var c redis.UniversalClient + if useRedisCluster { + addrs := strings.Split(viper.GetString("cluster_addrs"), ",") + c = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addrs, + Password: viper.GetString("password"), + }) + } else { + c = redis.NewClient(&redis.Options{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + } + return rdb.NewRDB(c) +} + +// createRDB creates a Inspector instance using flag values and returns it. +func createInspector() *asynq.Inspector { + var connOpt asynq.RedisConnOpt + if useRedisCluster { + addrs := strings.Split(viper.GetString("cluster_addrs"), ",") + connOpt = asynq.RedisClusterClientOpt{ + Addrs: addrs, + Password: viper.GetString("password"), + } + } else { + connOpt = asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + } + } + return asynq.NewInspector(connOpt) +} + // printTable is a helper function to print data in table format. // // cols is a list of headers and printRow specifies how to print rows. diff --git a/tools/asynq/cmd/server.go b/tools/asynq/cmd/server.go index 5b32277..4958a4e 100644 --- a/tools/asynq/cmd/server.go +++ b/tools/asynq/cmd/server.go @@ -12,10 +12,7 @@ import ( "strings" "time" - "github.com/go-redis/redis/v7" - "github.com/hibiken/asynq/internal/rdb" "github.com/spf13/cobra" - "github.com/spf13/viper" ) func init() { @@ -47,11 +44,7 @@ A "quiet" server is no longer pulling new tasks from queues`, } func serverList(cmd *cobra.Command, args []string) { - r := rdb.NewRDB(redis.NewClient(&redis.Options{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - })) + r := createRDB() servers, err := r.ListServers() if err != nil { diff --git a/tools/asynq/cmd/stats.go b/tools/asynq/cmd/stats.go index c55ff3d..55c0358 100644 --- a/tools/asynq/cmd/stats.go +++ b/tools/asynq/cmd/stats.go @@ -12,10 +12,8 @@ import ( "text/tabwriter" "time" - "github.com/go-redis/redis/v7" "github.com/hibiken/asynq/internal/rdb" "github.com/spf13/cobra" - "github.com/spf13/viper" ) // statsCmd represents the stats command @@ -64,12 +62,7 @@ type AggregateStats struct { } func stats(cmd *cobra.Command, args []string) { - c := redis.NewClient(&redis.Options{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - r := rdb.NewRDB(c) + r := createRDB() queues, err := r.AllQueues() if err != nil { diff --git a/tools/asynq/cmd/task.go b/tools/asynq/cmd/task.go index 34283d3..6e85367 100644 --- a/tools/asynq/cmd/task.go +++ b/tools/asynq/cmd/task.go @@ -10,11 +10,8 @@ import ( "os" "time" - "github.com/go-redis/redis/v7" "github.com/hibiken/asynq" - "github.com/hibiken/asynq/internal/rdb" "github.com/spf13/cobra" - "github.com/spf13/viper" ) func init() { @@ -185,11 +182,7 @@ func taskList(cmd *cobra.Command, args []string) { } func listInProgressTasks(qname string, pageNum, pageSize int) { - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + i := createInspector() tasks, err := i.ListInProgressTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { fmt.Println(err) @@ -210,11 +203,7 @@ func listInProgressTasks(qname string, pageNum, pageSize int) { } func listEnqueuedTasks(qname string, pageNum, pageSize int) { - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + i := createInspector() tasks, err := i.ListEnqueuedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { fmt.Println(err) @@ -235,11 +224,7 @@ func listEnqueuedTasks(qname string, pageNum, pageSize int) { } func listScheduledTasks(qname string, pageNum, pageSize int) { - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + i := createInspector() tasks, err := i.ListScheduledTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { fmt.Println(err) @@ -262,11 +247,7 @@ func listScheduledTasks(qname string, pageNum, pageSize int) { } func listRetryTasks(qname string, pageNum, pageSize int) { - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + i := createInspector() tasks, err := i.ListRetryTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { fmt.Println(err) @@ -293,11 +274,7 @@ func listRetryTasks(qname string, pageNum, pageSize int) { } func listDeadTasks(qname string, pageNum, pageSize int) { - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + i := createInspector() tasks, err := i.ListDeadTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { fmt.Println(err) @@ -317,12 +294,7 @@ func listDeadTasks(qname string, pageNum, pageSize int) { } func taskCancel(cmd *cobra.Command, args []string) { - r := rdb.NewRDB(redis.NewClient(&redis.Options{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - })) - + r := createRDB() for _, id := range args { err := r.PublishCancelation(id) if err != nil { @@ -345,11 +317,7 @@ func taskKill(cmd *cobra.Command, args []string) { os.Exit(1) } - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + i := createInspector() err = i.KillTaskByKey(qname, key) if err != nil { fmt.Printf("error: %v\n", err) @@ -370,11 +338,7 @@ func taskDelete(cmd *cobra.Command, args []string) { os.Exit(1) } - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + i := createInspector() err = i.DeleteTaskByKey(qname, key) if err != nil { fmt.Printf("error: %v\n", err) @@ -395,11 +359,7 @@ func taskRun(cmd *cobra.Command, args []string) { os.Exit(1) } - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + i := createInspector() err = i.EnqueueTaskByKey(qname, key) if err != nil { fmt.Printf("error: %v\n", err) @@ -420,11 +380,7 @@ func taskKillAll(cmd *cobra.Command, args []string) { os.Exit(1) } - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + i := createInspector() var n int switch state { case "scheduled": @@ -454,11 +410,7 @@ func taskDeleteAll(cmd *cobra.Command, args []string) { os.Exit(1) } - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + i := createInspector() var n int switch state { case "scheduled": @@ -490,11 +442,7 @@ func taskRunAll(cmd *cobra.Command, args []string) { os.Exit(1) } - i := asynq.NewInspector(asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) + i := createInspector() var n int switch state { case "scheduled": diff --git a/tools/go.mod b/tools/go.mod index fb60511..1295aca 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -3,7 +3,7 @@ module github.com/hibiken/asynq/tools go 1.13 require ( - github.com/go-redis/redis/v7 v7.2.0 + github.com/go-redis/redis/v7 v7.4.0 github.com/google/uuid v1.1.1 github.com/hibiken/asynq v0.4.0 github.com/mitchellh/go-homedir v1.1.0 diff --git a/tools/go.sum b/tools/go.sum index dd4c52b..0cccc8c 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -28,6 +28,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-redis/redis/v7 v7.2.0 h1:CrCexy/jYWZjW0AyVoHlcJUeZN19VWlbepTh1Vq6dJs= github.com/go-redis/redis/v7 v7.2.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= +github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4= +github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=