mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-22 16:50:18 +08:00
Add precheck
This commit is contained in:
parent
1ddc08a3c6
commit
06f8164174
@ -15,6 +15,7 @@ import (
|
|||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
"github.com/hibiken/asynq/internal/errors"
|
"github.com/hibiken/asynq/internal/errors"
|
||||||
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -69,11 +70,30 @@ func logIfError(err error, msg string) {
|
|||||||
|
|
||||||
func migrate(cmd *cobra.Command, args []string) {
|
func migrate(cmd *cobra.Command, args []string) {
|
||||||
r := createRDB()
|
r := createRDB()
|
||||||
|
|
||||||
// Rename key asynq:{<qname>} -> asynq:{<qname>}:pending
|
|
||||||
queues, err := r.AllQueues()
|
queues, err := r.AllQueues()
|
||||||
failIfError(err, "Failed to get queue names")
|
failIfError(err, "Failed to get queue names")
|
||||||
|
|
||||||
|
// ---------------------------------------------
|
||||||
|
// Pre-check: Ensure no active servers, tasks.
|
||||||
|
// ---------------------------------------------
|
||||||
|
srvs, err := r.ListServers()
|
||||||
|
failIfError(err, "Failed to get server infos")
|
||||||
|
if len(srvs) > 0 {
|
||||||
|
fmt.Println("(error): Server(s) still running. Please ensure that no asynq servers are running when runnning migrate command.")
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
for _, qname := range queues {
|
||||||
|
stats, err := r.CurrentStats(qname)
|
||||||
|
failIfError(err, "Failed to get stats")
|
||||||
|
if stats.Active > 0 {
|
||||||
|
fmt.Printf("(error): %d active tasks found. Please ensure that no active tasks exist when running migrate command.\n", stats.Active)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------
|
||||||
|
// Rename pending key
|
||||||
|
// ---------------------------------------------
|
||||||
fmt.Print("Renaming pending keys...")
|
fmt.Print("Renaming pending keys...")
|
||||||
for _, qname := range queues {
|
for _, qname := range queues {
|
||||||
oldKey := fmt.Sprintf("asynq:{%s}", qname)
|
oldKey := fmt.Sprintf("asynq:{%s}", qname)
|
||||||
@ -86,7 +106,9 @@ func migrate(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
fmt.Print("Done\n")
|
fmt.Print("Done\n")
|
||||||
|
|
||||||
// Rename LIST/ZSET keys as backup
|
// ---------------------------------------------
|
||||||
|
// Rename keys as backup
|
||||||
|
// ---------------------------------------------
|
||||||
fmt.Print("Renaming keys for backup...")
|
fmt.Print("Renaming keys for backup...")
|
||||||
for _, qname := range queues {
|
for _, qname := range queues {
|
||||||
keys := []string{
|
keys := []string{
|
||||||
@ -103,17 +125,21 @@ func migrate(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
fmt.Print("Done\n")
|
fmt.Print("Done\n")
|
||||||
|
|
||||||
|
// ---------------------------------------------
|
||||||
|
// Update to new schema
|
||||||
|
// ---------------------------------------------
|
||||||
fmt.Print("Updating to new schema...")
|
fmt.Print("Updating to new schema...")
|
||||||
for _, qname := range queues {
|
for _, qname := range queues {
|
||||||
// TODO: Deadlines set
|
updatePendingMessages(r, qname)
|
||||||
updateListMessages(r.Client(), base.ActiveKey(qname), "active")
|
|
||||||
updateListMessages(r.Client(), base.PendingKey(qname), "pending")
|
|
||||||
updateZSetMessages(r.Client(), base.ScheduledKey(qname), "scheduled")
|
updateZSetMessages(r.Client(), base.ScheduledKey(qname), "scheduled")
|
||||||
updateZSetMessages(r.Client(), base.RetryKey(qname), "retry")
|
updateZSetMessages(r.Client(), base.RetryKey(qname), "retry")
|
||||||
updateZSetMessages(r.Client(), base.ArchivedKey(qname), "archived")
|
updateZSetMessages(r.Client(), base.ArchivedKey(qname), "archived")
|
||||||
}
|
}
|
||||||
fmt.Print("Done\n")
|
fmt.Print("Done\n")
|
||||||
|
|
||||||
|
// ---------------------------------------------
|
||||||
|
// Delete backup keys
|
||||||
|
// ---------------------------------------------
|
||||||
fmt.Print("Deleting backup keys...")
|
fmt.Print("Deleting backup keys...")
|
||||||
for _, qname := range queues {
|
for _, qname := range queues {
|
||||||
keys := []string{
|
keys := []string{
|
||||||
@ -211,117 +237,8 @@ func DecodeMessage(s string) (*OldTaskMessage, error) {
|
|||||||
return &msg, nil
|
return &msg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Input:
|
func updatePendingMessages(r *rdb.RDB, qname string) {
|
||||||
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
data, err := r.Client().LRange(backupKey(base.PendingKey(qname)), 0, -1).Result()
|
||||||
// KEYS[2] -> asynq:{<qname>}:pending
|
|
||||||
// --
|
|
||||||
// ARGV[1] -> task message data
|
|
||||||
// ARGV[2] -> task ID
|
|
||||||
// ARGV[3] -> task timeout in seconds (0 if not timeout)
|
|
||||||
// ARGV[4] -> task deadline in unix time (0 if no deadline)
|
|
||||||
// ARGV[5] -> task state (oneof "active", "pending")
|
|
||||||
//
|
|
||||||
// Output:
|
|
||||||
// Returns 1 if successfully enqueued
|
|
||||||
var taskLPushCmd = redis.NewScript(`
|
|
||||||
redis.call("HSET", KEYS[1],
|
|
||||||
"msg", ARGV[1],
|
|
||||||
"state", ARGV[5],
|
|
||||||
"timeout", ARGV[3],
|
|
||||||
"deadline", ARGV[4])
|
|
||||||
redis.call("LPUSH", KEYS[2], ARGV[2])
|
|
||||||
return 1
|
|
||||||
`)
|
|
||||||
|
|
||||||
// Enqueue adds the given task to the pending list of the queue.
|
|
||||||
func LPushTask(c redis.UniversalClient, key string, msg *base.TaskMessage, state string) error {
|
|
||||||
encoded, err := base.EncodeMessage(msg)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := c.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
keys := []string{
|
|
||||||
base.TaskKey(msg.Queue, msg.ID.String()),
|
|
||||||
key,
|
|
||||||
}
|
|
||||||
argv := []interface{}{
|
|
||||||
encoded,
|
|
||||||
msg.ID.String(),
|
|
||||||
msg.Timeout,
|
|
||||||
msg.Deadline,
|
|
||||||
state,
|
|
||||||
}
|
|
||||||
return taskLPushCmd.Run(c, keys, argv...).Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
// KEYS[1] -> unique key
|
|
||||||
// KEYS[2] -> asynq:{<qname>}:t:<taskid>
|
|
||||||
// KEYS[3] -> asynq:{<qname>}:pending
|
|
||||||
// --
|
|
||||||
// ARGV[1] -> task ID
|
|
||||||
// ARGV[2] -> uniqueness lock TTL
|
|
||||||
// ARGV[3] -> task message data
|
|
||||||
// ARGV[4] -> task timeout in seconds (0 if not timeout)
|
|
||||||
// ARGV[5] -> task deadline in unix time (0 if no deadline)
|
|
||||||
// ARGV[6] -> task state (oneof "active", "pending")
|
|
||||||
//
|
|
||||||
// Output:
|
|
||||||
// Returns 1 if successfully enqueued
|
|
||||||
// Returns 0 if task already exists
|
|
||||||
var taskLPushUniqueCmd = redis.NewScript(`
|
|
||||||
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
|
|
||||||
if not ok then
|
|
||||||
return 0
|
|
||||||
end
|
|
||||||
redis.call("HSET", KEYS[2],
|
|
||||||
"msg", ARGV[3],
|
|
||||||
"state", ARGV[6],
|
|
||||||
"timeout", ARGV[4],
|
|
||||||
"deadline", ARGV[5],
|
|
||||||
"unique_key", KEYS[1])
|
|
||||||
redis.call("LPUSH", KEYS[3], ARGV[1])
|
|
||||||
return 1
|
|
||||||
`)
|
|
||||||
|
|
||||||
func LPushTaskUnique(c redis.UniversalClient, key string, msg *base.TaskMessage, state string, ttl time.Duration) error {
|
|
||||||
encoded, err := base.EncodeMessage(msg)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := c.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
keys := []string{
|
|
||||||
msg.UniqueKey,
|
|
||||||
base.TaskKey(msg.Queue, msg.ID.String()),
|
|
||||||
key,
|
|
||||||
}
|
|
||||||
argv := []interface{}{
|
|
||||||
msg.ID.String(),
|
|
||||||
int(ttl.Seconds()),
|
|
||||||
encoded,
|
|
||||||
msg.Timeout,
|
|
||||||
msg.Deadline,
|
|
||||||
state,
|
|
||||||
}
|
|
||||||
res, err := taskLPushUniqueCmd.Run(c, keys, argv...).Result()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
n, ok := res.(int64)
|
|
||||||
if !ok {
|
|
||||||
return errors.E(errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", res))
|
|
||||||
}
|
|
||||||
if n == 0 {
|
|
||||||
return errors.E(errors.AlreadyExists, errors.ErrDuplicateTask)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func updateListMessages(c redis.UniversalClient, key, state string) {
|
|
||||||
data, err := c.LRange(backupKey(key), 0, -1).Result()
|
|
||||||
failIfError(err, "Failed to read backup pending key")
|
failIfError(err, "Failed to read backup pending key")
|
||||||
|
|
||||||
for _, s := range data {
|
for _, s := range data {
|
||||||
@ -329,26 +246,26 @@ func updateListMessages(c redis.UniversalClient, key, state string) {
|
|||||||
failIfError(err, "Failed to unmarshal message")
|
failIfError(err, "Failed to unmarshal message")
|
||||||
|
|
||||||
if msg.UniqueKey != "" {
|
if msg.UniqueKey != "" {
|
||||||
ttl, err := c.TTL(msg.UniqueKey).Result()
|
ttl, err := r.Client().TTL(msg.UniqueKey).Result()
|
||||||
failIfError(err, "Failed to get ttl")
|
failIfError(err, "Failed to get ttl")
|
||||||
|
|
||||||
if ttl > 0 {
|
if ttl > 0 {
|
||||||
err = c.Del(msg.UniqueKey).Err()
|
err = r.Client().Del(msg.UniqueKey).Err()
|
||||||
logIfError(err, "Failed to delete unique key")
|
logIfError(err, "Failed to delete unique key")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Regenerate unique key.
|
// Regenerate unique key.
|
||||||
msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload)
|
msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload)
|
||||||
if ttl > 0 {
|
if ttl > 0 {
|
||||||
err = LPushTaskUnique(c, key, msg, state, ttl)
|
err = r.EnqueueUnique(msg, ttl)
|
||||||
} else {
|
} else {
|
||||||
err = LPushTask(c, key, msg, state)
|
err = r.Enqueue(msg)
|
||||||
}
|
}
|
||||||
failIfError(err, "Failed to lpush message")
|
failIfError(err, "Failed to enqueue message")
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
err := LPushTask(c, key, msg, state)
|
err := r.Enqueue(msg)
|
||||||
failIfError(err, "Failed to lpush message")
|
failIfError(err, "Failed to enqueue message")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user