mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-22 16:50:18 +08:00
Add helper function for zset messages
This commit is contained in:
parent
8fe71acfde
commit
2d75ac4af4
@ -14,6 +14,7 @@ import (
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/google/uuid"
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/hibiken/asynq/internal/errors"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
@ -142,42 +143,9 @@ func migrate(cmd *cobra.Command, args []string) {
|
||||
}
|
||||
}
|
||||
|
||||
// Scheduled Tasks
|
||||
zs, err := r.Client().ZRangeWithScores(backupKey(base.ScheduledKey(qname)), 0, -1).Result()
|
||||
failIfError(err, "Failed to read")
|
||||
|
||||
for _, z := range zs {
|
||||
msg, err := UnmarshalOldMessage(z.Member.(string))
|
||||
failIfError(err, "Failed to unmarshal message")
|
||||
|
||||
processAt := time.Unix(int64(z.Score), 0)
|
||||
|
||||
if msg.UniqueKey != "" {
|
||||
ttl, err := r.Client().TTL(msg.UniqueKey).Result()
|
||||
failIfError(err, "Failed to get ttl")
|
||||
|
||||
if ttl > 0 {
|
||||
err = r.Client().Del(msg.UniqueKey).Err()
|
||||
logIfError(err, "Failed to delete unique key")
|
||||
}
|
||||
|
||||
// Regenerate unique key.
|
||||
msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload)
|
||||
if ttl > 0 {
|
||||
err = r.ScheduleUnique(msg, processAt, ttl)
|
||||
} else {
|
||||
err = r.Schedule(msg, processAt)
|
||||
}
|
||||
failIfError(err, "Failed to enqueue pending message")
|
||||
} else {
|
||||
err := r.Schedule(msg, processAt)
|
||||
failIfError(err, "Failed to enqueue scheduled message")
|
||||
}
|
||||
}
|
||||
|
||||
// Retry Tasks
|
||||
|
||||
// Archived Tasks
|
||||
updateZSetMessages(r.Client(), base.ScheduledKey(qname), "scheduled")
|
||||
updateZSetMessages(r.Client(), base.RetryKey(qname), "retry")
|
||||
updateZSetMessages(r.Client(), base.ArchivedKey(qname), "archived")
|
||||
|
||||
}
|
||||
fmt.Print("Done\n")
|
||||
@ -278,3 +246,141 @@ func DecodeMessage(s string) (*OldTaskMessage, error) {
|
||||
}
|
||||
return &msg, nil
|
||||
}
|
||||
|
||||
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
||||
// KEYS[2] -> asynq:{<qname>}:scheduled
|
||||
// ARGV[1] -> task message data
|
||||
// ARGV[2] -> zset score
|
||||
// ARGV[3] -> task ID
|
||||
// ARGV[4] -> task timeout in seconds (0 if not timeout)
|
||||
// ARGV[5] -> task deadline in unix time (0 if no deadline)
|
||||
// ARGV[6] -> task state (e.g. "retry", "archived")
|
||||
var taskZAddCmd = redis.NewScript(`
|
||||
redis.call("HSET", KEYS[1],
|
||||
"msg", ARGV[1],
|
||||
"state", ARGV[6],
|
||||
"timeout", ARGV[4],
|
||||
"deadline", ARGV[5])
|
||||
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3])
|
||||
return 1
|
||||
`)
|
||||
|
||||
// ZAddTask adds task to zset.
|
||||
func ZAddTask(c redis.UniversalClient, key string, msg *base.TaskMessage, score float64, state string) error {
|
||||
encoded, err := base.EncodeMessage(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
keys := []string{
|
||||
base.TaskKey(msg.Queue, msg.ID.String()),
|
||||
key,
|
||||
}
|
||||
argv := []interface{}{
|
||||
encoded,
|
||||
score,
|
||||
msg.ID.String(),
|
||||
msg.Timeout,
|
||||
msg.Deadline,
|
||||
state,
|
||||
}
|
||||
return taskZAddCmd.Run(c, keys, argv...).Err()
|
||||
}
|
||||
|
||||
// KEYS[1] -> unique key
|
||||
// KEYS[2] -> asynq:{<qname>}:t:<task_id>
|
||||
// KEYS[3] -> zset key (e.g. asynq:{<qname>}:scheduled)
|
||||
// --
|
||||
// ARGV[1] -> task ID
|
||||
// ARGV[2] -> uniqueness lock TTL
|
||||
// ARGV[3] -> score (process_at timestamp)
|
||||
// ARGV[4] -> task message
|
||||
// ARGV[5] -> task timeout in seconds (0 if not timeout)
|
||||
// ARGV[6] -> task deadline in unix time (0 if no deadline)
|
||||
// ARGV[7] -> task state (oneof "scheduled", "retry", "archived")
|
||||
var taskZAddUniqueCmd = redis.NewScript(`
|
||||
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
|
||||
if not ok then
|
||||
return 0
|
||||
end
|
||||
redis.call("HSET", KEYS[2],
|
||||
"msg", ARGV[4],
|
||||
"state", ARGV[7],
|
||||
"timeout", ARGV[5],
|
||||
"deadline", ARGV[6],
|
||||
"unique_key", KEYS[1])
|
||||
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1])
|
||||
return 1
|
||||
`)
|
||||
|
||||
// ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired.
|
||||
// It returns ErrDuplicateTask if the lock cannot be acquired.
|
||||
func ZAddTaskUnique(c redis.UniversalClient, key string, msg *base.TaskMessage, score float64, state string, ttl time.Duration) error {
|
||||
encoded, err := base.EncodeMessage(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
keys := []string{
|
||||
msg.UniqueKey,
|
||||
base.TaskKey(msg.Queue, msg.ID.String()),
|
||||
key,
|
||||
}
|
||||
argv := []interface{}{
|
||||
msg.ID.String(),
|
||||
int(ttl.Seconds()),
|
||||
score,
|
||||
encoded,
|
||||
msg.Timeout,
|
||||
msg.Deadline,
|
||||
state,
|
||||
}
|
||||
res, err := taskZAddUniqueCmd.Run(c, keys, argv...).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n, ok := res.(int64)
|
||||
if !ok {
|
||||
return errors.E(errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
|
||||
}
|
||||
if n == 0 {
|
||||
return errors.E(errors.AlreadyExists, errors.ErrDuplicateTask)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func updateZSetMessages(c redis.UniversalClient, key, state string) {
|
||||
zs, err := c.ZRangeWithScores(backupKey(key), 0, -1).Result()
|
||||
failIfError(err, "Failed to read")
|
||||
|
||||
for _, z := range zs {
|
||||
msg, err := UnmarshalOldMessage(z.Member.(string))
|
||||
failIfError(err, "Failed to unmarshal message")
|
||||
|
||||
if msg.UniqueKey != "" {
|
||||
ttl, err := c.TTL(msg.UniqueKey).Result()
|
||||
failIfError(err, "Failed to get ttl")
|
||||
|
||||
if ttl > 0 {
|
||||
err = c.Del(msg.UniqueKey).Err()
|
||||
logIfError(err, "Failed to delete unique key")
|
||||
}
|
||||
|
||||
// Regenerate unique key.
|
||||
msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload)
|
||||
if ttl > 0 {
|
||||
err = ZAddTaskUnique(c, key, msg, z.Score, state, ttl)
|
||||
} else {
|
||||
err = ZAddTask(c, key, msg, z.Score, state)
|
||||
}
|
||||
failIfError(err, "Failed to zadd message")
|
||||
} else {
|
||||
err := ZAddTask(c, key, msg, z.Score, state)
|
||||
failIfError(err, "Failed to enqueue scheduled message")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user