Mirror of https://github.com/hibiken/asynq.git (synced 2025-08-19 15:08:55 +08:00)

Commit: Fix asynq CLI build
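This commit rebuilds the CLI against the Inspector API that now lives in the asynq package itself (asynq.NewInspector, GetQueueInfo, and the asynq.PageSize/asynq.Page options) rather than the removed inspeq subpackage. A minimal sketch of that usage, based only on the calls appearing in the diff below; the Redis address and the "default" queue name are placeholder assumptions, not values from this commit:

package main

import (
    "fmt"
    "log"

    "github.com/hibiken/asynq"
)

func main() {
    // Assumed local Redis address; the CLI builds the connection options from its flags instead.
    inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // GetQueueInfo replaces the old inspeq CurrentStats call in this commit.
    info, err := inspector.GetQueueInfo("default")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("queue=%s pending=%d\n", info.Queue, info.Pending)

    // Pagination options now come from the asynq package itself.
    tasks, err := inspector.ListPendingTasks("default", asynq.PageSize(10), asynq.Page(1))
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("listed %d pending tasks\n", len(tasks))
}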
@@ -11,7 +11,7 @@ import (
     "sort"
     "time"

-    "github.com/hibiken/asynq/inspeq"
+    "github.com/hibiken/asynq"
     "github.com/spf13/cobra"
 )

@@ -108,7 +108,7 @@ func cronHistory(cmd *cobra.Command, args []string) {
         fmt.Printf("Entry: %s\n\n", entryID)

         events, err := inspector.ListSchedulerEnqueueEvents(
-            entryID, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
+            entryID, asynq.PageSize(pageSize), asynq.Page(pageNum))
         if err != nil {
             fmt.Printf("error: %v\n", err)
             continue
@@ -1,389 +0,0 @@
-// Copyright 2020 Kentaro Hibino. All rights reserved.
-// Use of this source code is governed by a MIT license
-// that can be found in the LICENSE file.
-
-package cmd
-
-import (
-    "encoding/json"
-    "fmt"
-    "os"
-    "strings"
-    "time"
-
-    "github.com/go-redis/redis/v7"
-    "github.com/google/uuid"
-    "github.com/hibiken/asynq/internal/base"
-    "github.com/spf13/cast"
-    "github.com/spf13/cobra"
-    "github.com/spf13/viper"
-)
-
-var migrateCmd = &cobra.Command{
-    Use:   "migrate",
-    Short: fmt.Sprintf("Migrate all tasks to be compatible with asynq v%s", base.Version),
-    Args:  cobra.NoArgs,
-    Run:   migrate,
-}
-
-func init() {
-    rootCmd.AddCommand(migrateCmd)
-}
-
-func migrate(cmd *cobra.Command, args []string) {
-    c := redis.NewClient(&redis.Options{
-        Addr:     viper.GetString("uri"),
-        DB:       viper.GetInt("db"),
-        Password: viper.GetString("password"),
-    })
-    r := createRDB()
-
-    /*** Migrate from 0.9 to 0.10, 0.11 compatible ***/
-    lists := []string{"asynq:in_progress"}
-    allQueues, err := c.SMembers(base.AllQueues).Result()
-    if err != nil {
-        printError(fmt.Errorf("could not read all queues: %v", err))
-        os.Exit(1)
-    }
-    lists = append(lists, allQueues...)
-    for _, key := range lists {
-        if err := migrateList(c, key); err != nil {
-            printError(err)
-            os.Exit(1)
-        }
-    }
-
-    zsets := []string{"asynq:scheduled", "asynq:retry", "asynq:dead"}
-    for _, key := range zsets {
-        if err := migrateZSet(c, key); err != nil {
-            printError(err)
-            os.Exit(1)
-        }
-    }
-
-    /*** Migrate from 0.11 to 0.12 compatible ***/
-    if err := createBackup(c, base.AllQueues); err != nil {
-        printError(err)
-        os.Exit(1)
-    }
-    for _, qkey := range allQueues {
-        qname := strings.TrimPrefix(qkey, "asynq:queues:")
-        if err := c.SAdd(base.AllQueues, qname).Err(); err != nil {
-            err = fmt.Errorf("could not add queue name %q to %q set: %v\n",
-                qname, base.AllQueues, err)
-            printError(err)
-            os.Exit(1)
-        }
-    }
-    if err := deleteBackup(c, base.AllQueues); err != nil {
-        printError(err)
-        os.Exit(1)
-    }
-
-    for _, qkey := range allQueues {
-        qname := strings.TrimPrefix(qkey, "asynq:queues:")
-        if exists := c.Exists(qkey).Val(); exists == 1 {
-            if err := c.Rename(qkey, base.QueueKey(qname)).Err(); err != nil {
-                printError(fmt.Errorf("could not rename key %q: %v\n", qkey, err))
-                os.Exit(1)
-            }
-        }
-    }
-
-    if err := partitionZSetMembersByQueue(c, "asynq:scheduled", base.ScheduledKey); err != nil {
-        printError(err)
-        os.Exit(1)
-    }
-    if err := partitionZSetMembersByQueue(c, "asynq:retry", base.RetryKey); err != nil {
-        printError(err)
-        os.Exit(1)
-    }
-    // Note: base.DeadKey function was renamed in v0.14. We define the legacy function here since we need it for this migration script.
-    deadKeyFunc := func(qname string) string { return fmt.Sprintf("asynq:{%s}:dead", qname) }
-    if err := partitionZSetMembersByQueue(c, "asynq:dead", deadKeyFunc); err != nil {
-        printError(err)
-        os.Exit(1)
-    }
-    if err := partitionZSetMembersByQueue(c, "asynq:deadlines", base.DeadlinesKey); err != nil {
-        printError(err)
-        os.Exit(1)
-    }
-    if err := partitionListMembersByQueue(c, "asynq:in_progress", base.ActiveKey); err != nil {
-        printError(err)
-        os.Exit(1)
-    }
-
-    paused, err := c.SMembers("asynq:paused").Result()
-    if err != nil {
-        printError(fmt.Errorf("command SMEMBERS asynq:paused failed: %v", err))
-        os.Exit(1)
-    }
-    for _, qkey := range paused {
-        qname := strings.TrimPrefix(qkey, "asynq:queues:")
-        if err := r.Pause(qname); err != nil {
-            printError(err)
-            os.Exit(1)
-        }
-    }
-    if err := deleteKey(c, "asynq:paused"); err != nil {
-        printError(err)
-        os.Exit(1)
-    }
-
-    if err := deleteKey(c, "asynq:servers"); err != nil {
-        printError(err)
-        os.Exit(1)
-    }
-    if err := deleteKey(c, "asynq:workers"); err != nil {
-        printError(err)
-        os.Exit(1)
-    }
-
-    /*** Migrate from 0.13 to 0.14 compatible ***/
-
-    // Move all dead tasks to archived ZSET.
-    for _, qname := range allQueues {
-        zs, err := c.ZRangeWithScores(deadKeyFunc(qname), 0, -1).Result()
-        if err != nil {
-            printError(err)
-            os.Exit(1)
-        }
-        for _, z := range zs {
-            if err := c.ZAdd(base.ArchivedKey(qname), &z).Err(); err != nil {
-                printError(err)
-                os.Exit(1)
-            }
-        }
-        if err := deleteKey(c, deadKeyFunc(qname)); err != nil {
-            printError(err)
-            os.Exit(1)
-        }
-    }
-}
-
-func backupKey(key string) string {
-    return fmt.Sprintf("%s:backup", key)
-}
-
-func createBackup(c *redis.Client, key string) error {
-    err := c.Rename(key, backupKey(key)).Err()
-    if err != nil {
-        return fmt.Errorf("could not rename key %q: %v", key, err)
-    }
-    return nil
-}
-
-func deleteBackup(c *redis.Client, key string) error {
-    return deleteKey(c, backupKey(key))
-}
-
-func deleteKey(c *redis.Client, key string) error {
-    exists := c.Exists(key).Val()
-    if exists == 0 {
-        // key does not exist
-        return nil
-    }
-    err := c.Del(key).Err()
-    if err != nil {
-        return fmt.Errorf("could not delete key %q: %v", key, err)
-    }
-    return nil
-}
-
-func printError(err error) {
-    fmt.Println(err)
-    fmt.Println()
-    fmt.Println("Migrate command error")
-    fmt.Println("Please file an issue on Github at https://github.com/hibiken/asynq/issues/new/choose")
-}
-
-func partitionZSetMembersByQueue(c *redis.Client, key string, newKeyFunc func(string) string) error {
-    zs, err := c.ZRangeWithScores(key, 0, -1).Result()
-    if err != nil {
-        return fmt.Errorf("command ZRANGE %s 0 -1 WITHSCORES failed: %v", key, err)
-    }
-    for _, z := range zs {
-        s := cast.ToString(z.Member)
-        msg, err := base.DecodeMessage(s)
-        if err != nil {
-            return fmt.Errorf("could not decode message from %q: %v", key, err)
-        }
-        if err := c.ZAdd(newKeyFunc(msg.Queue), &z).Err(); err != nil {
-            return fmt.Errorf("could not add %v to %q: %v", z, newKeyFunc(msg.Queue))
-        }
-    }
-    if err := deleteKey(c, key); err != nil {
-        return err
-    }
-    return nil
-}
-
-func partitionListMembersByQueue(c *redis.Client, key string, newKeyFunc func(string) string) error {
-    data, err := c.LRange(key, 0, -1).Result()
-    if err != nil {
-        return fmt.Errorf("command LRANGE %s 0 -1 failed: %v", key, err)
-    }
-    for _, s := range data {
-        msg, err := base.DecodeMessage(s)
-        if err != nil {
-            return fmt.Errorf("could not decode message from %q: %v", key, err)
-        }
-        if err := c.LPush(newKeyFunc(msg.Queue), s).Err(); err != nil {
-            return fmt.Errorf("could not add %v to %q: %v", s, newKeyFunc(msg.Queue))
-        }
-    }
-    if err := deleteKey(c, key); err != nil {
-        return err
-    }
-    return nil
-}
-
-type oldTaskMessage struct {
-    // Unchanged
-    Type      string
-    Payload   map[string]interface{}
-    ID        uuid.UUID
-    Queue     string
-    Retry     int
-    Retried   int
-    ErrorMsg  string
-    UniqueKey string
-
-    // Following fields have changed.
-
-    // Deadline specifies the deadline for the task.
-    // Task won't be processed if it exceeded its deadline.
-    // The string shoulbe be in RFC3339 format.
-    //
-    // time.Time's zero value means no deadline.
-    Timeout string
-
-    // Deadline specifies the deadline for the task.
-    // Task won't be processed if it exceeded its deadline.
-    // The string shoulbe be in RFC3339 format.
-    //
-    // time.Time's zero value means no deadline.
-    Deadline string
-}
-
-var defaultTimeout = 30 * time.Minute
-
-func convertMessage(old *oldTaskMessage) (*base.TaskMessage, error) {
-    timeout, err := time.ParseDuration(old.Timeout)
-    if err != nil {
-        return nil, fmt.Errorf("could not parse Timeout field of %+v", old)
-    }
-    deadline, err := time.Parse(time.RFC3339, old.Deadline)
-    if err != nil {
-        return nil, fmt.Errorf("could not parse Deadline field of %+v", old)
-    }
-    if timeout == 0 && deadline.IsZero() {
-        timeout = defaultTimeout
-    }
-    if deadline.IsZero() {
-        // Zero value used to be time.Time{},
-        // in the new schema zero value is represented by
-        // zero in Unix time.
-        deadline = time.Unix(0, 0)
-    }
-    return &base.TaskMessage{
-        Type:      old.Type,
-        Payload:   old.Payload,
-        ID:        uuid.New(),
-        Queue:     old.Queue,
-        Retry:     old.Retry,
-        Retried:   old.Retried,
-        ErrorMsg:  old.ErrorMsg,
-        UniqueKey: old.UniqueKey,
-        Timeout:   int64(timeout.Seconds()),
-        Deadline:  deadline.Unix(),
-    }, nil
-}
-
-func deserialize(s string) (*base.TaskMessage, error) {
-    // Try deserializing as old message.
-    d := json.NewDecoder(strings.NewReader(s))
-    d.UseNumber()
-    var old *oldTaskMessage
-    if err := d.Decode(&old); err != nil {
-        // Try deserializing as new message.
-        d = json.NewDecoder(strings.NewReader(s))
-        d.UseNumber()
-        var msg *base.TaskMessage
-        if err := d.Decode(&msg); err != nil {
-            return nil, fmt.Errorf("could not deserialize %s into task message: %v", s, err)
-        }
-        return msg, nil
-    }
-    return convertMessage(old)
-}
-
-func migrateZSet(c *redis.Client, key string) error {
-    if c.Exists(key).Val() == 0 {
-        // skip if key doesn't exist.
-        return nil
-    }
-    res, err := c.ZRangeWithScores(key, 0, -1).Result()
-    if err != nil {
-        return err
-    }
-    var msgs []*redis.Z
-    for _, z := range res {
-        s, err := cast.ToStringE(z.Member)
-        if err != nil {
-            return fmt.Errorf("could not cast to string: %v", err)
-        }
-        msg, err := deserialize(s)
-        if err != nil {
-            return err
-        }
-        encoded, err := base.EncodeMessage(msg)
-        if err != nil {
-            return fmt.Errorf("could not encode message from %q: %v", key, err)
-        }
-        msgs = append(msgs, &redis.Z{Score: z.Score, Member: encoded})
-    }
-    if err := c.Rename(key, key+":backup").Err(); err != nil {
-        return fmt.Errorf("could not rename key %q: %v", key, err)
-    }
-    if err := c.ZAdd(key, msgs...).Err(); err != nil {
-        return fmt.Errorf("could not write new messages to %q: %v", key, err)
-    }
-    if err := c.Del(key + ":backup").Err(); err != nil {
-        return fmt.Errorf("could not delete back up key %q: %v", key+":backup", err)
-    }
-    return nil
-}
-
-func migrateList(c *redis.Client, key string) error {
-    if c.Exists(key).Val() == 0 {
-        // skip if key doesn't exist.
-        return nil
-    }
-    res, err := c.LRange(key, 0, -1).Result()
-    if err != nil {
-        return err
-    }
-    var msgs []interface{}
-    for _, s := range res {
-        msg, err := deserialize(s)
-        if err != nil {
-            return err
-        }
-        encoded, err := base.EncodeMessage(msg)
-        if err != nil {
-            return fmt.Errorf("could not encode message from %q: %v", key, err)
-        }
-        msgs = append(msgs, encoded)
-    }
-    if err := c.Rename(key, key+":backup").Err(); err != nil {
-        return fmt.Errorf("could not rename key %q: %v", key, err)
-    }
-    if err := c.LPush(key, msgs...).Err(); err != nil {
-        return fmt.Errorf("could not write new messages to %q: %v", key, err)
-    }
-    if err := c.Del(key + ":backup").Err(); err != nil {
-        return fmt.Errorf("could not delete back up key %q: %v", key+":backup", err)
-    }
-    return nil
-}
@@ -10,8 +10,8 @@ import (
     "os"

     "github.com/fatih/color"
-    "github.com/hibiken/asynq/inspeq"
-    "github.com/hibiken/asynq/internal/rdb"
+    "github.com/hibiken/asynq"
+    "github.com/hibiken/asynq/internal/errors"
     "github.com/spf13/cobra"
 )

@@ -82,7 +82,7 @@ func queueList(cmd *cobra.Command, args []string) {
     type queueInfo struct {
         name    string
         keyslot int64
-        nodes   []inspeq.ClusterNode
+        nodes   []*asynq.ClusterNode
     }
     inspector := createInspector()
     queues, err := inspector.Queues()
@@ -132,16 +132,16 @@ func queueInspect(cmd *cobra.Command, args []string) {
             fmt.Printf("\n%s\n", separator)
         }
         fmt.Println()
-        stats, err := inspector.CurrentStats(qname)
+        info, err := inspector.GetQueueInfo(qname)
         if err != nil {
             fmt.Printf("error: %v\n", err)
             continue
         }
-        printQueueStats(stats)
+        printQueueInfo(info)
     }
 }

-func printQueueStats(s *inspeq.QueueStats) {
+func printQueueInfo(s *asynq.QueueInfo) {
     bold := color.New(color.Bold)
     bold.Println("Queue Info")
     fmt.Printf("Name:   %s\n", s.Queue)
@@ -191,7 +191,7 @@ func queueHistory(cmd *cobra.Command, args []string) {
     }
 }

-func printDailyStats(stats []*inspeq.DailyStats) {
+func printDailyStats(stats []*asynq.DailyStats) {
     printTable(
         []string{"date (UTC)", "processed", "failed", "error rate"},
         func(w io.Writer, tmpl string) {
@@ -244,7 +244,7 @@ func queueRemove(cmd *cobra.Command, args []string) {
     for _, qname := range args {
         err = r.RemoveQueue(qname, force)
         if err != nil {
-            if _, ok := err.(*rdb.ErrQueueNotEmpty); ok {
+            if errors.IsQueueNotEmpty(err) {
                 fmt.Printf("error: %v\nIf you are sure you want to delete it, run 'asynq queue rm --force %s'\n", err, qname)
                 continue
             }
@@ -14,7 +14,6 @@ import (

     "github.com/go-redis/redis/v7"
     "github.com/hibiken/asynq"
-    "github.com/hibiken/asynq/inspeq"
     "github.com/hibiken/asynq/internal/base"
     "github.com/hibiken/asynq/internal/rdb"
     "github.com/spf13/cobra"
@@ -136,7 +135,7 @@ func createRDB() *rdb.RDB {
 }

 // createRDB creates a Inspector instance using flag values and returns it.
-func createInspector() *inspeq.Inspector {
+func createInspector() *asynq.Inspector {
     var connOpt asynq.RedisConnOpt
     if useRedisCluster {
         addrs := strings.Split(viper.GetString("cluster_addrs"), ",")
@@ -153,7 +152,7 @@ func createInspector() *inspeq.Inspector {
             TLSConfig: getTLSConfig(),
         }
     }
-    return inspeq.New(connOpt)
+    return asynq.NewInspector(connOpt)
 }

 func getTLSConfig() *tls.Config {
@@ -10,7 +10,7 @@ import (
     "os"
     "time"

-    "github.com/hibiken/asynq/inspeq"
+    "github.com/hibiken/asynq"
     "github.com/spf13/cobra"
 )

@@ -28,21 +28,21 @@ func init() {

     taskCmd.AddCommand(taskArchiveCmd)
     taskArchiveCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs")
-    taskArchiveCmd.Flags().StringP("key", "k", "", "key of the task")
+    taskArchiveCmd.Flags().StringP("id", "t", "", "id of the task")
     taskArchiveCmd.MarkFlagRequired("queue")
-    taskArchiveCmd.MarkFlagRequired("key")
+    taskArchiveCmd.MarkFlagRequired("id")

     taskCmd.AddCommand(taskDeleteCmd)
     taskDeleteCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs")
-    taskDeleteCmd.Flags().StringP("key", "k", "", "key of the task")
+    taskDeleteCmd.Flags().StringP("id", "t", "", "id of the task")
     taskDeleteCmd.MarkFlagRequired("queue")
-    taskDeleteCmd.MarkFlagRequired("key")
+    taskDeleteCmd.MarkFlagRequired("id")

     taskCmd.AddCommand(taskRunCmd)
     taskRunCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs")
-    taskRunCmd.Flags().StringP("key", "k", "", "key of the task")
+    taskRunCmd.Flags().StringP("id", "t", "", "id of the task")
     taskRunCmd.MarkFlagRequired("queue")
-    taskRunCmd.MarkFlagRequired("key")
+    taskRunCmd.MarkFlagRequired("id")

     taskCmd.AddCommand(taskArchiveAllCmd)
     taskArchiveAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong")
@@ -101,22 +101,22 @@ var taskCancelCmd = &cobra.Command{
 }

 var taskArchiveCmd = &cobra.Command{
-    Use:   "archive --queue=QUEUE --key=KEY",
-    Short: "Archive a task with the given key",
+    Use:   "archive --queue=QUEUE --id=TASK_ID",
+    Short: "Archive a task with the given id",
     Args:  cobra.NoArgs,
     Run:   taskArchive,
 }

 var taskDeleteCmd = &cobra.Command{
-    Use:   "delete --queue=QUEUE --key=KEY",
-    Short: "Delete a task with the given key",
+    Use:   "delete --queue=QUEUE --id=TASK_ID",
+    Short: "Delete a task with the given id",
     Args:  cobra.NoArgs,
     Run:   taskDelete,
 }

 var taskRunCmd = &cobra.Command{
-    Use:   "run --queue=QUEUE --key=KEY",
-    Short: "Run a task with the given key",
+    Use:   "run --queue=QUEUE --id=TASK_ID",
+    Short: "Run a task with the given id",
     Args:  cobra.NoArgs,
     Run:   taskRun,
 }
@@ -129,14 +129,14 @@ var taskArchiveAllCmd = &cobra.Command{
 }

 var taskDeleteAllCmd = &cobra.Command{
-    Use:   "delete-all --queue=QUEUE --key=KEY",
+    Use:   "delete-all --queue=QUEUE --state=STATE",
     Short: "Delete all tasks in the given state",
     Args:  cobra.NoArgs,
     Run:   taskDeleteAll,
 }

 var taskRunAllCmd = &cobra.Command{
-    Use:   "run-all --queue=QUEUE --key=KEY",
+    Use:   "run-all --queue=QUEUE --state=STATE",
     Short: "Run all tasks in the given state",
     Args:  cobra.NoArgs,
     Run:   taskRunAll,
@@ -183,7 +183,7 @@ func taskList(cmd *cobra.Command, args []string) {

 func listActiveTasks(qname string, pageNum, pageSize int) {
     i := createInspector()
-    tasks, err := i.ListActiveTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
+    tasks, err := i.ListActiveTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
     if err != nil {
         fmt.Println(err)
         os.Exit(1)
@@ -196,7 +196,7 @@ func listActiveTasks(qname string, pageNum, pageSize int) {
         []string{"ID", "Type", "Payload"},
         func(w io.Writer, tmpl string) {
             for _, t := range tasks {
-                fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload)
+                fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload())
             }
         },
     )
@@ -204,7 +204,7 @@ func listActiveTasks(qname string, pageNum, pageSize int) {

 func listPendingTasks(qname string, pageNum, pageSize int) {
     i := createInspector()
-    tasks, err := i.ListPendingTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
+    tasks, err := i.ListPendingTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
     if err != nil {
         fmt.Println(err)
         os.Exit(1)
@@ -214,10 +214,10 @@ func listPendingTasks(qname string, pageNum, pageSize int) {
         return
     }
     printTable(
-        []string{"Key", "Type", "Payload"},
+        []string{"ID", "Type", "Payload"},
         func(w io.Writer, tmpl string) {
             for _, t := range tasks {
-                fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload)
+                fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload())
             }
         },
     )
@@ -225,7 +225,7 @@ func listPendingTasks(qname string, pageNum, pageSize int) {

 func listScheduledTasks(qname string, pageNum, pageSize int) {
     i := createInspector()
-    tasks, err := i.ListScheduledTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
+    tasks, err := i.ListScheduledTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
     if err != nil {
         fmt.Println(err)
         os.Exit(1)
@@ -235,12 +235,12 @@ func listScheduledTasks(qname string, pageNum, pageSize int) {
         return
     }
     printTable(
-        []string{"Key", "Type", "Payload", "Process In"},
+        []string{"ID", "Type", "Payload", "Process In"},
         func(w io.Writer, tmpl string) {
             for _, t := range tasks {
                 processIn := fmt.Sprintf("%.0f seconds",
-                    t.NextProcessAt.Sub(time.Now()).Seconds())
-                fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, processIn)
+                    t.NextProcessAt().Sub(time.Now()).Seconds())
+                fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), processIn)
             }
         },
     )
@@ -248,7 +248,7 @@ func listScheduledTasks(qname string, pageNum, pageSize int) {

 func listRetryTasks(qname string, pageNum, pageSize int) {
     i := createInspector()
-    tasks, err := i.ListRetryTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
+    tasks, err := i.ListRetryTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
     if err != nil {
         fmt.Println(err)
         os.Exit(1)
@@ -258,16 +258,16 @@ func listRetryTasks(qname string, pageNum, pageSize int) {
         return
     }
     printTable(
-        []string{"Key", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry"},
+        []string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry"},
         func(w io.Writer, tmpl string) {
             for _, t := range tasks {
                 var nextRetry string
-                if d := t.NextProcessAt.Sub(time.Now()); d > 0 {
+                if d := t.NextProcessAt().Sub(time.Now()); d > 0 {
                     nextRetry = fmt.Sprintf("in %v", d.Round(time.Second))
                 } else {
                     nextRetry = "right now"
                 }
-                fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, nextRetry, t.LastError, t.Retried, t.MaxRetry)
+                fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), nextRetry, t.LastErr(), t.Retried(), t.MaxRetry())
             }
         },
     )
@@ -275,7 +275,7 @@ func listRetryTasks(qname string, pageNum, pageSize int) {

 func listArchivedTasks(qname string, pageNum, pageSize int) {
     i := createInspector()
-    tasks, err := i.ListArchivedTasks(qname, inspeq.PageSize(pageSize), inspeq.Page(pageNum))
+    tasks, err := i.ListArchivedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
     if err != nil {
         fmt.Println(err)
         os.Exit(1)
@@ -285,19 +285,18 @@ func listArchivedTasks(qname string, pageNum, pageSize int) {
         return
     }
     printTable(
-        []string{"Key", "Type", "Payload", "Last Failed", "Last Error"},
+        []string{"ID", "Type", "Payload", "Last Failed", "Last Error"},
         func(w io.Writer, tmpl string) {
             for _, t := range tasks {
-                fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, t.LastFailedAt, t.LastError)
+                fmt.Fprintf(w, tmpl, t.ID(), t.Type(), t.Payload(), t.LastFailedAt(), t.LastErr())
             }
         })
 }

 func taskCancel(cmd *cobra.Command, args []string) {
-    r := createRDB()
+    i := createInspector()
     for _, id := range args {
-        err := r.PublishCancelation(id)
-        if err != nil {
+        if err := i.CancelProcessing(id); err != nil {
             fmt.Printf("error: could not send cancelation signal: %v\n", err)
             continue
         }
@@ -311,14 +310,14 @@ func taskArchive(cmd *cobra.Command, args []string) {
         fmt.Printf("error: %v\n", err)
         os.Exit(1)
     }
-    key, err := cmd.Flags().GetString("key")
+    id, err := cmd.Flags().GetString("id")
     if err != nil {
         fmt.Printf("error: %v\n", err)
         os.Exit(1)
     }

     i := createInspector()
-    err = i.ArchiveTaskByKey(qname, key)
+    err = i.ArchiveTask(qname, id)
     if err != nil {
         fmt.Printf("error: %v\n", err)
         os.Exit(1)
@@ -332,14 +331,14 @@ func taskDelete(cmd *cobra.Command, args []string) {
         fmt.Printf("error: %v\n", err)
         os.Exit(1)
     }
-    key, err := cmd.Flags().GetString("key")
+    id, err := cmd.Flags().GetString("id")
     if err != nil {
         fmt.Printf("error: %v\n", err)
         os.Exit(1)
     }

     i := createInspector()
-    err = i.DeleteTaskByKey(qname, key)
+    err = i.DeleteTask(qname, id)
     if err != nil {
         fmt.Printf("error: %v\n", err)
         os.Exit(1)
@@ -353,14 +352,14 @@ func taskRun(cmd *cobra.Command, args []string) {
         fmt.Printf("error: %v\n", err)
         os.Exit(1)
     }
-    key, err := cmd.Flags().GetString("key")
+    id, err := cmd.Flags().GetString("id")
     if err != nil {
         fmt.Printf("error: %v\n", err)
         os.Exit(1)
     }

     i := createInspector()
-    err = i.RunTaskByKey(qname, key)
+    err = i.RunTask(qname, id)
     if err != nil {
         fmt.Printf("error: %v\n", err)
         os.Exit(1)
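The task subcommands above now address tasks by queue name and task ID instead of the old task key. A minimal sketch of the corresponding Inspector calls; the queue name, task ID, and Redis address are placeholder assumptions, not values from this commit:

package main

import (
    "log"

    "github.com/hibiken/asynq"
)

func main() {
    // Assumed local Redis address; the CLI derives this from its flags.
    inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: "localhost:6379"})

    // Placeholders standing in for the --queue and --id flag values.
    qname, id := "default", "some-task-id"

    // ID-based calls used by the updated archive/delete/run subcommands.
    if err := inspector.ArchiveTask(qname, id); err != nil {
        log.Printf("archive: %v", err)
    }
    if err := inspector.RunTask(qname, id); err != nil {
        log.Printf("run: %v", err)
    }
    if err := inspector.DeleteTask(qname, id); err != nil {
        log.Printf("delete: %v", err)
    }

    // taskCancel now goes through the Inspector as well.
    if err := inspector.CancelProcessing(id); err != nil {
        log.Printf("cancel: %v", err)
    }
}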