From a9aa48055131237e73dbd537d8db5286dc82eccf Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 10 Sep 2020 06:53:07 -0700 Subject: [PATCH] Update migrate command --- tools/asynq/cmd/migrate.go | 366 +++++++++++++++++++++++++++++++++++++ 1 file changed, 366 insertions(+) create mode 100644 tools/asynq/cmd/migrate.go diff --git a/tools/asynq/cmd/migrate.go b/tools/asynq/cmd/migrate.go new file mode 100644 index 0000000..b6a4b1f --- /dev/null +++ b/tools/asynq/cmd/migrate.go @@ -0,0 +1,366 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package cmd + +import ( + "encoding/json" + "fmt" + "os" + "strings" + "time" + + "github.com/go-redis/redis/v7" + "github.com/google/uuid" + "github.com/hibiken/asynq/internal/base" + "github.com/spf13/cast" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +var migrateCmd = &cobra.Command{ + Use: "migrate", + Short: fmt.Sprintf("Migrate all tasks to be compatible with asynq v%s", base.Version), + Args: cobra.NoArgs, + Run: migrate, +} + +func init() { + rootCmd.AddCommand(migrateCmd) +} + +func migrate(cmd *cobra.Command, args []string) { + c := redis.NewClient(&redis.Options{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + }) + r := createRDB() + + /*** Migrate from 0.9 to 0.10, 0.11 compatible ***/ + lists := []string{"asynq:in_progress"} + allQueues, err := c.SMembers(base.AllQueues).Result() + if err != nil { + printError(fmt.Errorf("could not read all queues: %v", err)) + os.Exit(1) + } + lists = append(lists, allQueues...) + for _, key := range lists { + if err := migrateList(c, key); err != nil { + printError(err) + os.Exit(1) + } + } + + zsets := []string{"asynq:scheduled", "asynq:retry", "asynq:dead"} + for _, key := range zsets { + if err := migrateZSet(c, key); err != nil { + printError(err) + os.Exit(1) + } + } + + /*** Migrate from 0.11 to 0.12 compatible ***/ + if err := createBackup(c, base.AllQueues); err != nil { + printError(err) + os.Exit(1) + } + for _, qkey := range allQueues { + qname := strings.TrimPrefix(qkey, "asynq:queues:") + if err := c.SAdd(base.AllQueues, qname).Err(); err != nil { + err = fmt.Errorf("could not add queue name %q to %q set: %v\n", + qname, base.AllQueues, err) + printError(err) + os.Exit(1) + } + } + if err := deleteBackup(c, base.AllQueues); err != nil { + printError(err) + os.Exit(1) + } + + for _, qkey := range allQueues { + qname := strings.TrimPrefix(qkey, "asynq:queues:") + if exists := c.Exists(qkey).Val(); exists == 1 { + if err := c.Rename(qkey, base.QueueKey(qname)).Err(); err != nil { + printError(fmt.Errorf("could not rename key %q: %v\n", qkey, err)) + os.Exit(1) + } + } + } + + if err := partitionZSetMembersByQueue(c, "asynq:scheduled", base.ScheduledKey); err != nil { + printError(err) + os.Exit(1) + } + if err := partitionZSetMembersByQueue(c, "asynq:retry", base.RetryKey); err != nil { + printError(err) + os.Exit(1) + } + if err := partitionZSetMembersByQueue(c, "asynq:dead", base.DeadKey); err != nil { + printError(err) + os.Exit(1) + } + if err := partitionZSetMembersByQueue(c, "asynq:deadlines", base.DeadlinesKey); err != nil { + printError(err) + os.Exit(1) + } + if err := partitionListMembersByQueue(c, "asynq:in_progress", base.ActiveKey); err != nil { + printError(err) + os.Exit(1) + } + + paused, err := c.SMembers("asynq:paused").Result() + if err != nil { + printError(fmt.Errorf("command SMEMBERS asynq:paused failed: ", err)) + os.Exit(1) + } + for _, qkey := range paused { + qname := strings.TrimPrefix(qkey, "asynq:queues:") + if err := r.Pause(qname); err != nil { + printError(err) + os.Exit(1) + } + } + if err := deleteKey(c, "asynq:paused"); err != nil { + printError(err) + os.Exit(1) + } + + if err := deleteKey(c, "asynq:servers"); err != nil { + printError(err) + os.Exit(1) + } + if err := deleteKey(c, "asynq:workers"); err != nil { + printError(err) + os.Exit(1) + } +} + +func backupKey(key string) string { + return fmt.Sprintf("%s:backup", key) +} + +func createBackup(c *redis.Client, key string) error { + err := c.Rename(key, backupKey(key)).Err() + if err != nil { + return fmt.Errorf("could not rename key %q: %v", key, err) + } + return nil +} + +func deleteBackup(c *redis.Client, key string) error { + return deleteKey(c, backupKey(key)) +} + +func deleteKey(c *redis.Client, key string) error { + exists := c.Exists(key).Val() + if exists == 0 { + // key does not exist + return nil + } + err := c.Del(key).Err() + if err != nil { + return fmt.Errorf("could not delete key %q: %v", key, err) + } + return nil +} + +func printError(err error) { + fmt.Println(err) + fmt.Println() + fmt.Println("Migrate command error") + fmt.Println("Please file an issue on Github at https://github.com/hibiken/asynq/issues/new/choose") +} + +func partitionZSetMembersByQueue(c *redis.Client, key string, newKeyFunc func(string) string) error { + zs, err := c.ZRangeWithScores(key, 0, -1).Result() + if err != nil { + return fmt.Errorf("command ZRANGE %s 0 -1 WITHSCORES failed: %v", key, err) + } + for _, z := range zs { + s := cast.ToString(z.Member) + msg, err := base.DecodeMessage(s) + if err != nil { + return fmt.Errorf("could not decode message from %q: %v", key, err) + } + if err := c.ZAdd(newKeyFunc(msg.Queue), &z).Err(); err != nil { + return fmt.Errorf("could not add %v to %q: %v", z, newKeyFunc(msg.Queue)) + } + } + if err := deleteKey(c, key); err != nil { + return err + } + return nil +} + +func partitionListMembersByQueue(c *redis.Client, key string, newKeyFunc func(string) string) error { + data, err := c.LRange(key, 0, -1).Result() + if err != nil { + return fmt.Errorf("command LRANGE %s 0 -1 failed: %v", key, err) + } + for _, s := range data { + msg, err := base.DecodeMessage(s) + if err != nil { + return fmt.Errorf("could not decode message from %q: %v", key, err) + } + if err := c.LPush(newKeyFunc(msg.Queue), s).Err(); err != nil { + return fmt.Errorf("could not add %v to %q: %v", s, newKeyFunc(msg.Queue)) + } + } + if err := deleteKey(c, key); err != nil { + return err + } + return nil +} + +type oldTaskMessage struct { + // Unchanged + Type string + Payload map[string]interface{} + ID uuid.UUID + Queue string + Retry int + Retried int + ErrorMsg string + UniqueKey string + + // Following fields have changed. + + // Deadline specifies the deadline for the task. + // Task won't be processed if it exceeded its deadline. + // The string shoulbe be in RFC3339 format. + // + // time.Time's zero value means no deadline. + Timeout string + + // Deadline specifies the deadline for the task. + // Task won't be processed if it exceeded its deadline. + // The string shoulbe be in RFC3339 format. + // + // time.Time's zero value means no deadline. + Deadline string +} + +var defaultTimeout = 30 * time.Minute + +func convertMessage(old *oldTaskMessage) (*base.TaskMessage, error) { + timeout, err := time.ParseDuration(old.Timeout) + if err != nil { + return nil, fmt.Errorf("could not parse Timeout field of %+v", old) + } + deadline, err := time.Parse(time.RFC3339, old.Deadline) + if err != nil { + return nil, fmt.Errorf("could not parse Deadline field of %+v", old) + } + if timeout == 0 && deadline.IsZero() { + timeout = defaultTimeout + } + if deadline.IsZero() { + // Zero value used to be time.Time{}, + // in the new schema zero value is represented by + // zero in Unix time. + deadline = time.Unix(0, 0) + } + return &base.TaskMessage{ + Type: old.Type, + Payload: old.Payload, + ID: uuid.New(), + Queue: old.Queue, + Retry: old.Retry, + Retried: old.Retried, + ErrorMsg: old.ErrorMsg, + UniqueKey: old.UniqueKey, + Timeout: int64(timeout.Seconds()), + Deadline: deadline.Unix(), + }, nil +} + +func deserialize(s string) (*base.TaskMessage, error) { + // Try deserializing as old message. + d := json.NewDecoder(strings.NewReader(s)) + d.UseNumber() + var old *oldTaskMessage + if err := d.Decode(&old); err != nil { + // Try deserializing as new message. + d = json.NewDecoder(strings.NewReader(s)) + d.UseNumber() + var msg *base.TaskMessage + if err := d.Decode(&msg); err != nil { + return nil, fmt.Errorf("could not deserialize %s into task message: %v", s, err) + } + return msg, nil + } + return convertMessage(old) +} + +func migrateZSet(c *redis.Client, key string) error { + if c.Exists(key).Val() == 0 { + // skip if key doesn't exist. + return nil + } + res, err := c.ZRangeWithScores(key, 0, -1).Result() + if err != nil { + return err + } + var msgs []*redis.Z + for _, z := range res { + s, err := cast.ToStringE(z.Member) + if err != nil { + return fmt.Errorf("could not cast to string: %v", err) + } + msg, err := deserialize(s) + if err != nil { + return err + } + encoded, err := base.EncodeMessage(msg) + if err != nil { + return fmt.Errorf("could not encode message from %q: %v", key, err) + } + msgs = append(msgs, &redis.Z{Score: z.Score, Member: encoded}) + } + if err := c.Rename(key, key+":backup").Err(); err != nil { + return fmt.Errorf("could not rename key %q: %v", key, err) + } + if err := c.ZAdd(key, msgs...).Err(); err != nil { + return fmt.Errorf("could not write new messages to %q: %v", key, err) + } + if err := c.Del(key + ":backup").Err(); err != nil { + return fmt.Errorf("could not delete back up key %q: %v", key+":backup", err) + } + return nil +} + +func migrateList(c *redis.Client, key string) error { + if c.Exists(key).Val() == 0 { + // skip if key doesn't exist. + return nil + } + res, err := c.LRange(key, 0, -1).Result() + if err != nil { + return err + } + var msgs []interface{} + for _, s := range res { + msg, err := deserialize(s) + if err != nil { + return err + } + encoded, err := base.EncodeMessage(msg) + if err != nil { + return fmt.Errorf("could not encode message from %q: %v", key, err) + } + msgs = append(msgs, encoded) + } + if err := c.Rename(key, key+":backup").Err(); err != nil { + return fmt.Errorf("could not rename key %q: %v", key, err) + } + if err := c.LPush(key, msgs...).Err(); err != nil { + return fmt.Errorf("could not write new messages to %q: %v", key, err) + } + if err := c.Del(key + ":backup").Err(); err != nil { + return fmt.Errorf("could not delete back up key %q: %v", key+":backup", err) + } + return nil +}