From ea23db4f6b853109bcb84e34d2c6b7fcbe5e9ae4 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 12 Jan 2021 11:24:12 -0800 Subject: [PATCH] Update migrate command to move all dead tasks to the new archived zset --- tools/asynq/cmd/migrate.go | 27 +++++++++++++++++++++++++-- tools/asynq/cmd/task.go | 12 ++++++------ 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/tools/asynq/cmd/migrate.go b/tools/asynq/cmd/migrate.go index b6a4b1f..82f323a 100644 --- a/tools/asynq/cmd/migrate.go +++ b/tools/asynq/cmd/migrate.go @@ -98,7 +98,9 @@ func migrate(cmd *cobra.Command, args []string) { printError(err) os.Exit(1) } - if err := partitionZSetMembersByQueue(c, "asynq:dead", base.DeadKey); err != nil { + // Note: base.DeadKey function was renamed in v0.14. We define the legacy function here since we need it for this migration script. + deadKeyFunc := func(qname string) string { return fmt.Sprintf("asynq:{%s}:dead", qname) } + if err := partitionZSetMembersByQueue(c, "asynq:dead", deadKeyFunc); err != nil { printError(err) os.Exit(1) } @@ -113,7 +115,7 @@ func migrate(cmd *cobra.Command, args []string) { paused, err := c.SMembers("asynq:paused").Result() if err != nil { - printError(fmt.Errorf("command SMEMBERS asynq:paused failed: ", err)) + printError(fmt.Errorf("command SMEMBERS asynq:paused failed: %v", err)) os.Exit(1) } for _, qkey := range paused { @@ -136,6 +138,27 @@ func migrate(cmd *cobra.Command, args []string) { printError(err) os.Exit(1) } + + /*** Migrate from 0.13 to 0.14 compatible ***/ + + // Move all dead tasks to archived ZSET. + for _, qname := range allQueues { + zs, err := c.ZRangeWithScores(deadKeyFunc(qname), 0, -1).Result() + if err != nil { + printError(err) + os.Exit(1) + } + for _, z := range zs { + if err := c.ZAdd(base.ArchivedKey(qname), &z).Err(); err != nil { + printError(err) + os.Exit(1) + } + } + if err := deleteKey(c, deadKeyFunc(qname)); err != nil { + printError(err) + os.Exit(1) + } + } } func backupKey(key string) string { diff --git a/tools/asynq/cmd/task.go b/tools/asynq/cmd/task.go index d625a6c..4d466f6 100644 --- a/tools/asynq/cmd/task.go +++ b/tools/asynq/cmd/task.go @@ -125,7 +125,7 @@ var taskArchiveAllCmd = &cobra.Command{ Use: "archive-all --queue=QUEUE --state=STATE", Short: "Archive all tasks in the given state", Args: cobra.NoArgs, - Run: taskKillAll, + Run: taskArchiveAll, } var taskDeleteAllCmd = &cobra.Command{ @@ -275,7 +275,7 @@ func listRetryTasks(qname string, pageNum, pageSize int) { func listArchivedTasks(qname string, pageNum, pageSize int) { i := createInspector() - tasks, err := i.ListDeadTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) + tasks, err := i.ListArchivedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { fmt.Println(err) os.Exit(1) @@ -318,7 +318,7 @@ func taskKill(cmd *cobra.Command, args []string) { } i := createInspector() - err = i.KillTaskByKey(qname, key) + err = i.ArchiveTaskByKey(qname, key) if err != nil { fmt.Printf("error: %v\n", err) os.Exit(1) @@ -368,7 +368,7 @@ func taskRun(cmd *cobra.Command, args []string) { fmt.Println("task transitioned to pending state") } -func taskKillAll(cmd *cobra.Command, args []string) { +func taskArchiveAll(cmd *cobra.Command, args []string) { qname, err := cmd.Flags().GetString("queue") if err != nil { fmt.Printf("error: %v\n", err) @@ -384,9 +384,9 @@ func taskKillAll(cmd *cobra.Command, args []string) { var n int switch state { case "scheduled": - n, err = i.KillAllScheduledTasks(qname) + n, err = i.ArchiveAllScheduledTasks(qname) case "retry": - n, err = i.KillAllRetryTasks(qname) + n, err = i.ArchiveAllRetryTasks(qname) default: fmt.Printf("error: unsupported state %q\n", state) os.Exit(1)