diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 0d0dba3..689cd94 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -380,3 +380,18 @@ func (r *RDB) deleteTask(zset, id string, score float64) error { } return nil } + +// DeleteAllDeadTasks deletes all tasks from the dead queue. +func (r *RDB) DeleteAllDeadTasks() error { + return r.client.Del(deadQ).Err() +} + +// DeleteAllRetryTasks deletes all tasks from the dead queue. +func (r *RDB) DeleteAllRetryTasks() error { + return r.client.Del(retryQ).Err() +} + +// DeleteAllScheduledTasks deletes all tasks from the dead queue. +func (r *RDB) DeleteAllScheduledTasks() error { + return r.client.Del(scheduledQ).Err() +} diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 348951a..f698ba3 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -1096,3 +1096,138 @@ func TestDeleteScheduledTask(t *testing.T) { } } } + +func TestDeleteAllDeadTasks(t *testing.T) { + r := setup(t) + m1 := randomTask("send_email", "default", nil) + m2 := randomTask("reindex", "default", nil) + m3 := randomTask("gen_thumbnail", "default", nil) + + tests := []struct { + initDead []*TaskMessage + wantDead []*TaskMessage + }{ + { + initDead: []*TaskMessage{m1, m2, m3}, + wantDead: []*TaskMessage{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + // initialize dead queue. + for _, msg := range tc.initDead { + err := r.client.ZAdd(deadQ, &redis.Z{ + Member: mustMarshal(t, msg), + Score: float64(time.Now().Unix()), + }).Err() + if err != nil { + t.Fatal(err) + } + } + + err := r.DeleteAllDeadTasks() + if err != nil { + t.Errorf("r.DeleteAllDeaadTasks = %v, want nil", err) + } + + gotDeadRaw := r.client.ZRange(deadQ, 0, -1).Val() + gotDead := mustUnmarshalSlice(t, gotDeadRaw) + if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", deadQ, diff) + } + } +} + +func TestDeleteAllRetryTasks(t *testing.T) { + r := setup(t) + m1 := randomTask("send_email", "default", nil) + m2 := randomTask("reindex", "default", nil) + m3 := randomTask("gen_thumbnail", "default", nil) + + tests := []struct { + initRetry []*TaskMessage + wantRetry []*TaskMessage + }{ + { + initRetry: []*TaskMessage{m1, m2, m3}, + wantRetry: []*TaskMessage{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + // initialize retry queue. + for _, msg := range tc.initRetry { + err := r.client.ZAdd(retryQ, &redis.Z{ + Member: mustMarshal(t, msg), + Score: float64(time.Now().Unix()), + }).Err() + if err != nil { + t.Fatal(err) + } + } + + err := r.DeleteAllRetryTasks() + if err != nil { + t.Errorf("r.DeleteAllDeaadTasks = %v, want nil", err) + } + + gotRetryRaw := r.client.ZRange(retryQ, 0, -1).Val() + gotRetry := mustUnmarshalSlice(t, gotRetryRaw) + if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", retryQ, diff) + } + } +} + +func TestDeleteAllScheduledTasks(t *testing.T) { + r := setup(t) + m1 := randomTask("send_email", "default", nil) + m2 := randomTask("reindex", "default", nil) + m3 := randomTask("gen_thumbnail", "default", nil) + + tests := []struct { + initScheduled []*TaskMessage + wantScheduled []*TaskMessage + }{ + { + initScheduled: []*TaskMessage{m1, m2, m3}, + wantScheduled: []*TaskMessage{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + // initialize scheduled queue. + for _, msg := range tc.initScheduled { + err := r.client.ZAdd(scheduledQ, &redis.Z{ + Member: mustMarshal(t, msg), + Score: float64(time.Now().Unix()), + }).Err() + if err != nil { + t.Fatal(err) + } + } + + err := r.DeleteAllScheduledTasks() + if err != nil { + t.Errorf("r.DeleteAllDeaadTasks = %v, want nil", err) + } + + gotScheduledRaw := r.client.ZRange(scheduledQ, 0, -1).Val() + gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw) + if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", scheduledQ, diff) + } + } +} diff --git a/tools/asynqmon/cmd/delall.go b/tools/asynqmon/cmd/delall.go new file mode 100644 index 0000000..5211167 --- /dev/null +++ b/tools/asynqmon/cmd/delall.go @@ -0,0 +1,65 @@ +package cmd + +import ( + "fmt" + "os" + + "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq/internal/rdb" + "github.com/spf13/cobra" +) + +var delallValidArgs = []string{"scheduled", "retry", "dead"} + +// delallCmd represents the delall command +var delallCmd = &cobra.Command{ + Use: "delall [queue name]", + Short: "Deletes all tasks from the specified queue", + Long: `Delall (asynqmon delall) will delete all tasks from the specified queue. + +The argument should be one of "scheduled", "retry", or "dead". + +Example: asynqmon delall dead -> Deletes all tasks from the dead queue`, + ValidArgs: delallValidArgs, + Args: cobra.ExactValidArgs(1), + Run: delall, +} + +func init() { + rootCmd.AddCommand(delallCmd) + + // Here you will define your flags and configuration settings. + + // Cobra supports Persistent Flags which will work for this command + // and all subcommands, e.g.: + // delallCmd.PersistentFlags().String("foo", "", "A help for foo") + + // Cobra supports local flags which will only run when this command + // is called directly, e.g.: + // delallCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} + +func delall(cmd *cobra.Command, args []string) { + c := redis.NewClient(&redis.Options{ + Addr: uri, + DB: db, + }) + r := rdb.NewRDB(c) + var err error + switch args[0] { + case "scheduled": + err = r.DeleteAllScheduledTasks() + case "retry": + err = r.DeleteAllRetryTasks() + case "dead": + err = r.DeleteAllDeadTasks() + default: + fmt.Printf("error: `asynqmon delall [queue name]` only accepts %v as the argument.\n", delallValidArgs) + os.Exit(1) + } + if err != nil { + fmt.Println(err) + os.Exit(1) + } + fmt.Printf("Deleted all tasks from %q queue\n", args[0]) +}