diff --git a/tools/asynq/cmd/migrate.go b/tools/asynq/cmd/migrate.go index f9e048b..28c3819 100644 --- a/tools/asynq/cmd/migrate.go +++ b/tools/asynq/cmd/migrate.go @@ -104,49 +104,13 @@ func migrate(cmd *cobra.Command, args []string) { fmt.Print("Done\n") fmt.Print("Updating to new schema...") - // All list/zset should contain task-ids and store task data under task key - // - Unmarshal using old task schema - // - Marshal data using new schema - for _, qname := range queues { - // Active Tasks + Deadlines set - - // Pending Tasks - data, err := r.Client().LRange(backupKey(base.PendingKey(qname)), 0, -1).Result() - failIfError(err, "Failed to read backup pending key") - - for _, s := range data { - msg, err := UnmarshalOldMessage(s) - failIfError(err, "Failed to unmarshal message") - - if msg.UniqueKey != "" { - ttl, err := r.Client().TTL(msg.UniqueKey).Result() - failIfError(err, "Failed to get ttl") - - if ttl > 0 { - err = r.Client().Del(msg.UniqueKey).Err() - logIfError(err, "Failed to delete unique key") - } - - // Regenerate unique key. - msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload) - if ttl > 0 { - err = r.EnqueueUnique(msg, ttl) - } else { - err = r.Enqueue(msg) - } - failIfError(err, "Failed to enqueue pending message") - - } else { - err := r.Enqueue(msg) - failIfError(err, "Failed to enqueue pending message") - } - } - + // TODO: Deadlines set + updateListMessages(r.Client(), base.ActiveKey(qname), "active") + updateListMessages(r.Client(), base.PendingKey(qname), "pending") updateZSetMessages(r.Client(), base.ScheduledKey(qname), "scheduled") updateZSetMessages(r.Client(), base.RetryKey(qname), "retry") updateZSetMessages(r.Client(), base.ArchivedKey(qname), "archived") - } fmt.Print("Done\n") @@ -247,6 +211,148 @@ func DecodeMessage(s string) (*OldTaskMessage, error) { return &msg, nil } +// Input: +// KEYS[1] -> asynq:{}:t: +// KEYS[2] -> asynq:{}:pending +// -- +// ARGV[1] -> task message data +// ARGV[2] -> task ID +// ARGV[3] -> task timeout in seconds (0 if not timeout) +// ARGV[4] -> task deadline in unix time (0 if no deadline) +// ARGV[5] -> task state (oneof "active", "pending") +// +// Output: +// Returns 1 if successfully enqueued +var taskLPushCmd = redis.NewScript(` +redis.call("HSET", KEYS[1], + "msg", ARGV[1], + "state", ARGV[5], + "timeout", ARGV[3], + "deadline", ARGV[4]) +redis.call("LPUSH", KEYS[2], ARGV[2]) +return 1 +`) + +// Enqueue adds the given task to the pending list of the queue. +func LPushTask(c redis.UniversalClient, key string, msg *base.TaskMessage, state string) error { + encoded, err := base.EncodeMessage(msg) + if err != nil { + return err + } + if err := c.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { + return err + } + keys := []string{ + base.TaskKey(msg.Queue, msg.ID.String()), + key, + } + argv := []interface{}{ + encoded, + msg.ID.String(), + msg.Timeout, + msg.Deadline, + state, + } + return taskLPushCmd.Run(c, keys, argv...).Err() +} + +// KEYS[1] -> unique key +// KEYS[2] -> asynq:{}:t: +// KEYS[3] -> asynq:{}:pending +// -- +// ARGV[1] -> task ID +// ARGV[2] -> uniqueness lock TTL +// ARGV[3] -> task message data +// ARGV[4] -> task timeout in seconds (0 if not timeout) +// ARGV[5] -> task deadline in unix time (0 if no deadline) +// ARGV[6] -> task state (oneof "active", "pending") +// +// Output: +// Returns 1 if successfully enqueued +// Returns 0 if task already exists +var taskLPushUniqueCmd = redis.NewScript(` +local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) +if not ok then + return 0 +end +redis.call("HSET", KEYS[2], + "msg", ARGV[3], + "state", ARGV[6], + "timeout", ARGV[4], + "deadline", ARGV[5], + "unique_key", KEYS[1]) +redis.call("LPUSH", KEYS[3], ARGV[1]) +return 1 +`) + +func LPushTaskUnique(c redis.UniversalClient, key string, msg *base.TaskMessage, state string, ttl time.Duration) error { + encoded, err := base.EncodeMessage(msg) + if err != nil { + return err + } + if err := c.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { + return err + } + keys := []string{ + msg.UniqueKey, + base.TaskKey(msg.Queue, msg.ID.String()), + key, + } + argv := []interface{}{ + msg.ID.String(), + int(ttl.Seconds()), + encoded, + msg.Timeout, + msg.Deadline, + state, + } + res, err := taskLPushUniqueCmd.Run(c, keys, argv...).Result() + if err != nil { + return err + } + n, ok := res.(int64) + if !ok { + return errors.E(errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", res)) + } + if n == 0 { + return errors.E(errors.AlreadyExists, errors.ErrDuplicateTask) + } + return nil +} + +func updateListMessages(c redis.UniversalClient, key, state string) { + data, err := c.LRange(backupKey(key), 0, -1).Result() + failIfError(err, "Failed to read backup pending key") + + for _, s := range data { + msg, err := UnmarshalOldMessage(s) + failIfError(err, "Failed to unmarshal message") + + if msg.UniqueKey != "" { + ttl, err := c.TTL(msg.UniqueKey).Result() + failIfError(err, "Failed to get ttl") + + if ttl > 0 { + err = c.Del(msg.UniqueKey).Err() + logIfError(err, "Failed to delete unique key") + } + + // Regenerate unique key. + msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload) + if ttl > 0 { + err = LPushTaskUnique(c, key, msg, state, ttl) + } else { + err = LPushTask(c, key, msg, state) + } + failIfError(err, "Failed to lpush message") + + } else { + err := LPushTask(c, key, msg, state) + failIfError(err, "Failed to lpush message") + } + } +} + // KEYS[1] -> asynq:{}:t: // KEYS[2] -> asynq:{}:scheduled // ARGV[1] -> task message data