diff --git a/tools/asynq/cmd/migrate.go b/tools/asynq/cmd/migrate.go index 3592d60..150dad0 100644 --- a/tools/asynq/cmd/migrate.go +++ b/tools/asynq/cmd/migrate.go @@ -9,6 +9,7 @@ import ( "fmt" "os" "strings" + "time" "github.com/go-redis/redis/v7" "github.com/google/uuid" @@ -51,15 +52,27 @@ func renameKeyAsBackup(c redis.UniversalClient, key string) error { return c.Rename(key, backupKey(key)).Err() } +func failIfError(err error, msg string) { + if err != nil { + fmt.Printf("error: %s: %v\n", msg, err) + fmt.Println("*** Please report this issue at https://github.com/hibiken/asynq/issues ***") + os.Exit(1) + } +} + +func logIfError(err error, msg string) { + if err != nil { + fmt.Printf("warning: %s: %v\n", msg, err) + } +} + func migrate(cmd *cobra.Command, args []string) { r := createRDB() // Rename key asynq:{} -> asynq:{}:pending queues, err := r.AllQueues() - if err != nil { - fmt.Printf("(error): Failed to get queue names: %v", err) - os.Exit(1) - } + failIfError(err, "Failed to get queue names") + fmt.Print("Renaming pending keys...") for _, qname := range queues { oldKey := fmt.Sprintf("asynq:{%s}", qname) @@ -67,10 +80,8 @@ func migrate(cmd *cobra.Command, args []string) { continue } newKey := base.PendingKey(qname) - if err := r.Client().Rename(oldKey, newKey).Err(); err != nil { - fmt.Printf("(error): Failed to rename key: %v", err) - os.Exit(1) - } + err := r.Client().Rename(oldKey, newKey).Err() + failIfError(err, "Failed to rename key") } fmt.Print("Done\n") @@ -85,10 +96,8 @@ func migrate(cmd *cobra.Command, args []string) { base.ArchivedKey(qname), } for _, key := range keys { - if err := renameKeyAsBackup(r.Client(), key); err != nil { - fmt.Printf("(error): Failed to rename key %q for backup: %v", key, err) - os.Exit(1) - } + err := renameKeyAsBackup(r.Client(), key) + failIfError(err, fmt.Sprintf("Failed to rename key %q for backup", key)) } } fmt.Print("Done\n") @@ -103,61 +112,68 @@ func migrate(cmd *cobra.Command, args []string) { // Pending Tasks data, err := r.Client().LRange(backupKey(base.PendingKey(qname)), 0, -1).Result() - if err != nil { - fmt.Printf("(error): Failed to read: %v", err) - os.Exit(1) - } + failIfError(err, "Failed to read backup pending key") + for _, s := range data { msg, err := UnmarshalOldMessage(s) - if err != nil { - fmt.Printf("(error): Failed to unmarshal message: %v", err) - os.Exit(1) - } + failIfError(err, "Failed to unmarshal message") + if msg.UniqueKey != "" { ttl, err := r.Client().TTL(msg.UniqueKey).Result() - if err != nil { - fmt.Printf("(error): Failed to get ttl: %v", err) - os.Exit(1) - } - if err := r.Client().Del(msg.UniqueKey).Err(); err != nil { - fmt.Printf("(error): Failed to delete unique key") - os.Exit(1) - } - // Regenerate unique key. - msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload) - if err := r.EnqueueUnique(msg, ttl); err != nil { - fmt.Printf("(error): Failed to enqueue unique pending message: %v", err) - os.Exit(1) + failIfError(err, "Failed to get ttl") + + if ttl > 0 { + err = r.Client().Del(msg.UniqueKey).Err() + logIfError(err, "Failed to delete unique key") } - } else { - if err := r.Enqueue(msg); err != nil { - fmt.Printf("(error): Failed to enqueue pending message: %v", err) - os.Exit(1) + // Regenerate unique key. + msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload) + if ttl > 0 { + err = r.EnqueueUnique(msg, ttl) + } else { + err = r.Enqueue(msg) } + failIfError(err, "Failed to enqueue pending message") + + } else { + err := r.Enqueue(msg) + failIfError(err, "Failed to enqueue pending message") } } // Scheduled Tasks - // data, err = r.Client().ZRangeWithScores(backupKey(base.ScheduledKey(qname)), 0, -1).Result() - // if err != nil { - // fmt.Printf("(error): Failed to read: %v", err) - // os.Exit(1) - // } - // for _, z := range data { - // msg, err := UnmarsalOldMessage(z.Member.(string)) - // if err != nil { - // fmt.Printf("(error): Failed to unmarshal message: %v", err) - // os.Exit(1) - // } - // task := asynq.NewTask(msg.Type, msg.Payload) - // opts := createOptions(msg, r.Client()) - // opts = append(opts, asynq.ProcessAt(time.Unix(z.Score, 0))) - // if _, err := c.Enqueue(task, opts...); err != nil { - // fmt.Printf("(error): Failed to enqueue task: %v", err) - // os.Exit(1) - // } - // } + zs, err := r.Client().ZRangeWithScores(backupKey(base.ScheduledKey(qname)), 0, -1).Result() + failIfError(err, "Failed to read") + + for _, z := range zs { + msg, err := UnmarshalOldMessage(z.Member.(string)) + failIfError(err, "Failed to unmarshal message") + + processAt := time.Unix(int64(z.Score), 0) + + if msg.UniqueKey != "" { + ttl, err := r.Client().TTL(msg.UniqueKey).Result() + failIfError(err, "Failed to get ttl") + + if ttl > 0 { + err = r.Client().Del(msg.UniqueKey).Err() + logIfError(err, "Failed to delete unique key") + } + + // Regenerate unique key. + msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload) + if ttl > 0 { + err = r.ScheduleUnique(msg, processAt, ttl) + } else { + err = r.Schedule(msg, processAt) + } + failIfError(err, "Failed to enqueue pending message") + } else { + err := r.Schedule(msg, processAt) + failIfError(err, "Failed to enqueue scheduled message") + } + } // Retry Tasks @@ -176,10 +192,8 @@ func migrate(cmd *cobra.Command, args []string) { backupKey(base.ArchivedKey(qname)), } for _, key := range keys { - if err := r.Client().Del(key).Err(); err != nil { - fmt.Printf("(error): Failed to delete backup key: %v", err) - os.Exit(1) - } + err := r.Client().Del(key).Err() + failIfError(err, "Failed to delete backup key") } } fmt.Print("Done\n")