diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ee23ed..34b0dae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Server` API has changed. Renamed `Quiet` to `Stop`. Renamed `Stop` to `Shutdown`. _Note:_ As a result of this renaming, the behavior of `Stop` has changed. Please update the exising code to call `Shutdown` where it used to call `Stop`. - `Scheduler` API has changed. Renamed `Stop` to `Shutdown`. - Requires redis v4.0+ for multiple field/value pair support -- Renamed pending key (TODO: need migration script) - `Client.Enqueue` now returns `TaskInfo` - `Inspector.RunTaskByKey` is replaced with `Inspector.RunTask` - `Inspector.DeleteTaskByKey` is replaced with `Inspector.DeleteTask` diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 31e1fbf..7fcf3f1 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -994,7 +994,7 @@ else end end local unique_key = redis.call("HGET", KEYS[1], "unique_key") -if unique_key ~= nil and unique_key ~= "" and redis.call("GET", unique_key) == ARGV[1] then +if unique_key and unique_key ~= "" and redis.call("GET", unique_key) == ARGV[1] then redis.call("DEL", unique_key) end return redis.call("DEL", KEYS[1]) @@ -1094,7 +1094,7 @@ local ids = redis.call("ZRANGE", KEYS[1], 0, -1) for _, id in ipairs(ids) do local task_key = ARGV[1] .. id local unique_key = redis.call("HGET", task_key, "unique_key") - if unique_key ~= nil and unique_key ~= "" and redis.call("GET", unique_key) == id then + if unique_key and unique_key ~= "" and redis.call("GET", unique_key) == id then redis.call("DEL", unique_key) end redis.call("DEL", task_key) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 36a8343..f65f3c9 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -32,6 +32,11 @@ func (r *RDB) Close() error { return r.client.Close() } +// Client returns the reference to underlying redis client. +func (r *RDB) Client() redis.UniversalClient { + return r.client +} + // Ping checks the connection with redis server. func (r *RDB) Ping() error { return r.client.Ping().Err() diff --git a/tools/asynq/cmd/migrate.go b/tools/asynq/cmd/migrate.go new file mode 100644 index 0000000..8ef437d --- /dev/null +++ b/tools/asynq/cmd/migrate.go @@ -0,0 +1,404 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package cmd + +import ( + "encoding/json" + "fmt" + "os" + "strings" + "time" + + "github.com/go-redis/redis/v7" + "github.com/google/uuid" + "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/errors" + "github.com/hibiken/asynq/internal/rdb" + "github.com/spf13/cobra" +) + +// migrateCmd represents the migrate command. +var migrateCmd = &cobra.Command{ + Use: "migrate", + Short: fmt.Sprintf("Migrate existing tasks and queues to be asynq%s compatible", base.Version), + Long: `Migrate (asynq migrate) will migrate existing tasks and queues in redis to be compatible with the latest version of asynq. +`, + Args: cobra.NoArgs, + Run: migrate, +} + +func init() { + rootCmd.AddCommand(migrateCmd) +} + +func backupKey(key string) string { + return fmt.Sprintf("%s:backup", key) +} + +func renameKeyAsBackup(c redis.UniversalClient, key string) error { + if c.Exists(key).Val() == 0 { + return nil // key doesn't exist; no-op + } + return c.Rename(key, backupKey(key)).Err() +} + +func failIfError(err error, msg string) { + if err != nil { + fmt.Printf("error: %s: %v\n", msg, err) + fmt.Println("*** Please report this issue at https://github.com/hibiken/asynq/issues ***") + os.Exit(1) + } +} + +func logIfError(err error, msg string) { + if err != nil { + fmt.Printf("warning: %s: %v\n", msg, err) + } +} + +func migrate(cmd *cobra.Command, args []string) { + r := createRDB() + queues, err := r.AllQueues() + failIfError(err, "Failed to get queue names") + + // --------------------------------------------- + // Pre-check: Ensure no active servers, tasks. + // --------------------------------------------- + srvs, err := r.ListServers() + failIfError(err, "Failed to get server infos") + if len(srvs) > 0 { + fmt.Println("(error): Server(s) still running. Please ensure that no asynq servers are running when runnning migrate command.") + os.Exit(1) + } + for _, qname := range queues { + stats, err := r.CurrentStats(qname) + failIfError(err, "Failed to get stats") + if stats.Active > 0 { + fmt.Printf("(error): %d active tasks found. Please ensure that no active tasks exist when running migrate command.\n", stats.Active) + os.Exit(1) + } + } + + // --------------------------------------------- + // Rename pending key + // --------------------------------------------- + fmt.Print("Renaming pending keys...") + for _, qname := range queues { + oldKey := fmt.Sprintf("asynq:{%s}", qname) + if r.Client().Exists(oldKey).Val() == 0 { + continue + } + newKey := base.PendingKey(qname) + err := r.Client().Rename(oldKey, newKey).Err() + failIfError(err, "Failed to rename key") + } + fmt.Print("Done\n") + + // --------------------------------------------- + // Rename keys as backup + // --------------------------------------------- + fmt.Print("Renaming keys for backup...") + for _, qname := range queues { + keys := []string{ + base.ActiveKey(qname), + base.PendingKey(qname), + base.ScheduledKey(qname), + base.RetryKey(qname), + base.ArchivedKey(qname), + } + for _, key := range keys { + err := renameKeyAsBackup(r.Client(), key) + failIfError(err, fmt.Sprintf("Failed to rename key %q for backup", key)) + } + } + fmt.Print("Done\n") + + // --------------------------------------------- + // Update to new schema + // --------------------------------------------- + fmt.Print("Updating to new schema...") + for _, qname := range queues { + updatePendingMessages(r, qname) + updateZSetMessages(r.Client(), base.ScheduledKey(qname), "scheduled") + updateZSetMessages(r.Client(), base.RetryKey(qname), "retry") + updateZSetMessages(r.Client(), base.ArchivedKey(qname), "archived") + } + fmt.Print("Done\n") + + // --------------------------------------------- + // Delete backup keys + // --------------------------------------------- + fmt.Print("Deleting backup keys...") + for _, qname := range queues { + keys := []string{ + backupKey(base.ActiveKey(qname)), + backupKey(base.PendingKey(qname)), + backupKey(base.ScheduledKey(qname)), + backupKey(base.RetryKey(qname)), + backupKey(base.ArchivedKey(qname)), + } + for _, key := range keys { + err := r.Client().Del(key).Err() + failIfError(err, "Failed to delete backup key") + } + } + fmt.Print("Done\n") +} + +func UnmarshalOldMessage(encoded string) (*base.TaskMessage, error) { + oldMsg, err := DecodeMessage(encoded) + if err != nil { + return nil, err + } + payload, err := json.Marshal(oldMsg.Payload) + if err != nil { + return nil, fmt.Errorf("could not marshal payload: %v", err) + } + return &base.TaskMessage{ + Type: oldMsg.Type, + Payload: payload, + ID: oldMsg.ID, + Queue: oldMsg.Queue, + Retry: oldMsg.Retry, + Retried: oldMsg.Retried, + ErrorMsg: oldMsg.ErrorMsg, + LastFailedAt: 0, + Timeout: oldMsg.Timeout, + Deadline: oldMsg.Deadline, + UniqueKey: oldMsg.UniqueKey, + }, nil +} + +// TaskMessage from v0.17 +type OldTaskMessage struct { + // Type indicates the kind of the task to be performed. + Type string + + // Payload holds data needed to process the task. + Payload map[string]interface{} + + // ID is a unique identifier for each task. + ID uuid.UUID + + // Queue is a name this message should be enqueued to. + Queue string + + // Retry is the max number of retry for this task. + Retry int + + // Retried is the number of times we've retried this task so far. + Retried int + + // ErrorMsg holds the error message from the last failure. + ErrorMsg string + + // Timeout specifies timeout in seconds. + // If task processing doesn't complete within the timeout, the task will be retried + // if retry count is remaining. Otherwise it will be moved to the archive. + // + // Use zero to indicate no timeout. + Timeout int64 + + // Deadline specifies the deadline for the task in Unix time, + // the number of seconds elapsed since January 1, 1970 UTC. + // If task processing doesn't complete before the deadline, the task will be retried + // if retry count is remaining. Otherwise it will be moved to the archive. + // + // Use zero to indicate no deadline. + Deadline int64 + + // UniqueKey holds the redis key used for uniqueness lock for this task. + // + // Empty string indicates that no uniqueness lock was used. + UniqueKey string +} + +// DecodeMessage unmarshals the given encoded string and returns a decoded task message. +// Code from v0.17. +func DecodeMessage(s string) (*OldTaskMessage, error) { + d := json.NewDecoder(strings.NewReader(s)) + d.UseNumber() + var msg OldTaskMessage + if err := d.Decode(&msg); err != nil { + return nil, err + } + return &msg, nil +} + +func updatePendingMessages(r *rdb.RDB, qname string) { + data, err := r.Client().LRange(backupKey(base.PendingKey(qname)), 0, -1).Result() + failIfError(err, "Failed to read backup pending key") + + for _, s := range data { + msg, err := UnmarshalOldMessage(s) + failIfError(err, "Failed to unmarshal message") + + if msg.UniqueKey != "" { + ttl, err := r.Client().TTL(msg.UniqueKey).Result() + failIfError(err, "Failed to get ttl") + + if ttl > 0 { + err = r.Client().Del(msg.UniqueKey).Err() + logIfError(err, "Failed to delete unique key") + } + + // Regenerate unique key. + msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload) + if ttl > 0 { + err = r.EnqueueUnique(msg, ttl) + } else { + err = r.Enqueue(msg) + } + failIfError(err, "Failed to enqueue message") + + } else { + err := r.Enqueue(msg) + failIfError(err, "Failed to enqueue message") + } + } +} + +// KEYS[1] -> asynq:{}:t: +// KEYS[2] -> asynq:{}:scheduled +// ARGV[1] -> task message data +// ARGV[2] -> zset score +// ARGV[3] -> task ID +// ARGV[4] -> task timeout in seconds (0 if not timeout) +// ARGV[5] -> task deadline in unix time (0 if no deadline) +// ARGV[6] -> task state (e.g. "retry", "archived") +var taskZAddCmd = redis.NewScript(` +redis.call("HSET", KEYS[1], + "msg", ARGV[1], + "state", ARGV[6], + "timeout", ARGV[4], + "deadline", ARGV[5]) +redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3]) +return 1 +`) + +// ZAddTask adds task to zset. +func ZAddTask(c redis.UniversalClient, key string, msg *base.TaskMessage, score float64, state string) error { + // Special case; LastFailedAt field is new so assign a value inferred from zscore. + if state == "archived" { + msg.LastFailedAt = int64(score) + } + + encoded, err := base.EncodeMessage(msg) + if err != nil { + return err + } + if err := c.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { + return err + } + keys := []string{ + base.TaskKey(msg.Queue, msg.ID.String()), + key, + } + argv := []interface{}{ + encoded, + score, + msg.ID.String(), + msg.Timeout, + msg.Deadline, + state, + } + return taskZAddCmd.Run(c, keys, argv...).Err() +} + +// KEYS[1] -> unique key +// KEYS[2] -> asynq:{}:t: +// KEYS[3] -> zset key (e.g. asynq:{}:scheduled) +// -- +// ARGV[1] -> task ID +// ARGV[2] -> uniqueness lock TTL +// ARGV[3] -> score (process_at timestamp) +// ARGV[4] -> task message +// ARGV[5] -> task timeout in seconds (0 if not timeout) +// ARGV[6] -> task deadline in unix time (0 if no deadline) +// ARGV[7] -> task state (oneof "scheduled", "retry", "archived") +var taskZAddUniqueCmd = redis.NewScript(` +local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) +if not ok then + return 0 +end +redis.call("HSET", KEYS[2], + "msg", ARGV[4], + "state", ARGV[7], + "timeout", ARGV[5], + "deadline", ARGV[6], + "unique_key", KEYS[1]) +redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) +return 1 +`) + +// ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired. +// It returns ErrDuplicateTask if the lock cannot be acquired. +func ZAddTaskUnique(c redis.UniversalClient, key string, msg *base.TaskMessage, score float64, state string, ttl time.Duration) error { + encoded, err := base.EncodeMessage(msg) + if err != nil { + return err + } + if err := c.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { + return err + } + keys := []string{ + msg.UniqueKey, + base.TaskKey(msg.Queue, msg.ID.String()), + key, + } + argv := []interface{}{ + msg.ID.String(), + int(ttl.Seconds()), + score, + encoded, + msg.Timeout, + msg.Deadline, + state, + } + res, err := taskZAddUniqueCmd.Run(c, keys, argv...).Result() + if err != nil { + return err + } + n, ok := res.(int64) + if !ok { + return errors.E(errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res)) + } + if n == 0 { + return errors.E(errors.AlreadyExists, errors.ErrDuplicateTask) + } + return nil +} + +func updateZSetMessages(c redis.UniversalClient, key, state string) { + zs, err := c.ZRangeWithScores(backupKey(key), 0, -1).Result() + failIfError(err, "Failed to read") + + for _, z := range zs { + msg, err := UnmarshalOldMessage(z.Member.(string)) + failIfError(err, "Failed to unmarshal message") + + if msg.UniqueKey != "" { + ttl, err := c.TTL(msg.UniqueKey).Result() + failIfError(err, "Failed to get ttl") + + if ttl > 0 { + err = c.Del(msg.UniqueKey).Err() + logIfError(err, "Failed to delete unique key") + } + + // Regenerate unique key. + msg.UniqueKey = base.UniqueKey(msg.Queue, msg.Type, msg.Payload) + if ttl > 0 { + err = ZAddTaskUnique(c, key, msg, z.Score, state, ttl) + } else { + err = ZAddTask(c, key, msg, z.Score, state) + } + failIfError(err, "Failed to zadd message") + } else { + err := ZAddTask(c, key, msg, z.Score, state) + failIfError(err, "Failed to enqueue scheduled message") + } + } +} diff --git a/tools/asynq/cmd/root.go b/tools/asynq/cmd/root.go index 109283f..35fa350 100644 --- a/tools/asynq/cmd/root.go +++ b/tools/asynq/cmd/root.go @@ -138,23 +138,24 @@ func createRDB() *rdb.RDB { // createRDB creates a Inspector instance using flag values and returns it. func createInspector() *asynq.Inspector { - var connOpt asynq.RedisConnOpt + return asynq.NewInspector(getRedisConnOpt()) +} + +func getRedisConnOpt() asynq.RedisConnOpt { if useRedisCluster { addrs := strings.Split(viper.GetString("cluster_addrs"), ",") - connOpt = asynq.RedisClusterClientOpt{ + return asynq.RedisClusterClientOpt{ Addrs: addrs, Password: viper.GetString("password"), TLSConfig: getTLSConfig(), } - } else { - connOpt = asynq.RedisClientOpt{ - Addr: viper.GetString("uri"), - DB: viper.GetInt("db"), - Password: viper.GetString("password"), - TLSConfig: getTLSConfig(), - } } - return asynq.NewInspector(connOpt) + return asynq.RedisClientOpt{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + TLSConfig: getTLSConfig(), + } } func getTLSConfig() *tls.Config { diff --git a/tools/asynq/cmd/stats.go b/tools/asynq/cmd/stats.go index d569ddb..de83b4c 100644 --- a/tools/asynq/cmd/stats.go +++ b/tools/asynq/cmd/stats.go @@ -22,7 +22,7 @@ import ( var statsCmd = &cobra.Command{ Use: "stats", Short: "Shows current state of the tasks and queues", - Long: `Stats (aysnqmon stats) will show the overview of tasks and queues at that instant. + Long: `Stats (aysnq stats) will show the overview of tasks and queues at that instant. Specifically, the command shows the following: * Number of tasks in each state