2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 08:12:19 +08:00

Add redis-cluster support in asynq CLI

This commit is contained in:
Ken Hibino 2020-08-31 06:46:56 -07:00
parent dab8295883
commit a9c31553b8
7 changed files with 81 additions and 119 deletions

View File

@ -9,11 +9,9 @@ import (
"io" "io"
"os" "os"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper"
) )
const separator = "=================================================" const separator = "================================================="
@ -80,12 +78,8 @@ var queueRemoveCmd = &cobra.Command{
} }
func queueList(cmd *cobra.Command, args []string) { func queueList(cmd *cobra.Command, args []string) {
i := asynq.NewInspector(asynq.RedisClientOpt{ inspector := createInspector()
Addr: viper.GetString("uri"), queues, err := inspector.Queues()
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
queues, err := i.Queues()
if err != nil { if err != nil {
fmt.Printf("error: Could not fetch list of queues: %v\n", err) fmt.Printf("error: Could not fetch list of queues: %v\n", err)
os.Exit(1) os.Exit(1)
@ -96,11 +90,7 @@ func queueList(cmd *cobra.Command, args []string) {
} }
func queueInspect(cmd *cobra.Command, args []string) { func queueInspect(cmd *cobra.Command, args []string) {
inspector := asynq.NewInspector(asynq.RedisClientOpt{ inspector := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
for i, qname := range args { for i, qname := range args {
if i > 0 { if i > 0 {
fmt.Printf("\n%s\n", separator) fmt.Printf("\n%s\n", separator)
@ -147,11 +137,7 @@ func queueHistory(cmd *cobra.Command, args []string) {
fmt.Printf("error: Internal error: %v\n", err) fmt.Printf("error: Internal error: %v\n", err)
os.Exit(1) os.Exit(1)
} }
inspector := asynq.NewInspector(asynq.RedisClientOpt{ inspector := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
for i, qname := range args { for i, qname := range args {
if i > 0 { if i > 0 {
fmt.Printf("\n%s\n", separator) fmt.Printf("\n%s\n", separator)
@ -184,11 +170,7 @@ func printDailyStats(stats []*asynq.DailyStats) {
} }
func queuePause(cmd *cobra.Command, args []string) { func queuePause(cmd *cobra.Command, args []string) {
inspector := asynq.NewInspector(asynq.RedisClientOpt{ inspector := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
for _, qname := range args { for _, qname := range args {
err := inspector.PauseQueue(qname) err := inspector.PauseQueue(qname)
if err != nil { if err != nil {
@ -200,11 +182,7 @@ func queuePause(cmd *cobra.Command, args []string) {
} }
func queueUnpause(cmd *cobra.Command, args []string) { func queueUnpause(cmd *cobra.Command, args []string) {
inspector := asynq.NewInspector(asynq.RedisClientOpt{ inspector := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
for _, qname := range args { for _, qname := range args {
err := inspector.UnpauseQueue(qname) err := inspector.UnpauseQueue(qname)
if err != nil { if err != nil {
@ -223,12 +201,7 @@ func queueRemove(cmd *cobra.Command, args []string) {
os.Exit(1) os.Exit(1)
} }
c := redis.NewClient(&redis.Options{ r := createRDB()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
r := rdb.NewRDB(c)
for _, qname := range args { for _, qname := range args {
err = r.RemoveQueue(qname, force) err = r.RemoveQueue(qname, force)
if err != nil { if err != nil {

View File

@ -11,7 +11,10 @@ import (
"strings" "strings"
"text/tabwriter" "text/tabwriter"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra" "github.com/spf13/cobra"
homedir "github.com/mitchellh/go-homedir" homedir "github.com/mitchellh/go-homedir"
@ -20,10 +23,15 @@ import (
var cfgFile string var cfgFile string
// Flags // Global flag variables
var uri string var (
var db int uri string
var password string db int
password string
useRedisCluster bool
clusterAddrs string
)
// rootCmd represents the base command when called without any subcommands // rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{ var rootCmd = &cobra.Command{
@ -62,9 +70,16 @@ func init() {
rootCmd.PersistentFlags().StringVarP(&uri, "uri", "u", "127.0.0.1:6379", "redis server URI") rootCmd.PersistentFlags().StringVarP(&uri, "uri", "u", "127.0.0.1:6379", "redis server URI")
rootCmd.PersistentFlags().IntVarP(&db, "db", "n", 0, "redis database number (default is 0)") rootCmd.PersistentFlags().IntVarP(&db, "db", "n", 0, "redis database number (default is 0)")
rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "password to use when connecting to redis server") rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "password to use when connecting to redis server")
rootCmd.PersistentFlags().BoolVar(&useRedisCluster, "cluster", false, "connect to redis cluster")
rootCmd.PersistentFlags().StringVar(&clusterAddrs, "cluster_addrs",
"127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002,127.0.0.1:7003,127.0.0.1:7004,127.0.0.1:7005",
"list of comma-separated redis server addresses")
// Bind flags with config.
viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri")) viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri"))
viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db")) viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db"))
viper.BindPFlag("password", rootCmd.PersistentFlags().Lookup("password")) viper.BindPFlag("password", rootCmd.PersistentFlags().Lookup("password"))
viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster"))
viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs"))
} }
// initConfig reads in config file and ENV variables if set. // initConfig reads in config file and ENV variables if set.
@ -93,6 +108,44 @@ func initConfig() {
} }
} }
// createRDB creates a RDB instance using flag values and returns it.
func createRDB() *rdb.RDB {
var c redis.UniversalClient
if useRedisCluster {
addrs := strings.Split(viper.GetString("cluster_addrs"), ",")
c = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addrs,
Password: viper.GetString("password"),
})
} else {
c = redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
}
return rdb.NewRDB(c)
}
// createRDB creates a Inspector instance using flag values and returns it.
func createInspector() *asynq.Inspector {
var connOpt asynq.RedisConnOpt
if useRedisCluster {
addrs := strings.Split(viper.GetString("cluster_addrs"), ",")
connOpt = asynq.RedisClusterClientOpt{
Addrs: addrs,
Password: viper.GetString("password"),
}
} else {
connOpt = asynq.RedisClientOpt{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
}
}
return asynq.NewInspector(connOpt)
}
// printTable is a helper function to print data in table format. // printTable is a helper function to print data in table format.
// //
// cols is a list of headers and printRow specifies how to print rows. // cols is a list of headers and printRow specifies how to print rows.

View File

@ -12,10 +12,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper"
) )
func init() { func init() {
@ -47,11 +44,7 @@ A "quiet" server is no longer pulling new tasks from queues`,
} }
func serverList(cmd *cobra.Command, args []string) { func serverList(cmd *cobra.Command, args []string) {
r := rdb.NewRDB(redis.NewClient(&redis.Options{ r := createRDB()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
}))
servers, err := r.ListServers() servers, err := r.ListServers()
if err != nil { if err != nil {

View File

@ -12,10 +12,8 @@ import (
"text/tabwriter" "text/tabwriter"
"time" "time"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper"
) )
// statsCmd represents the stats command // statsCmd represents the stats command
@ -64,12 +62,7 @@ type AggregateStats struct {
} }
func stats(cmd *cobra.Command, args []string) { func stats(cmd *cobra.Command, args []string) {
c := redis.NewClient(&redis.Options{ r := createRDB()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
r := rdb.NewRDB(c)
queues, err := r.AllQueues() queues, err := r.AllQueues()
if err != nil { if err != nil {

View File

@ -10,11 +10,8 @@ import (
"os" "os"
"time" "time"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper"
) )
func init() { func init() {
@ -185,11 +182,7 @@ func taskList(cmd *cobra.Command, args []string) {
} }
func listInProgressTasks(qname string, pageNum, pageSize int) { func listInProgressTasks(qname string, pageNum, pageSize int) {
i := asynq.NewInspector(asynq.RedisClientOpt{ i := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
tasks, err := i.ListInProgressTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) tasks, err := i.ListInProgressTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
@ -210,11 +203,7 @@ func listInProgressTasks(qname string, pageNum, pageSize int) {
} }
func listEnqueuedTasks(qname string, pageNum, pageSize int) { func listEnqueuedTasks(qname string, pageNum, pageSize int) {
i := asynq.NewInspector(asynq.RedisClientOpt{ i := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
tasks, err := i.ListEnqueuedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) tasks, err := i.ListEnqueuedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
@ -235,11 +224,7 @@ func listEnqueuedTasks(qname string, pageNum, pageSize int) {
} }
func listScheduledTasks(qname string, pageNum, pageSize int) { func listScheduledTasks(qname string, pageNum, pageSize int) {
i := asynq.NewInspector(asynq.RedisClientOpt{ i := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
tasks, err := i.ListScheduledTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) tasks, err := i.ListScheduledTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
@ -262,11 +247,7 @@ func listScheduledTasks(qname string, pageNum, pageSize int) {
} }
func listRetryTasks(qname string, pageNum, pageSize int) { func listRetryTasks(qname string, pageNum, pageSize int) {
i := asynq.NewInspector(asynq.RedisClientOpt{ i := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
tasks, err := i.ListRetryTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) tasks, err := i.ListRetryTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
@ -293,11 +274,7 @@ func listRetryTasks(qname string, pageNum, pageSize int) {
} }
func listDeadTasks(qname string, pageNum, pageSize int) { func listDeadTasks(qname string, pageNum, pageSize int) {
i := asynq.NewInspector(asynq.RedisClientOpt{ i := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
tasks, err := i.ListDeadTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) tasks, err := i.ListDeadTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
@ -317,12 +294,7 @@ func listDeadTasks(qname string, pageNum, pageSize int) {
} }
func taskCancel(cmd *cobra.Command, args []string) { func taskCancel(cmd *cobra.Command, args []string) {
r := rdb.NewRDB(redis.NewClient(&redis.Options{ r := createRDB()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
}))
for _, id := range args { for _, id := range args {
err := r.PublishCancelation(id) err := r.PublishCancelation(id)
if err != nil { if err != nil {
@ -345,11 +317,7 @@ func taskKill(cmd *cobra.Command, args []string) {
os.Exit(1) os.Exit(1)
} }
i := asynq.NewInspector(asynq.RedisClientOpt{ i := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
err = i.KillTaskByKey(qname, key) err = i.KillTaskByKey(qname, key)
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) fmt.Printf("error: %v\n", err)
@ -370,11 +338,7 @@ func taskDelete(cmd *cobra.Command, args []string) {
os.Exit(1) os.Exit(1)
} }
i := asynq.NewInspector(asynq.RedisClientOpt{ i := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
err = i.DeleteTaskByKey(qname, key) err = i.DeleteTaskByKey(qname, key)
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) fmt.Printf("error: %v\n", err)
@ -395,11 +359,7 @@ func taskRun(cmd *cobra.Command, args []string) {
os.Exit(1) os.Exit(1)
} }
i := asynq.NewInspector(asynq.RedisClientOpt{ i := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
err = i.EnqueueTaskByKey(qname, key) err = i.EnqueueTaskByKey(qname, key)
if err != nil { if err != nil {
fmt.Printf("error: %v\n", err) fmt.Printf("error: %v\n", err)
@ -420,11 +380,7 @@ func taskKillAll(cmd *cobra.Command, args []string) {
os.Exit(1) os.Exit(1)
} }
i := asynq.NewInspector(asynq.RedisClientOpt{ i := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
var n int var n int
switch state { switch state {
case "scheduled": case "scheduled":
@ -454,11 +410,7 @@ func taskDeleteAll(cmd *cobra.Command, args []string) {
os.Exit(1) os.Exit(1)
} }
i := asynq.NewInspector(asynq.RedisClientOpt{ i := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
var n int var n int
switch state { switch state {
case "scheduled": case "scheduled":
@ -490,11 +442,7 @@ func taskRunAll(cmd *cobra.Command, args []string) {
os.Exit(1) os.Exit(1)
} }
i := asynq.NewInspector(asynq.RedisClientOpt{ i := createInspector()
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
var n int var n int
switch state { switch state {
case "scheduled": case "scheduled":

View File

@ -3,7 +3,7 @@ module github.com/hibiken/asynq/tools
go 1.13 go 1.13
require ( require (
github.com/go-redis/redis/v7 v7.2.0 github.com/go-redis/redis/v7 v7.4.0
github.com/google/uuid v1.1.1 github.com/google/uuid v1.1.1
github.com/hibiken/asynq v0.4.0 github.com/hibiken/asynq v0.4.0
github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/go-homedir v1.1.0

View File

@ -28,6 +28,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-redis/redis/v7 v7.2.0 h1:CrCexy/jYWZjW0AyVoHlcJUeZN19VWlbepTh1Vq6dJs= github.com/go-redis/redis/v7 v7.2.0 h1:CrCexy/jYWZjW0AyVoHlcJUeZN19VWlbepTh1Vq6dJs=
github.com/go-redis/redis/v7 v7.2.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-redis/redis/v7 v7.2.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4=
github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=