2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-15 20:08:46 +08:00

Add migrate command to CLI

The command converts all messages in redis to be compatible for asynq
v0.10.0
This commit is contained in:
Ken Hibino 2020-06-26 06:11:39 -07:00
parent 7eebbf181e
commit 83c9d5ae94
2 changed files with 215 additions and 0 deletions

214
tools/asynq/cmd/migrate.go Normal file
View File

@ -0,0 +1,214 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package cmd
import (
"encoding/json"
"fmt"
"os"
"strings"
"time"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/base"
"github.com/rs/xid"
"github.com/spf13/cast"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
const version = "v0.10.0" // TODO: Define this version string in a more canonical place.
// migrateCmd represents the migrate command
var migrateCmd = &cobra.Command{
Use: "migrate",
Short: fmt.Sprintf("Migrate all tasks to be compatible with asynq@%s", version),
Long: fmt.Sprintf("Migrate (asynq migrate) will convert all tasks in redis to be compatible with asynq@%s.", version),
Run: migrate,
}
func init() {
rootCmd.AddCommand(migrateCmd)
}
func migrate(cmd *cobra.Command, args []string) {
c := redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
lists := []string{base.InProgressQueue}
allQueues, err := c.SMembers(base.AllQueues).Result()
if err != nil {
fmt.Printf("error: could not read all queues: %v", err)
os.Exit(1)
}
lists = append(lists, allQueues...)
for _, key := range lists {
if err := migrateList(c, key); err != nil {
fmt.Printf("error: %v", err)
os.Exit(1)
}
}
zsets := []string{base.ScheduledQueue, base.RetryQueue, base.DeadQueue}
for _, key := range zsets {
if err := migrateZSet(c, key); err != nil {
fmt.Printf("error: %v", err)
os.Exit(1)
}
}
}
type oldTaskMessage struct {
// Unchanged
Type string
Payload map[string]interface{}
ID xid.ID
Queue string
Retry int
Retried int
ErrorMsg string
UniqueKey string
// Following fields have changed.
// Deadline specifies the deadline for the task.
// Task won't be processed if it exceeded its deadline.
// The string shoulbe be in RFC3339 format.
//
// time.Time's zero value means no deadline.
Timeout string
// Deadline specifies the deadline for the task.
// Task won't be processed if it exceeded its deadline.
// The string shoulbe be in RFC3339 format.
//
// time.Time's zero value means no deadline.
Deadline string
}
var defaultTimeout = 30 * time.Minute
func convertMessage(old *oldTaskMessage) (*base.TaskMessage, error) {
timeout, err := time.ParseDuration(old.Timeout)
if err != nil {
return nil, fmt.Errorf("could not parse Timeout field of %+v", old)
}
deadline, err := time.Parse(time.RFC3339, old.Deadline)
if err != nil {
return nil, fmt.Errorf("could not parse Deadline field of %+v", old)
}
if timeout == 0 && deadline.IsZero() {
timeout = defaultTimeout
}
if deadline.IsZero() {
// Zero value used to be time.Time{},
// in the new schema zero value is represented by
// zero in Unix time.
deadline = time.Unix(0, 0)
}
return &base.TaskMessage{
Type: old.Type,
Payload: old.Payload,
ID: old.ID,
Queue: old.Queue,
Retry: old.Retry,
Retried: old.Retried,
ErrorMsg: old.ErrorMsg,
UniqueKey: old.UniqueKey,
Timeout: int64(timeout.Seconds()),
Deadline: deadline.Unix(),
}, nil
}
func deserialize(s string) (*base.TaskMessage, error) {
// Try deserializing as old message.
d := json.NewDecoder(strings.NewReader(s))
d.UseNumber()
var old *oldTaskMessage
if err := d.Decode(&old); err != nil {
// Try deserializing as new message.
d = json.NewDecoder(strings.NewReader(s))
d.UseNumber()
var msg *base.TaskMessage
if err := d.Decode(&msg); err != nil {
return nil, fmt.Errorf("could not deserialize %s into task message: %v", s, err)
}
return msg, nil
}
return convertMessage(old)
}
func migrateZSet(c *redis.Client, key string) error {
if c.Exists(key).Val() == 0 {
// skip if key doesn't exist.
return nil
}
res, err := c.ZRangeWithScores(key, 0, -1).Result()
if err != nil {
return err
}
var msgs []*redis.Z
for _, z := range res {
s, err := cast.ToStringE(z.Member)
if err != nil {
return fmt.Errorf("could not cast to string: %v", err)
}
msg, err := deserialize(s)
if err != nil {
return err
}
encoded, err := base.EncodeMessage(msg)
if err != nil {
return fmt.Errorf("could not encode message from %q: %v", key, err)
}
msgs = append(msgs, &redis.Z{Score: z.Score, Member: encoded})
}
if err := c.Rename(key, key+":backup").Err(); err != nil {
return fmt.Errorf("could not rename key %q: %v", key, err)
}
if err := c.ZAdd(key, msgs...).Err(); err != nil {
return fmt.Errorf("could not write new messages to %q: %v", key, err)
}
if err := c.Del(key + ":backup").Err(); err != nil {
return fmt.Errorf("could not delete back up key %q: %v", key+":backup", err)
}
return nil
}
func migrateList(c *redis.Client, key string) error {
if c.Exists(key).Val() == 0 {
// skip if key doesn't exist.
return nil
}
res, err := c.LRange(key, 0, -1).Result()
if err != nil {
return err
}
var msgs []interface{}
for _, s := range res {
msg, err := deserialize(s)
if err != nil {
return err
}
encoded, err := base.EncodeMessage(msg)
if err != nil {
return fmt.Errorf("could not encode message from %q: %v", key, err)
}
msgs = append(msgs, encoded)
}
if err := c.Rename(key, key+":backup").Err(); err != nil {
return fmt.Errorf("could not rename key %q: %v", key, err)
}
if err := c.LPush(key, msgs...).Err(); err != nil {
return fmt.Errorf("could not write new messages to %q: %v", key, err)
}
if err := c.Del(key + ":backup").Err(); err != nil {
return fmt.Errorf("could not delete back up key %q: %v", key+":backup", err)
}
return nil
}

View File

@ -7,6 +7,7 @@ require (
github.com/hibiken/asynq v0.4.0 github.com/hibiken/asynq v0.4.0
github.com/mitchellh/go-homedir v1.1.0 github.com/mitchellh/go-homedir v1.1.0
github.com/rs/xid v1.2.1 github.com/rs/xid v1.2.1
github.com/spf13/cast v1.3.1
github.com/spf13/cobra v0.0.5 github.com/spf13/cobra v0.0.5
github.com/spf13/viper v1.6.2 github.com/spf13/viper v1.6.2
) )