diff --git a/tools/asynq/cmd/migrate.go b/tools/asynq/cmd/migrate.go index 28c3819..56ed70d 100644 --- a/tools/asynq/cmd/migrate.go +++ b/tools/asynq/cmd/migrate.go @@ -15,6 +15,7 @@ import ( "github.com/google/uuid" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" + "github.com/hibiken/asynq/internal/rdb" "github.com/spf13/cobra" ) @@ -69,11 +70,30 @@ func logIfError(err error, msg string) { func migrate(cmd *cobra.Command, args []string) { r := createRDB() - - // Rename key asynq:{} -> asynq:{}:pending queues, err := r.AllQueues() failIfError(err, "Failed to get queue names") + // --------------------------------------------- + // Pre-check: Ensure no active servers, tasks. + // --------------------------------------------- + srvs, err := r.ListServers() + failIfError(err, "Failed to get server infos") + if len(srvs) > 0 { + fmt.Println("(error): Server(s) still running. Please ensure that no asynq servers are running when runnning migrate command.") + os.Exit(1) + } + for _, qname := range queues { + stats, err := r.CurrentStats(qname) + failIfError(err, "Failed to get stats") + if stats.Active > 0 { + fmt.Printf("(error): %d active tasks found. Please ensure that no active tasks exist when running migrate command.\n", stats.Active) + os.Exit(1) + } + } + + // --------------------------------------------- + // Rename pending key + // --------------------------------------------- fmt.Print("Renaming pending keys...") for _, qname := range queues { oldKey := fmt.Sprintf("asynq:{%s}", qname) @@ -86,7 +106,9 @@ func migrate(cmd *cobra.Command, args []string) { } fmt.Print("Done\n") - // Rename LIST/ZSET keys as backup + // --------------------------------------------- + // Rename keys as backup + // --------------------------------------------- fmt.Print("Renaming keys for backup...") for _, qname := range queues { keys := []string{ @@ -103,17 +125,21 @@ func migrate(cmd *cobra.Command, args []string) { } fmt.Print("Done\n") + // --------------------------------------------- + // Update to new schema + // --------------------------------------------- fmt.Print("Updating to new schema...") for _, qname := range queues { - // TODO: Deadlines set - updateListMessages(r.Client(), base.ActiveKey(qname), "active") - updateListMessages(r.Client(), base.PendingKey(qname), "pending") + updatePendingMessages(r, qname) updateZSetMessages(r.Client(), base.ScheduledKey(qname), "scheduled") updateZSetMessages(r.Client(), base.RetryKey(qname), "retry") updateZSetMessages(r.Client(), base.ArchivedKey(qname), "archived") } fmt.Print("Done\n") + // --------------------------------------------- + // Delete backup keys + // --------------------------------------------- fmt.Print("Deleting backup keys...") for _, qname := range queues { keys := []string{ @@ -211,117 +237,8 @@ func DecodeMessage(s string) (*OldTaskMessage, error) { return &msg, nil } -// Input: -// KEYS[1] -> asynq:{}:t: -// KEYS[2] -> asynq:{}:pending -// -- -// ARGV[1] -> task message data -// ARGV[2] -> task ID -// ARGV[3] -> task timeout in seconds (0 if not timeout) -// ARGV[4] -> task deadline in unix time (0 if no deadline) -// ARGV[5] -> task state (oneof "active", "pending") -// -// Output: -// Returns 1 if successfully enqueued -var taskLPushCmd = redis.NewScript(` -redis.call("HSET", KEYS[1], - "msg", ARGV[1], - "state", ARGV[5], - "timeout", ARGV[3], - "deadline", ARGV[4]) -redis.call("LPUSH", KEYS[2], ARGV[2]) -return 1 -`) - -// Enqueue adds the given task to the pending list of the queue. -func LPushTask(c redis.UniversalClient, key string, msg *base.TaskMessage, state string) error { - encoded, err := base.EncodeMessage(msg) - if err != nil { - return err - } - if err := c.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { - return err - } - keys := []string{ - base.TaskKey(msg.Queue, msg.ID.String()), - key, - } - argv := []interface{}{ - encoded, - msg.ID.String(), - msg.Timeout, - msg.Deadline, - state, - } - return taskLPushCmd.Run(c, keys, argv...).Err() -} - -// KEYS[1] -> unique key -// KEYS[2] -> asynq:{}:t: -// KEYS[3] -> asynq:{}:pending -// -- -// ARGV[1] -> task ID -// ARGV[2] -> uniqueness lock TTL -// ARGV[3] -> task message data -// ARGV[4] -> task timeout in seconds (0 if not timeout) -// ARGV[5] -> task deadline in unix time (0 if no deadline) -// ARGV[6] -> task state (oneof "active", "pending") -// -// Output: -// Returns 1 if successfully enqueued -// Returns 0 if task already exists -var taskLPushUniqueCmd = redis.NewScript(` -local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) -if not ok then - return 0 -end -redis.call("HSET", KEYS[2], - "msg", ARGV[3], - "state", ARGV[6], - "timeout", ARGV[4], - "deadline", ARGV[5], - "unique_key", KEYS[1]) -redis.call("LPUSH", KEYS[3], ARGV[1]) -return 1 -`) - -func LPushTaskUnique(c redis.UniversalClient, key string, msg *base.TaskMessage, state string, ttl time.Duration) error { - encoded, err := base.EncodeMessage(msg) - if err != nil { - return err - } - if err := c.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { - return err - } - keys := []string{ - msg.UniqueKey, - base.TaskKey(msg.Queue, msg.ID.String()), - key, - } - argv := []interface{}{ - msg.ID.String(), - int(ttl.Seconds()), - encoded, - msg.Timeout, - msg.Deadline, - state, - } - res, err := taskLPushUniqueCmd.Run(c, keys, argv...).Result() - if err != nil { - return err - } - n, ok := res.(int64) - if !ok { - return errors.E(errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", res)) - } - if n == 0 { - return errors.E(errors.AlreadyExists, errors.ErrDuplicateTask) - } - return nil -} - -func updateListMessages(c redis.UniversalClient, key, state string) { - data, err := c.LRange(backupKey(key), 0, -1).Result() +func updatePendingMessages(r *rdb.RDB, qname string) { + data, err := r.Client().LRange(backupKey(base.PendingKey(qname)), 0, -1).Result() failIfError(err, "Failed to read backup pending key") for _, s := range data { @@ -329,26 +246,26 @@ func updateListMessages(c redis.UniversalClient, key, state string) { failIfError(err, "Failed to unmarshal message") if msg.UniqueKey != "" { - ttl, err := c.TTL(msg.UniqueKey).Result() + ttl, err := r.Client().TTL(msg.UniqueKey).Result() failIfError(err, "Failed to get ttl") if ttl > 0 { - err = c.Del(msg.UniqueKey).Err() + err = r.Client().Del(msg.UniqueKey).Err() logIfError(err, "Failed to delete unique key") } // Regenerate unique key. msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload) if ttl > 0 { - err = LPushTaskUnique(c, key, msg, state, ttl) + err = r.EnqueueUnique(msg, ttl) } else { - err = LPushTask(c, key, msg, state) + err = r.Enqueue(msg) } - failIfError(err, "Failed to lpush message") + failIfError(err, "Failed to enqueue message") } else { - err := LPushTask(c, key, msg, state) - failIfError(err, "Failed to lpush message") + err := r.Enqueue(msg) + failIfError(err, "Failed to enqueue message") } } }