From 77604af265f0907b286f98f9426b068012b1b432 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 23 May 2021 11:19:29 -0700 Subject: [PATCH] Fix asynq CLI build --- tools/asynq/cmd/cron.go | 4 +- tools/asynq/cmd/migrate.go | 389 ------------------------------------- tools/asynq/cmd/queue.go | 16 +- tools/asynq/cmd/root.go | 5 +- tools/asynq/cmd/task.go | 79 ++++---- tools/go.mod | 5 +- tools/go.sum | 29 +++ 7 files changed, 83 insertions(+), 444 deletions(-) delete mode 100644 tools/asynq/cmd/migrate.go diff --git a/tools/asynq/cmd/cron.go b/tools/asynq/cmd/cron.go index 376d1cd..a7cffd1 100644 --- a/tools/asynq/cmd/cron.go +++ b/tools/asynq/cmd/cron.go @@ -11,7 +11,7 @@ import ( "sort" "time" - "github.com/hibiken/asynq/inspeq" + "github.com/hibiken/asynq" "github.com/spf13/cobra" ) @@ -108,7 +108,7 @@ func cronHistory(cmd *cobra.Command, args []string) { fmt.Printf("Entry: %s\n\n", entryID) events, err := inspector.ListSchedulerEnqueueEvents( - entryID, inspeq.PageSize(pageSize), inspeq.Page(pageNum)) + entryID, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { fmt.Printf("error: %v\n", err) continue diff --git a/tools/asynq/cmd/migrate.go b/tools/asynq/cmd/migrate.go deleted file mode 100644 index 82f323a..0000000 --- a/tools/asynq/cmd/migrate.go +++ /dev/null @@ -1,389 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package cmd - -import ( - "encoding/json" - "fmt" - "os" - "strings" - "time" - - "github.com/go-redis/redis/v7" - "github.com/google/uuid" - "github.com/hibiken/asynq/internal/base" - "github.com/spf13/cast" - "github.com/spf13/cobra" - "github.com/spf13/viper" -) - -var migrateCmd = &cobra.Command{ - Use: "migrate", - Short: fmt.Sprintf("Migrate all tasks to be compatible with asynq v%s", base.Version), - Args: cobra.NoArgs, - Run: migrate, -} - -func init() { - rootCmd.AddCommand(migrateCmd) -} - -func migrate(cmd *cobra.Command, args []string) { - c := redis.NewClient(&redis.Options{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - }) - r := createRDB() - - /*** Migrate from 0.9 to 0.10, 0.11 compatible ***/ - lists := []string{"asynq:in_progress"} - allQueues, err := c.SMembers(base.AllQueues).Result() - if err != nil { - printError(fmt.Errorf("could not read all queues: %v", err)) - os.Exit(1) - } - lists = append(lists, allQueues...) - for _, key := range lists { - if err := migrateList(c, key); err != nil { - printError(err) - os.Exit(1) - } - } - - zsets := []string{"asynq:scheduled", "asynq:retry", "asynq:dead"} - for _, key := range zsets { - if err := migrateZSet(c, key); err != nil { - printError(err) - os.Exit(1) - } - } - - /*** Migrate from 0.11 to 0.12 compatible ***/ - if err := createBackup(c, base.AllQueues); err != nil { - printError(err) - os.Exit(1) - } - for _, qkey := range allQueues { - qname := strings.TrimPrefix(qkey, "asynq:queues:") - if err := c.SAdd(base.AllQueues, qname).Err(); err != nil { - err = fmt.Errorf("could not add queue name %q to %q set: %v\n", - qname, base.AllQueues, err) - printError(err) - os.Exit(1) - } - } - if err := deleteBackup(c, base.AllQueues); err != nil { - printError(err) - os.Exit(1) - } - - for _, qkey := range allQueues { - qname := strings.TrimPrefix(qkey, "asynq:queues:") - if exists := c.Exists(qkey).Val(); exists == 1 { - if err := c.Rename(qkey, base.QueueKey(qname)).Err(); err != nil { - printError(fmt.Errorf("could not rename key %q: %v\n", qkey, err)) - os.Exit(1) - } - } - } - - if err := partitionZSetMembersByQueue(c, "asynq:scheduled", base.ScheduledKey); err != nil { - printError(err) - os.Exit(1) - } - if err := partitionZSetMembersByQueue(c, "asynq:retry", base.RetryKey); err != nil { - printError(err) - os.Exit(1) - } - // Note: base.DeadKey function was renamed in v0.14. We define the legacy function here since we need it for this migration script. - deadKeyFunc := func(qname string) string { return fmt.Sprintf("asynq:{%s}:dead", qname) } - if err := partitionZSetMembersByQueue(c, "asynq:dead", deadKeyFunc); err != nil { - printError(err) - os.Exit(1) - } - if err := partitionZSetMembersByQueue(c, "asynq:deadlines", base.DeadlinesKey); err != nil { - printError(err) - os.Exit(1) - } - if err := partitionListMembersByQueue(c, "asynq:in_progress", base.ActiveKey); err != nil { - printError(err) - os.Exit(1) - } - - paused, err := c.SMembers("asynq:paused").Result() - if err != nil { - printError(fmt.Errorf("command SMEMBERS asynq:paused failed: %v", err)) - os.Exit(1) - } - for _, qkey := range paused { - qname := strings.TrimPrefix(qkey, "asynq:queues:") - if err := r.Pause(qname); err != nil { - printError(err) - os.Exit(1) - } - } - if err := deleteKey(c, "asynq:paused"); err != nil { - printError(err) - os.Exit(1) - } - - if err := deleteKey(c, "asynq:servers"); err != nil { - printError(err) - os.Exit(1) - } - if err := deleteKey(c, "asynq:workers"); err != nil { - printError(err) - os.Exit(1) - } - - /*** Migrate from 0.13 to 0.14 compatible ***/ - - // Move all dead tasks to archived ZSET. - for _, qname := range allQueues { - zs, err := c.ZRangeWithScores(deadKeyFunc(qname), 0, -1).Result() - if err != nil { - printError(err) - os.Exit(1) - } - for _, z := range zs { - if err := c.ZAdd(base.ArchivedKey(qname), &z).Err(); err != nil { - printError(err) - os.Exit(1) - } - } - if err := deleteKey(c, deadKeyFunc(qname)); err != nil { - printError(err) - os.Exit(1) - } - } -} - -func backupKey(key string) string { - return fmt.Sprintf("%s:backup", key) -} - -func createBackup(c *redis.Client, key string) error { - err := c.Rename(key, backupKey(key)).Err() - if err != nil { - return fmt.Errorf("could not rename key %q: %v", key, err) - } - return nil -} - -func deleteBackup(c *redis.Client, key string) error { - return deleteKey(c, backupKey(key)) -} - -func deleteKey(c *redis.Client, key string) error { - exists := c.Exists(key).Val() - if exists == 0 { - // key does not exist - return nil - } - err := c.Del(key).Err() - if err != nil { - return fmt.Errorf("could not delete key %q: %v", key, err) - } - return nil -} - -func printError(err error) { - fmt.Println(err) - fmt.Println() - fmt.Println("Migrate command error") - fmt.Println("Please file an issue on Github at https://github.com/hibiken/asynq/issues/new/choose") -} - -func partitionZSetMembersByQueue(c *redis.Client, key string, newKeyFunc func(string) string) error { - zs, err := c.ZRangeWithScores(key, 0, -1).Result() - if err != nil { - return fmt.Errorf("command ZRANGE %s 0 -1 WITHSCORES failed: %v", key, err) - } - for _, z := range zs { - s := cast.ToString(z.Member) - msg, err := base.DecodeMessage(s) - if err != nil { - return fmt.Errorf("could not decode message from %q: %v", key, err) - } - if err := c.ZAdd(newKeyFunc(msg.Queue), &z).Err(); err != nil { - return fmt.Errorf("could not add %v to %q: %v", z, newKeyFunc(msg.Queue)) - } - } - if err := deleteKey(c, key); err != nil { - return err - } - return nil -} - -func partitionListMembersByQueue(c *redis.Client, key string, newKeyFunc func(string) string) error { - data, err := c.LRange(key, 0, -1).Result() - if err != nil { - return fmt.Errorf("command LRANGE %s 0 -1 failed: %v", key, err) - } - for _, s := range data { - msg, err := base.DecodeMessage(s) - if err != nil { - return fmt.Errorf("could not decode message from %q: %v", key, err) - } - if err := c.LPush(newKeyFunc(msg.Queue), s).Err(); err != nil { - return fmt.Errorf("could not add %v to %q: %v", s, newKeyFunc(msg.Queue)) - } - } - if err := deleteKey(c, key); err != nil { - return err - } - return nil -} - -type oldTaskMessage struct { - // Unchanged - Type string - Payload map[string]interface{} - ID uuid.UUID - Queue string - Retry int - Retried int - ErrorMsg string - UniqueKey string - - // Following fields have changed. - - // Deadline specifies the deadline for the task. - // Task won't be processed if it exceeded its deadline. - // The string shoulbe be in RFC3339 format. - // - // time.Time's zero value means no deadline. - Timeout string - - // Deadline specifies the deadline for the task. - // Task won't be processed if it exceeded its deadline. - // The string shoulbe be in RFC3339 format. - // - // time.Time's zero value means no deadline. - Deadline string -} - -var defaultTimeout = 30 * time.Minute - -func convertMessage(old *oldTaskMessage) (*base.TaskMessage, error) { - timeout, err := time.ParseDuration(old.Timeout) - if err != nil { - return nil, fmt.Errorf("could not parse Timeout field of %+v", old) - } - deadline, err := time.Parse(time.RFC3339, old.Deadline) - if err != nil { - return nil, fmt.Errorf("could not parse Deadline field of %+v", old) - } - if timeout == 0 && deadline.IsZero() { - timeout = defaultTimeout - } - if deadline.IsZero() { - // Zero value used to be time.Time{}, - // in the new schema zero value is represented by - // zero in Unix time. - deadline = time.Unix(0, 0) - } - return &base.TaskMessage{ - Type: old.Type, - Payload: old.Payload, - ID: uuid.New(), - Queue: old.Queue, - Retry: old.Retry, - Retried: old.Retried, - ErrorMsg: old.ErrorMsg, - UniqueKey: old.UniqueKey, - Timeout: int64(timeout.Seconds()), - Deadline: deadline.Unix(), - }, nil -} - -func deserialize(s string) (*base.TaskMessage, error) { - // Try deserializing as old message. - d := json.NewDecoder(strings.NewReader(s)) - d.UseNumber() - var old *oldTaskMessage - if err := d.Decode(&old); err != nil { - // Try deserializing as new message. - d = json.NewDecoder(strings.NewReader(s)) - d.UseNumber() - var msg *base.TaskMessage - if err := d.Decode(&msg); err != nil { - return nil, fmt.Errorf("could not deserialize %s into task message: %v", s, err) - } - return msg, nil - } - return convertMessage(old) -} - -func migrateZSet(c *redis.Client, key string) error { - if c.Exists(key).Val() == 0 { - // skip if key doesn't exist. - return nil - } - res, err := c.ZRangeWithScores(key, 0, -1).Result() - if err != nil { - return err - } - var msgs []*redis.Z - for _, z := range res { - s, err := cast.ToStringE(z.Member) - if err != nil { - return fmt.Errorf("could not cast to string: %v", err) - } - msg, err := deserialize(s) - if err != nil { - return err - } - encoded, err := base.EncodeMessage(msg) - if err != nil { - return fmt.Errorf("could not encode message from %q: %v", key, err) - } - msgs = append(msgs, &redis.Z{Score: z.Score, Member: encoded}) - } - if err := c.Rename(key, key+":backup").Err(); err != nil { - return fmt.Errorf("could not rename key %q: %v", key, err) - } - if err := c.ZAdd(key, msgs...).Err(); err != nil { - return fmt.Errorf("could not write new messages to %q: %v", key, err) - } - if err := c.Del(key + ":backup").Err(); err != nil { - return fmt.Errorf("could not delete back up key %q: %v", key+":backup", err) - } - return nil -} - -func migrateList(c *redis.Client, key string) error { - if c.Exists(key).Val() == 0 { - // skip if key doesn't exist. - return nil - } - res, err := c.LRange(key, 0, -1).Result() - if err != nil { - return err - } - var msgs []interface{} - for _, s := range res { - msg, err := deserialize(s) - if err != nil { - return err - } - encoded, err := base.EncodeMessage(msg) - if err != nil { - return fmt.Errorf("could not encode message from %q: %v", key, err) - } - msgs = append(msgs, encoded) - } - if err := c.Rename(key, key+":backup").Err(); err != nil { - return fmt.Errorf("could not rename key %q: %v", key, err) - } - if err := c.LPush(key, msgs...).Err(); err != nil { - return fmt.Errorf("could not write new messages to %q: %v", key, err) - } - if err := c.Del(key + ":backup").Err(); err != nil { - return fmt.Errorf("could not delete back up key %q: %v", key+":backup", err) - } - return nil -} diff --git a/tools/asynq/cmd/queue.go b/tools/asynq/cmd/queue.go index 648e1f8..94ec637 100644 --- a/tools/asynq/cmd/queue.go +++ b/tools/asynq/cmd/queue.go @@ -10,8 +10,8 @@ import ( "os" "github.com/fatih/color" - "github.com/hibiken/asynq/inspeq" - "github.com/hibiken/asynq/internal/rdb" + "github.com/hibiken/asynq" + "github.com/hibiken/asynq/internal/errors" "github.com/spf13/cobra" ) @@ -82,7 +82,7 @@ func queueList(cmd *cobra.Command, args []string) { type queueInfo struct { name string keyslot int64 - nodes []inspeq.ClusterNode + nodes []*asynq.ClusterNode } inspector := createInspector() queues, err := inspector.Queues() @@ -132,16 +132,16 @@ func queueInspect(cmd *cobra.Command, args []string) { fmt.Printf("\n%s\n", separator) } fmt.Println() - stats, err := inspector.CurrentStats(qname) + info, err := inspector.GetQueueInfo(qname) if err != nil { fmt.Printf("error: %v\n", err) continue } - printQueueStats(stats) + printQueueInfo(info) } } -func printQueueStats(s *inspeq.QueueStats) { +func printQueueInfo(s *asynq.QueueInfo) { bold := color.New(color.Bold) bold.Println("Queue Info") fmt.Printf("Name: %s\n", s.Queue) @@ -191,7 +191,7 @@ func queueHistory(cmd *cobra.Command, args []string) { } } -func printDailyStats(stats []*inspeq.DailyStats) { +func printDailyStats(stats []*asynq.DailyStats) { printTable( []string{"date (UTC)", "processed", "failed", "error rate"}, func(w io.Writer, tmpl string) { @@ -244,7 +244,7 @@ func queueRemove(cmd *cobra.Command, args []string) { for _, qname := range args { err = r.RemoveQueue(qname, force) if err != nil { - if _, ok := err.(*rdb.ErrQueueNotEmpty); ok { + if errors.IsQueueNotEmpty(err) { fmt.Printf("error: %v\nIf you are sure you want to delete it, run 'asynq queue rm --force %s'\n", err, qname) continue } diff --git a/tools/asynq/cmd/root.go b/tools/asynq/cmd/root.go index 6e39c62..ddd9262 100644 --- a/tools/asynq/cmd/root.go +++ b/tools/asynq/cmd/root.go @@ -14,7 +14,6 @@ import ( "github.com/go-redis/redis/v7" "github.com/hibiken/asynq" - "github.com/hibiken/asynq/inspeq" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" "github.com/spf13/cobra" @@ -136,7 +135,7 @@ func createRDB() *rdb.RDB { } // createRDB creates a Inspector instance using flag values and returns it. -func createInspector() *inspeq.Inspector { +func createInspector() *asynq.Inspector { var connOpt asynq.RedisConnOpt if useRedisCluster { addrs := strings.Split(viper.GetString("cluster_addrs"), ",") @@ -153,7 +152,7 @@ func createInspector() *inspeq.Inspector { TLSConfig: getTLSConfig(), } } - return inspeq.New(connOpt) + return asynq.NewInspector(connOpt) } func getTLSConfig() *tls.Config { diff --git a/tools/asynq/cmd/task.go b/tools/asynq/cmd/task.go index 9ff4d95..c59613f 100644 --- a/tools/asynq/cmd/task.go +++ b/tools/asynq/cmd/task.go @@ -10,7 +10,7 @@ import ( "os" "time" - "github.com/hibiken/asynq/inspeq" + "github.com/hibiken/asynq" "github.com/spf13/cobra" ) @@ -28,21 +28,21 @@ func init() { taskCmd.AddCommand(taskArchiveCmd) taskArchiveCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs") - taskArchiveCmd.Flags().StringP("key", "k", "", "key of the task") + taskArchiveCmd.Flags().StringP("id", "t", "", "id of the task") taskArchiveCmd.MarkFlagRequired("queue") - taskArchiveCmd.MarkFlagRequired("key") + taskArchiveCmd.MarkFlagRequired("id") taskCmd.AddCommand(taskDeleteCmd) taskDeleteCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs") - taskDeleteCmd.Flags().StringP("key", "k", "", "key of the task") + taskDeleteCmd.Flags().StringP("id", "t", "", "id of the task") taskDeleteCmd.MarkFlagRequired("queue") - taskDeleteCmd.MarkFlagRequired("key") + taskDeleteCmd.MarkFlagRequired("id") taskCmd.AddCommand(taskRunCmd) taskRunCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs") - taskRunCmd.Flags().StringP("key", "k", "", "key of the task") + taskRunCmd.Flags().StringP("id", "t", "", "id of the task") taskRunCmd.MarkFlagRequired("queue") - taskRunCmd.MarkFlagRequired("key") + taskRunCmd.MarkFlagRequired("id") taskCmd.AddCommand(taskArchiveAllCmd) taskArchiveAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong") @@ -101,22 +101,22 @@ var taskCancelCmd = &cobra.Command{ } var taskArchiveCmd = &cobra.Command{ - Use: "archive --queue=QUEUE --key=KEY", - Short: "Archive a task with the given key", + Use: "archive --queue=QUEUE --id=TASK_ID", + Short: "Archive a task with the given id", Args: cobra.NoArgs, Run: taskArchive, } var taskDeleteCmd = &cobra.Command{ - Use: "delete --queue=QUEUE --key=KEY", - Short: "Delete a task with the given key", + Use: "delete --queue=QUEUE --id=TASK_ID", + Short: "Delete a task with the given id", Args: cobra.NoArgs, Run: taskDelete, } var taskRunCmd = &cobra.Command{ - Use: "run --queue=QUEUE --key=KEY", - Short: "Run a task with the given key", + Use: "run --queue=QUEUE --id=TASK_ID", + Short: "Run a task with the given id", Args: cobra.NoArgs, Run: taskRun, } @@ -129,14 +129,14 @@ var taskArchiveAllCmd = &cobra.Command{ } var taskDeleteAllCmd = &cobra.Command{ - Use: "delete-all --queue=QUEUE --key=KEY", + Use: "delete-all --queue=QUEUE --state=STATE", Short: "Delete all tasks in the given state", Args: cobra.NoArgs, Run: taskDeleteAll, } var taskRunAllCmd = &cobra.Command{ - Use: "run-all --queue=QUEUE --key=KEY", + Use: "run-all --queue=QUEUE --state=STATE", Short: "Run all tasks in the given state", Args: cobra.NoArgs, Run: taskRunAll, @@ -183,7 +183,7 @@ func taskList(cmd *cobra.Command, args []string) { func listActiveTasks(qname string, pageNum, pageSize int) { i := createInspector() - tasks, err := i.ListActiveTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum)) + tasks, err := i.ListActiveTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { fmt.Println(err) os.Exit(1) @@ -196,7 +196,7 @@ func listActiveTasks(qname string, pageNum, pageSize int) { []string{"ID", "Type", "Payload"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload) + fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload()) } }, ) @@ -204,7 +204,7 @@ func listActiveTasks(qname string, pageNum, pageSize int) { func listPendingTasks(qname string, pageNum, pageSize int) { i := createInspector() - tasks, err := i.ListPendingTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum)) + tasks, err := i.ListPendingTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { fmt.Println(err) os.Exit(1) @@ -214,10 +214,10 @@ func listPendingTasks(qname string, pageNum, pageSize int) { return } printTable( - []string{"Key", "Type", "Payload"}, + []string{"ID", "Type", "Payload"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload) + fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload()) } }, ) @@ -225,7 +225,7 @@ func listPendingTasks(qname string, pageNum, pageSize int) { func listScheduledTasks(qname string, pageNum, pageSize int) { i := createInspector() - tasks, err := i.ListScheduledTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum)) + tasks, err := i.ListScheduledTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { fmt.Println(err) os.Exit(1) @@ -235,12 +235,12 @@ func listScheduledTasks(qname string, pageNum, pageSize int) { return } printTable( - []string{"Key", "Type", "Payload", "Process In"}, + []string{"ID", "Type", "Payload", "Process In"}, func(w io.Writer, tmpl string) { for _, t := range tasks { processIn := fmt.Sprintf("%.0f seconds", - t.NextProcessAt.Sub(time.Now()).Seconds()) - fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, processIn) + t.NextProcessAt().Sub(time.Now()).Seconds()) + fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), processIn) } }, ) @@ -248,7 +248,7 @@ func listScheduledTasks(qname string, pageNum, pageSize int) { func listRetryTasks(qname string, pageNum, pageSize int) { i := createInspector() - tasks, err := i.ListRetryTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum)) + tasks, err := i.ListRetryTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { fmt.Println(err) os.Exit(1) @@ -258,16 +258,16 @@ func listRetryTasks(qname string, pageNum, pageSize int) { return } printTable( - []string{"Key", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry"}, + []string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry"}, func(w io.Writer, tmpl string) { for _, t := range tasks { var nextRetry string - if d := t.NextProcessAt.Sub(time.Now()); d > 0 { + if d := t.NextProcessAt().Sub(time.Now()); d > 0 { nextRetry = fmt.Sprintf("in %v", d.Round(time.Second)) } else { nextRetry = "right now" } - fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, nextRetry, t.LastError, t.Retried, t.MaxRetry) + fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), nextRetry, t.LastErr(), t.Retried(), t.MaxRetry()) } }, ) @@ -275,7 +275,7 @@ func listRetryTasks(qname string, pageNum, pageSize int) { func listArchivedTasks(qname string, pageNum, pageSize int) { i := createInspector() - tasks, err := i.ListArchivedTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum)) + tasks, err := i.ListArchivedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { fmt.Println(err) os.Exit(1) @@ -285,19 +285,18 @@ func listArchivedTasks(qname string, pageNum, pageSize int) { return } printTable( - []string{"Key", "Type", "Payload", "Last Failed", "Last Error"}, + []string{"ID", "Type", "Payload", "Last Failed", "Last Error"}, func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, t.LastFailedAt, t.LastError) + fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), t.LastFailedAt(), t.LastErr()) } }) } func taskCancel(cmd *cobra.Command, args []string) { - r := createRDB() + i := createInspector() for _, id := range args { - err := r.PublishCancelation(id) - if err != nil { + if err := i.CancelProcessing(id); err != nil { fmt.Printf("error: could not send cancelation signal: %v\n", err) continue } @@ -311,14 +310,14 @@ func taskArchive(cmd *cobra.Command, args []string) { fmt.Printf("error: %v\n", err) os.Exit(1) } - key, err := cmd.Flags().GetString("key") + id, err := cmd.Flags().GetString("id") if err != nil { fmt.Printf("error: %v\n", err) os.Exit(1) } i := createInspector() - err = i.ArchiveTaskByKey(qname, key) + err = i.ArchiveTask(qname, id) if err != nil { fmt.Printf("error: %v\n", err) os.Exit(1) @@ -332,14 +331,14 @@ func taskDelete(cmd *cobra.Command, args []string) { fmt.Printf("error: %v\n", err) os.Exit(1) } - key, err := cmd.Flags().GetString("key") + id, err := cmd.Flags().GetString("id") if err != nil { fmt.Printf("error: %v\n", err) os.Exit(1) } i := createInspector() - err = i.DeleteTaskByKey(qname, key) + err = i.DeleteTask(qname, id) if err != nil { fmt.Printf("error: %v\n", err) os.Exit(1) @@ -353,14 +352,14 @@ func taskRun(cmd *cobra.Command, args []string) { fmt.Printf("error: %v\n", err) os.Exit(1) } - key, err := cmd.Flags().GetString("key") + id, err := cmd.Flags().GetString("id") if err != nil { fmt.Printf("error: %v\n", err) os.Exit(1) } i := createInspector() - err = i.RunTaskByKey(qname, key) + err = i.RunTask(qname, id) if err != nil { fmt.Printf("error: %v\n", err) os.Exit(1) diff --git a/tools/go.mod b/tools/go.mod index d033b45..166fd8d 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -8,8 +8,9 @@ require ( github.com/cpuguy83/go-md2man v1.0.10 // indirect github.com/fatih/color v1.9.0 github.com/go-redis/redis/v7 v7.4.0 - github.com/google/uuid v1.1.1 - github.com/hibiken/asynq v0.14.0 + github.com/golang/protobuf v1.4.1 // indirect + github.com/google/uuid v1.2.0 + github.com/hibiken/asynq v0.17.1 github.com/mitchellh/go-homedir v1.1.0 github.com/spf13/cast v1.3.1 github.com/spf13/cobra v1.1.1 diff --git a/tools/go.sum b/tools/go.sum index 506cbe6..24afe15 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -25,6 +25,7 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= @@ -42,6 +43,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= @@ -68,18 +71,29 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= @@ -172,6 +186,7 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= @@ -322,6 +337,7 @@ golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= @@ -350,10 +366,22 @@ google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -378,5 +406,6 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=