2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Migrate scheduled task

This commit is contained in:
Ken Hibino 2021-06-15 19:48:14 -07:00
parent ccbb2ad5e9
commit 8fe71acfde

View File

@ -9,6 +9,7 @@ import (
"fmt" "fmt"
"os" "os"
"strings" "strings"
"time"
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
"github.com/google/uuid" "github.com/google/uuid"
@ -51,15 +52,27 @@ func renameKeyAsBackup(c redis.UniversalClient, key string) error {
return c.Rename(key, backupKey(key)).Err() return c.Rename(key, backupKey(key)).Err()
} }
func failIfError(err error, msg string) {
if err != nil {
fmt.Printf("error: %s: %v\n", msg, err)
fmt.Println("*** Please report this issue at https://github.com/hibiken/asynq/issues ***")
os.Exit(1)
}
}
func logIfError(err error, msg string) {
if err != nil {
fmt.Printf("warning: %s: %v\n", msg, err)
}
}
func migrate(cmd *cobra.Command, args []string) { func migrate(cmd *cobra.Command, args []string) {
r := createRDB() r := createRDB()
// Rename key asynq:{<qname>} -> asynq:{<qname>}:pending // Rename key asynq:{<qname>} -> asynq:{<qname>}:pending
queues, err := r.AllQueues() queues, err := r.AllQueues()
if err != nil { failIfError(err, "Failed to get queue names")
fmt.Printf("(error): Failed to get queue names: %v", err)
os.Exit(1)
}
fmt.Print("Renaming pending keys...") fmt.Print("Renaming pending keys...")
for _, qname := range queues { for _, qname := range queues {
oldKey := fmt.Sprintf("asynq:{%s}", qname) oldKey := fmt.Sprintf("asynq:{%s}", qname)
@ -67,10 +80,8 @@ func migrate(cmd *cobra.Command, args []string) {
continue continue
} }
newKey := base.PendingKey(qname) newKey := base.PendingKey(qname)
if err := r.Client().Rename(oldKey, newKey).Err(); err != nil { err := r.Client().Rename(oldKey, newKey).Err()
fmt.Printf("(error): Failed to rename key: %v", err) failIfError(err, "Failed to rename key")
os.Exit(1)
}
} }
fmt.Print("Done\n") fmt.Print("Done\n")
@ -85,10 +96,8 @@ func migrate(cmd *cobra.Command, args []string) {
base.ArchivedKey(qname), base.ArchivedKey(qname),
} }
for _, key := range keys { for _, key := range keys {
if err := renameKeyAsBackup(r.Client(), key); err != nil { err := renameKeyAsBackup(r.Client(), key)
fmt.Printf("(error): Failed to rename key %q for backup: %v", key, err) failIfError(err, fmt.Sprintf("Failed to rename key %q for backup", key))
os.Exit(1)
}
} }
} }
fmt.Print("Done\n") fmt.Print("Done\n")
@ -103,61 +112,68 @@ func migrate(cmd *cobra.Command, args []string) {
// Pending Tasks // Pending Tasks
data, err := r.Client().LRange(backupKey(base.PendingKey(qname)), 0, -1).Result() data, err := r.Client().LRange(backupKey(base.PendingKey(qname)), 0, -1).Result()
if err != nil { failIfError(err, "Failed to read backup pending key")
fmt.Printf("(error): Failed to read: %v", err)
os.Exit(1)
}
for _, s := range data { for _, s := range data {
msg, err := UnmarshalOldMessage(s) msg, err := UnmarshalOldMessage(s)
if err != nil { failIfError(err, "Failed to unmarshal message")
fmt.Printf("(error): Failed to unmarshal message: %v", err)
os.Exit(1)
}
if msg.UniqueKey != "" { if msg.UniqueKey != "" {
ttl, err := r.Client().TTL(msg.UniqueKey).Result() ttl, err := r.Client().TTL(msg.UniqueKey).Result()
if err != nil { failIfError(err, "Failed to get ttl")
fmt.Printf("(error): Failed to get ttl: %v", err)
os.Exit(1) if ttl > 0 {
} err = r.Client().Del(msg.UniqueKey).Err()
if err := r.Client().Del(msg.UniqueKey).Err(); err != nil { logIfError(err, "Failed to delete unique key")
fmt.Printf("(error): Failed to delete unique key")
os.Exit(1)
}
// Regenerate unique key.
msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload)
if err := r.EnqueueUnique(msg, ttl); err != nil {
fmt.Printf("(error): Failed to enqueue unique pending message: %v", err)
os.Exit(1)
} }
} else { // Regenerate unique key.
if err := r.Enqueue(msg); err != nil { msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload)
fmt.Printf("(error): Failed to enqueue pending message: %v", err) if ttl > 0 {
os.Exit(1) err = r.EnqueueUnique(msg, ttl)
} else {
err = r.Enqueue(msg)
} }
failIfError(err, "Failed to enqueue pending message")
} else {
err := r.Enqueue(msg)
failIfError(err, "Failed to enqueue pending message")
} }
} }
// Scheduled Tasks // Scheduled Tasks
// data, err = r.Client().ZRangeWithScores(backupKey(base.ScheduledKey(qname)), 0, -1).Result() zs, err := r.Client().ZRangeWithScores(backupKey(base.ScheduledKey(qname)), 0, -1).Result()
// if err != nil { failIfError(err, "Failed to read")
// fmt.Printf("(error): Failed to read: %v", err)
// os.Exit(1) for _, z := range zs {
// } msg, err := UnmarshalOldMessage(z.Member.(string))
// for _, z := range data { failIfError(err, "Failed to unmarshal message")
// msg, err := UnmarsalOldMessage(z.Member.(string))
// if err != nil { processAt := time.Unix(int64(z.Score), 0)
// fmt.Printf("(error): Failed to unmarshal message: %v", err)
// os.Exit(1) if msg.UniqueKey != "" {
// } ttl, err := r.Client().TTL(msg.UniqueKey).Result()
// task := asynq.NewTask(msg.Type, msg.Payload) failIfError(err, "Failed to get ttl")
// opts := createOptions(msg, r.Client())
// opts = append(opts, asynq.ProcessAt(time.Unix(z.Score, 0))) if ttl > 0 {
// if _, err := c.Enqueue(task, opts...); err != nil { err = r.Client().Del(msg.UniqueKey).Err()
// fmt.Printf("(error): Failed to enqueue task: %v", err) logIfError(err, "Failed to delete unique key")
// os.Exit(1) }
// }
// } // Regenerate unique key.
msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload)
if ttl > 0 {
err = r.ScheduleUnique(msg, processAt, ttl)
} else {
err = r.Schedule(msg, processAt)
}
failIfError(err, "Failed to enqueue pending message")
} else {
err := r.Schedule(msg, processAt)
failIfError(err, "Failed to enqueue scheduled message")
}
}
// Retry Tasks // Retry Tasks
@ -176,10 +192,8 @@ func migrate(cmd *cobra.Command, args []string) {
backupKey(base.ArchivedKey(qname)), backupKey(base.ArchivedKey(qname)),
} }
for _, key := range keys { for _, key := range keys {
if err := r.Client().Del(key).Err(); err != nil { err := r.Client().Del(key).Err()
fmt.Printf("(error): Failed to delete backup key: %v", err) failIfError(err, "Failed to delete backup key")
os.Exit(1)
}
} }
} }
fmt.Print("Done\n") fmt.Print("Done\n")