2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-23 01:00:17 +08:00

Add helper to update redis list messages

This commit is contained in:
Ken Hibino 2021-06-17 07:02:40 -07:00
parent 2d75ac4af4
commit 1ddc08a3c6

View File

@ -104,49 +104,13 @@ func migrate(cmd *cobra.Command, args []string) {
fmt.Print("Done\n")
fmt.Print("Updating to new schema...")
// All list/zset should contain task-ids and store task data under task key
// - Unmarshal using old task schema
// - Marshal data using new schema
for _, qname := range queues {
// Active Tasks + Deadlines set
// Pending Tasks
data, err := r.Client().LRange(backupKey(base.PendingKey(qname)), 0, -1).Result()
failIfError(err, "Failed to read backup pending key")
for _, s := range data {
msg, err := UnmarshalOldMessage(s)
failIfError(err, "Failed to unmarshal message")
if msg.UniqueKey != "" {
ttl, err := r.Client().TTL(msg.UniqueKey).Result()
failIfError(err, "Failed to get ttl")
if ttl > 0 {
err = r.Client().Del(msg.UniqueKey).Err()
logIfError(err, "Failed to delete unique key")
}
// Regenerate unique key.
msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload)
if ttl > 0 {
err = r.EnqueueUnique(msg, ttl)
} else {
err = r.Enqueue(msg)
}
failIfError(err, "Failed to enqueue pending message")
} else {
err := r.Enqueue(msg)
failIfError(err, "Failed to enqueue pending message")
}
}
// TODO: Deadlines set
updateListMessages(r.Client(), base.ActiveKey(qname), "active")
updateListMessages(r.Client(), base.PendingKey(qname), "pending")
updateZSetMessages(r.Client(), base.ScheduledKey(qname), "scheduled")
updateZSetMessages(r.Client(), base.RetryKey(qname), "retry")
updateZSetMessages(r.Client(), base.ArchivedKey(qname), "archived")
}
fmt.Print("Done\n")
@ -247,6 +211,148 @@ func DecodeMessage(s string) (*OldTaskMessage, error) {
return &msg, nil
}
// Input:
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task message data
// ARGV[2] -> task ID
// ARGV[3] -> task timeout in seconds (0 if not timeout)
// ARGV[4] -> task deadline in unix time (0 if no deadline)
// ARGV[5] -> task state (oneof "active", "pending")
//
// Output:
// Returns 1 if successfully enqueued
var taskLPushCmd = redis.NewScript(`
redis.call("HSET", KEYS[1],
"msg", ARGV[1],
"state", ARGV[5],
"timeout", ARGV[3],
"deadline", ARGV[4])
redis.call("LPUSH", KEYS[2], ARGV[2])
return 1
`)
// Enqueue adds the given task to the pending list of the queue.
func LPushTask(c redis.UniversalClient, key string, msg *base.TaskMessage, state string) error {
encoded, err := base.EncodeMessage(msg)
if err != nil {
return err
}
if err := c.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
return err
}
keys := []string{
base.TaskKey(msg.Queue, msg.ID.String()),
key,
}
argv := []interface{}{
encoded,
msg.ID.String(),
msg.Timeout,
msg.Deadline,
state,
}
return taskLPushCmd.Run(c, keys, argv...).Err()
}
// KEYS[1] -> unique key
// KEYS[2] -> asynq:{<qname>}:t:<taskid>
// KEYS[3] -> asynq:{<qname>}:pending
// --
// ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> task message data
// ARGV[4] -> task timeout in seconds (0 if not timeout)
// ARGV[5] -> task deadline in unix time (0 if no deadline)
// ARGV[6] -> task state (oneof "active", "pending")
//
// Output:
// Returns 1 if successfully enqueued
// Returns 0 if task already exists
var taskLPushUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
return 0
end
redis.call("HSET", KEYS[2],
"msg", ARGV[3],
"state", ARGV[6],
"timeout", ARGV[4],
"deadline", ARGV[5],
"unique_key", KEYS[1])
redis.call("LPUSH", KEYS[3], ARGV[1])
return 1
`)
func LPushTaskUnique(c redis.UniversalClient, key string, msg *base.TaskMessage, state string, ttl time.Duration) error {
encoded, err := base.EncodeMessage(msg)
if err != nil {
return err
}
if err := c.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
return err
}
keys := []string{
msg.UniqueKey,
base.TaskKey(msg.Queue, msg.ID.String()),
key,
}
argv := []interface{}{
msg.ID.String(),
int(ttl.Seconds()),
encoded,
msg.Timeout,
msg.Deadline,
state,
}
res, err := taskLPushUniqueCmd.Run(c, keys, argv...).Result()
if err != nil {
return err
}
n, ok := res.(int64)
if !ok {
return errors.E(errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", res))
}
if n == 0 {
return errors.E(errors.AlreadyExists, errors.ErrDuplicateTask)
}
return nil
}
func updateListMessages(c redis.UniversalClient, key, state string) {
data, err := c.LRange(backupKey(key), 0, -1).Result()
failIfError(err, "Failed to read backup pending key")
for _, s := range data {
msg, err := UnmarshalOldMessage(s)
failIfError(err, "Failed to unmarshal message")
if msg.UniqueKey != "" {
ttl, err := c.TTL(msg.UniqueKey).Result()
failIfError(err, "Failed to get ttl")
if ttl > 0 {
err = c.Del(msg.UniqueKey).Err()
logIfError(err, "Failed to delete unique key")
}
// Regenerate unique key.
msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload)
if ttl > 0 {
err = LPushTaskUnique(c, key, msg, state, ttl)
} else {
err = LPushTask(c, key, msg, state)
}
failIfError(err, "Failed to lpush message")
} else {
err := LPushTask(c, key, msg, state)
failIfError(err, "Failed to lpush message")
}
}
}
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:scheduled
// ARGV[1] -> task message data