From 0d74c518bfe928ad076c72c64f1b37d54619bcba Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 10 Dec 2019 20:28:31 -0800 Subject: [PATCH 1/3] Add methods to rdb to enqueue all tasks from dead, retry and scheduled queue --- asynq.go | 7 +- internal/rdb/inspect.go | 31 +++++++ internal/rdb/inspect_test.go | 153 +++++++++++++++++++++++++++++++++++ 3 files changed, 190 insertions(+), 1 deletion(-) diff --git a/asynq.go b/asynq.go index bed372e..30af831 100644 --- a/asynq.go +++ b/asynq.go @@ -5,13 +5,18 @@ import "github.com/go-redis/redis/v7" /* TODOs: - [P0] enqall command to enq all tasks from "scheduled" "retry", "dead" queue +- [P0] asynqmon del , asynqmon delall +- [P0] asynqmon kill , asynqmon killall +- [P0] Redis Memory Usage, Connection info in stats +- [P0] Processed, Failed count for today - [P0] Go docs + CONTRIBUTION.md + Github issue template +- [P0] Redis Sentinel support - [P1] Add Support for multiple queues and priority - [P1] User defined max-retry count */ // Max retry count by default -const defaultMaxRetry = 25 +const defaultMaxRetry = 1 // Task represents a task to be performed. type Task struct { diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 38984be..79e3674 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -272,6 +272,21 @@ func (r *RDB) EnqueueScheduledTask(id uuid.UUID, score int64) error { return nil } +// EnqueueAllScheduledTasks enqueues all tasks from scheduled queue. +func (r *RDB) EnqueueAllScheduledTasks() error { + return r.removeAndEnqueueAll(scheduledQ) +} + +// EnqueueAllRetryTasks enqueues all tasks from retry queue. +func (r *RDB) EnqueueAllRetryTasks() error { + return r.removeAndEnqueueAll(retryQ) +} + +// EnqueueAllDeadTasks enqueues all tasks from dead queue. +func (r *RDB) EnqueueAllDeadTasks() error { + return r.removeAndEnqueueAll(deadQ) +} + func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) { script := redis.NewScript(` local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) @@ -295,3 +310,19 @@ func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) { } return n, nil } + +func (r *RDB) removeAndEnqueueAll(zset string) error { + script := redis.NewScript(` + local msgs = redis.call("ZRANGE", KEYS[1], 0, -1) + for _, msg in ipairs(msgs) do + redis.call("ZREM", KEYS[1], msg) + redis.call("LPUSH", KEYS[2], msg) + end + return table.getn(msgs) + `) + _, err := script.Run(r.client, []string{zset, defaultQ}).Result() + if err != nil { + return err + } + return nil +} diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 33b6a39..eddf6ac 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -698,3 +698,156 @@ func TestEnqueueScheduledTask(t *testing.T) { } } } + +func TestEnqueueAllScheduledTasks(t *testing.T) { + r := setup(t) + t1 := randomTask("send_email", "default", nil) + t2 := randomTask("gen_thumbnail", "default", nil) + t3 := randomTask("reindex", "default", nil) + + tests := []struct { + description string + scheduled []*TaskMessage + wantEnqueued []*TaskMessage + }{ + { + description: "with tasks in scheduled queue", + scheduled: []*TaskMessage{t1, t2, t3}, + wantEnqueued: []*TaskMessage{t1, t2, t3}, + }, + { + description: "with empty scheduled queue", + scheduled: []*TaskMessage{}, + wantEnqueued: []*TaskMessage{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + // initialize scheduled queue + for _, msg := range tc.scheduled { + err := r.client.ZAdd(scheduledQ, &redis.Z{ + Member: mustMarshal(t, msg), + Score: float64(time.Now().Add(time.Hour).Unix())}).Err() + if err != nil { + t.Fatal(err) + } + } + + err := r.EnqueueAllScheduledTasks() + if err != nil { + t.Errorf("%s; r.EnqueueAllScheduledTasks = %v, want nil", tc.description, err) + } + + gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) + if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff) + } + } +} + +func TestEnqueueAllRetryTasks(t *testing.T) { + r := setup(t) + t1 := randomTask("send_email", "default", nil) + t2 := randomTask("gen_thumbnail", "default", nil) + t3 := randomTask("reindex", "default", nil) + + tests := []struct { + description string + retry []*TaskMessage + wantEnqueued []*TaskMessage + }{ + { + description: "with tasks in retry queue", + retry: []*TaskMessage{t1, t2, t3}, + wantEnqueued: []*TaskMessage{t1, t2, t3}, + }, + { + description: "with empty retry queue", + retry: []*TaskMessage{}, + wantEnqueued: []*TaskMessage{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + // initialize retry queue + for _, msg := range tc.retry { + err := r.client.ZAdd(retryQ, &redis.Z{ + Member: mustMarshal(t, msg), + Score: float64(time.Now().Add(time.Hour).Unix())}).Err() + if err != nil { + t.Fatal(err) + } + } + + err := r.EnqueueAllRetryTasks() + if err != nil { + t.Errorf("%s; r.EnqueueAllRetryTasks = %v, want nil", tc.description, err) + } + + gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) + if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff) + } + } +} + +func TestEnqueueAllDeadTasks(t *testing.T) { + r := setup(t) + t1 := randomTask("send_email", "default", nil) + t2 := randomTask("gen_thumbnail", "default", nil) + t3 := randomTask("reindex", "default", nil) + + tests := []struct { + description string + dead []*TaskMessage + wantEnqueued []*TaskMessage + }{ + { + description: "with tasks in dead queue", + dead: []*TaskMessage{t1, t2, t3}, + wantEnqueued: []*TaskMessage{t1, t2, t3}, + }, + { + description: "with empty dead queue", + dead: []*TaskMessage{}, + wantEnqueued: []*TaskMessage{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + // initialize dead queue + for _, msg := range tc.dead { + err := r.client.ZAdd(deadQ, &redis.Z{ + Member: mustMarshal(t, msg), + Score: float64(time.Now().Add(time.Hour).Unix())}).Err() + if err != nil { + t.Fatal(err) + } + } + + err := r.EnqueueAllDeadTasks() + if err != nil { + t.Errorf("%s; r.EnqueueAllDeadTasks = %v, want nil", tc.description, err) + } + + gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) + if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff) + } + } +} From a96719413c54f433472e18cb1bf4a6fef059a182 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 10 Dec 2019 21:38:25 -0800 Subject: [PATCH 2/3] Add enqall command to asynqmon CLI --- asynq.go | 2 +- internal/rdb/inspect.go | 23 +++++++----- internal/rdb/inspect_test.go | 42 ++++++++++++++++++---- tools/asynqmon/cmd/enq.go | 4 +-- tools/asynqmon/cmd/enqall.go | 69 ++++++++++++++++++++++++++++++++++++ tools/asynqmon/cmd/ls.go | 12 +++---- tools/asynqmon/cmd/root.go | 1 + tools/asynqmon/cmd/stats.go | 2 +- 8 files changed, 131 insertions(+), 24 deletions(-) create mode 100644 tools/asynqmon/cmd/enqall.go diff --git a/asynq.go b/asynq.go index 30af831..054c38e 100644 --- a/asynq.go +++ b/asynq.go @@ -16,7 +16,7 @@ TODOs: */ // Max retry count by default -const defaultMaxRetry = 1 +const defaultMaxRetry = 25 // Task represents a task to be performed. type Task struct { diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 79e3674..a702554 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -273,17 +273,20 @@ func (r *RDB) EnqueueScheduledTask(id uuid.UUID, score int64) error { } // EnqueueAllScheduledTasks enqueues all tasks from scheduled queue. -func (r *RDB) EnqueueAllScheduledTasks() error { +// and returns the number of tasks enqueued. +func (r *RDB) EnqueueAllScheduledTasks() (int64, error) { return r.removeAndEnqueueAll(scheduledQ) } // EnqueueAllRetryTasks enqueues all tasks from retry queue. -func (r *RDB) EnqueueAllRetryTasks() error { +// and returns the number of tasks enqueued. +func (r *RDB) EnqueueAllRetryTasks() (int64, error) { return r.removeAndEnqueueAll(retryQ) } -// EnqueueAllDeadTasks enqueues all tasks from dead queue. -func (r *RDB) EnqueueAllDeadTasks() error { +// EnqueueAllDeadTasks enqueues all tasks from dead queue +// and returns the number of tasks enqueued. +func (r *RDB) EnqueueAllDeadTasks() (int64, error) { return r.removeAndEnqueueAll(deadQ) } @@ -311,7 +314,7 @@ func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) { return n, nil } -func (r *RDB) removeAndEnqueueAll(zset string) error { +func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) { script := redis.NewScript(` local msgs = redis.call("ZRANGE", KEYS[1], 0, -1) for _, msg in ipairs(msgs) do @@ -320,9 +323,13 @@ func (r *RDB) removeAndEnqueueAll(zset string) error { end return table.getn(msgs) `) - _, err := script.Run(r.client, []string{zset, defaultQ}).Result() + res, err := script.Run(r.client, []string{zset, defaultQ}).Result() if err != nil { - return err + return 0, err } - return nil + n, ok := res.(int64) + if !ok { + return 0, fmt.Errorf("could not cast %v to int64", res) + } + return n, nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index eddf6ac..42239b4 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -708,16 +708,19 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { tests := []struct { description string scheduled []*TaskMessage + want int64 wantEnqueued []*TaskMessage }{ { description: "with tasks in scheduled queue", scheduled: []*TaskMessage{t1, t2, t3}, + want: 3, wantEnqueued: []*TaskMessage{t1, t2, t3}, }, { description: "with empty scheduled queue", scheduled: []*TaskMessage{}, + want: 0, wantEnqueued: []*TaskMessage{}, }, } @@ -737,9 +740,16 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { } } - err := r.EnqueueAllScheduledTasks() + got, err := r.EnqueueAllScheduledTasks() if err != nil { - t.Errorf("%s; r.EnqueueAllScheduledTasks = %v, want nil", tc.description, err) + t.Errorf("%s; r.EnqueueAllScheduledTasks = %v, %v; want %v, nil", + tc.description, got, err, tc.want) + continue + } + + if got != tc.want { + t.Errorf("%s; r.EnqueueAllScheduledTasks = %v, %v; want %v, nil", + tc.description, got, err, tc.want) } gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() @@ -759,16 +769,19 @@ func TestEnqueueAllRetryTasks(t *testing.T) { tests := []struct { description string retry []*TaskMessage + want int64 wantEnqueued []*TaskMessage }{ { description: "with tasks in retry queue", retry: []*TaskMessage{t1, t2, t3}, + want: 3, wantEnqueued: []*TaskMessage{t1, t2, t3}, }, { description: "with empty retry queue", retry: []*TaskMessage{}, + want: 0, wantEnqueued: []*TaskMessage{}, }, } @@ -788,9 +801,16 @@ func TestEnqueueAllRetryTasks(t *testing.T) { } } - err := r.EnqueueAllRetryTasks() + got, err := r.EnqueueAllRetryTasks() if err != nil { - t.Errorf("%s; r.EnqueueAllRetryTasks = %v, want nil", tc.description, err) + t.Errorf("%s; r.EnqueueAllRetryTasks = %v, %v; want %v, nil", + tc.description, got, err, tc.want) + continue + } + + if got != tc.want { + t.Errorf("%s; r.EnqueueAllRetryTasks = %v, %v; want %v, nil", + tc.description, got, err, tc.want) } gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() @@ -810,16 +830,19 @@ func TestEnqueueAllDeadTasks(t *testing.T) { tests := []struct { description string dead []*TaskMessage + want int64 wantEnqueued []*TaskMessage }{ { description: "with tasks in dead queue", dead: []*TaskMessage{t1, t2, t3}, + want: 3, wantEnqueued: []*TaskMessage{t1, t2, t3}, }, { description: "with empty dead queue", dead: []*TaskMessage{}, + want: 0, wantEnqueued: []*TaskMessage{}, }, } @@ -839,9 +862,16 @@ func TestEnqueueAllDeadTasks(t *testing.T) { } } - err := r.EnqueueAllDeadTasks() + got, err := r.EnqueueAllDeadTasks() if err != nil { - t.Errorf("%s; r.EnqueueAllDeadTasks = %v, want nil", tc.description, err) + t.Errorf("%s; r.EnqueueAllDeadTasks = %v, %v; want %v, nil", + tc.description, got, err, tc.want) + continue + } + + if got != tc.want { + t.Errorf("%s; r.EnqueueAllDeadTasks = %v, %v; want %v, nil", + tc.description, got, err, tc.want) } gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() diff --git a/tools/asynqmon/cmd/enq.go b/tools/asynqmon/cmd/enq.go index 9e81fbe..431b2a8 100644 --- a/tools/asynqmon/cmd/enq.go +++ b/tools/asynqmon/cmd/enq.go @@ -11,9 +11,9 @@ import ( // enqCmd represents the enq command var enqCmd = &cobra.Command{ - Use: "enq", + Use: "enq [task id]", Short: "Enqueues a task given an identifier", - Long: `The enq command enqueues a task given an identifier. + Long: `Enq (asynqmon enq) will enqueue a task given an identifier. The command takes one argument which specifies the task to enqueue. The task should be in either scheduled, retry or dead queue. diff --git a/tools/asynqmon/cmd/enqall.go b/tools/asynqmon/cmd/enqall.go new file mode 100644 index 0000000..0293366 --- /dev/null +++ b/tools/asynqmon/cmd/enqall.go @@ -0,0 +1,69 @@ +package cmd + +import ( + "fmt" + "os" + + "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq/internal/rdb" + "github.com/spf13/cobra" +) + +var enqallValidArgs = []string{"scheduled", "retry", "dead"} + +// enqallCmd represents the enqall command +var enqallCmd = &cobra.Command{ + Use: "enqall [queue name]", + Short: "Enqueues all tasks from the specified queue", + Long: `Enqall (asynqmon enqall) will enqueue all tasks from the specified queue. + +The argument should be one of "scheduled", "retry", or "dead". + +The tasks enqueued by this command will be processed as soon as it +gets dequeued by a processor. + +Example: asynqmon enqall dead -> Enqueues all tasks from the dead queue`, + ValidArgs: enqallValidArgs, + Args: cobra.ExactValidArgs(1), + Run: enqall, +} + +func init() { + rootCmd.AddCommand(enqallCmd) + + // Here you will define your flags and configuration settings. + + // Cobra supports Persistent Flags which will work for this command + // and all subcommands, e.g.: + // enqallCmd.PersistentFlags().String("foo", "", "A help for foo") + + // Cobra supports local flags which will only run when this command + // is called directly, e.g.: + // enqallCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} + +func enqall(cmd *cobra.Command, args []string) { + c := redis.NewClient(&redis.Options{ + Addr: uri, + DB: db, + }) + r := rdb.NewRDB(c) + var n int64 + var err error + switch args[0] { + case "scheduled": + n, err = r.EnqueueAllScheduledTasks() + case "retry": + n, err = r.EnqueueAllRetryTasks() + case "dead": + n, err = r.EnqueueAllDeadTasks() + default: + fmt.Printf("error: `asynqmon enqall ` only accepts %v as the argument.\n", enqallValidArgs) + os.Exit(1) + } + if err != nil { + fmt.Println(err) + os.Exit(1) + } + fmt.Printf("Enqueued %d tasks from %q queue\n", n, args[0]) +} diff --git a/tools/asynqmon/cmd/ls.go b/tools/asynqmon/cmd/ls.go index 9a456f6..49aaf93 100644 --- a/tools/asynqmon/cmd/ls.go +++ b/tools/asynqmon/cmd/ls.go @@ -15,20 +15,20 @@ import ( "github.com/spf13/cobra" ) -var validArgs = []string{"enqueued", "inprogress", "scheduled", "retry", "dead"} +var lsValidArgs = []string{"enqueued", "inprogress", "scheduled", "retry", "dead"} // lsCmd represents the ls command var lsCmd = &cobra.Command{ - Use: "ls", + Use: "ls [queue name]", Short: "Lists queue contents", - Long: `The ls command lists all tasks from the specified queue in a table format. + Long: `Ls (asynqmon ls) will list all tasks from the specified queue in a table format. The command takes one argument which specifies the queue to inspect. The value of the argument should be one of "enqueued", "inprogress", "scheduled", "retry", or "dead". Example: asynqmon ls dead`, - ValidArgs: validArgs, + ValidArgs: lsValidArgs, Args: cobra.ExactValidArgs(1), Run: ls, } @@ -65,8 +65,8 @@ func ls(cmd *cobra.Command, args []string) { case "dead": listDead(r) default: - fmt.Printf("error: `asynqmon ls ` only accepts %v as the argument.\n", validArgs) - return + fmt.Printf("error: `asynqmon ls ` only accepts %v as the argument.\n", lsValidArgs) + os.Exit(1) } } diff --git a/tools/asynqmon/cmd/root.go b/tools/asynqmon/cmd/root.go index 596ddd0..5bf4df4 100644 --- a/tools/asynqmon/cmd/root.go +++ b/tools/asynqmon/cmd/root.go @@ -50,6 +50,7 @@ func init() { } // initConfig reads in config file and ENV variables if set. +// TODO(hibiken): Remove this if not necessary. func initConfig() { if cfgFile != "" { // Use config file from the flag. diff --git a/tools/asynqmon/cmd/stats.go b/tools/asynqmon/cmd/stats.go index 59d49c5..5232620 100644 --- a/tools/asynqmon/cmd/stats.go +++ b/tools/asynqmon/cmd/stats.go @@ -15,7 +15,7 @@ import ( var statsCmd = &cobra.Command{ Use: "stats", Short: "Shows current state of the queues", - Long: `The stats command shows the number of tasks in each queue at that instant. + Long: `Stats (aysnqmon stats) will show the number of tasks in each queue at that instant. To monitor the queues continuously, it's recommended that you run this command in conjunction with the watch command. From ef562e2efabb8047b72e81880d51f0726ca4b3b1 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 10 Dec 2019 21:48:19 -0800 Subject: [PATCH 3/3] Minor improvement --- internal/rdb/inspect.go | 4 ++-- tools/asynqmon/cmd/enqall.go | 2 +- tools/asynqmon/cmd/ls.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index a702554..872cc59 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -272,13 +272,13 @@ func (r *RDB) EnqueueScheduledTask(id uuid.UUID, score int64) error { return nil } -// EnqueueAllScheduledTasks enqueues all tasks from scheduled queue. +// EnqueueAllScheduledTasks enqueues all tasks from scheduled queue // and returns the number of tasks enqueued. func (r *RDB) EnqueueAllScheduledTasks() (int64, error) { return r.removeAndEnqueueAll(scheduledQ) } -// EnqueueAllRetryTasks enqueues all tasks from retry queue. +// EnqueueAllRetryTasks enqueues all tasks from retry queue // and returns the number of tasks enqueued. func (r *RDB) EnqueueAllRetryTasks() (int64, error) { return r.removeAndEnqueueAll(retryQ) diff --git a/tools/asynqmon/cmd/enqall.go b/tools/asynqmon/cmd/enqall.go index 0293366..d60bef4 100644 --- a/tools/asynqmon/cmd/enqall.go +++ b/tools/asynqmon/cmd/enqall.go @@ -58,7 +58,7 @@ func enqall(cmd *cobra.Command, args []string) { case "dead": n, err = r.EnqueueAllDeadTasks() default: - fmt.Printf("error: `asynqmon enqall ` only accepts %v as the argument.\n", enqallValidArgs) + fmt.Printf("error: `asynqmon enqall [queue name]` only accepts %v as the argument.\n", enqallValidArgs) os.Exit(1) } if err != nil { diff --git a/tools/asynqmon/cmd/ls.go b/tools/asynqmon/cmd/ls.go index 49aaf93..bd94f50 100644 --- a/tools/asynqmon/cmd/ls.go +++ b/tools/asynqmon/cmd/ls.go @@ -65,7 +65,7 @@ func ls(cmd *cobra.Command, args []string) { case "dead": listDead(r) default: - fmt.Printf("error: `asynqmon ls ` only accepts %v as the argument.\n", lsValidArgs) + fmt.Printf("error: `asynqmon ls [queue name]` only accepts %v as the argument.\n", lsValidArgs) os.Exit(1) } }