2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

Add enqall command to asynqmon CLI

This commit is contained in:
Ken Hibino 2019-12-10 21:38:25 -08:00
parent 0d74c518bf
commit a96719413c
8 changed files with 131 additions and 24 deletions

View File

@ -16,7 +16,7 @@ TODOs:
*/ */
// Max retry count by default // Max retry count by default
const defaultMaxRetry = 1 const defaultMaxRetry = 25
// Task represents a task to be performed. // Task represents a task to be performed.
type Task struct { type Task struct {

View File

@ -273,17 +273,20 @@ func (r *RDB) EnqueueScheduledTask(id uuid.UUID, score int64) error {
} }
// EnqueueAllScheduledTasks enqueues all tasks from scheduled queue. // EnqueueAllScheduledTasks enqueues all tasks from scheduled queue.
func (r *RDB) EnqueueAllScheduledTasks() error { // and returns the number of tasks enqueued.
func (r *RDB) EnqueueAllScheduledTasks() (int64, error) {
return r.removeAndEnqueueAll(scheduledQ) return r.removeAndEnqueueAll(scheduledQ)
} }
// EnqueueAllRetryTasks enqueues all tasks from retry queue. // EnqueueAllRetryTasks enqueues all tasks from retry queue.
func (r *RDB) EnqueueAllRetryTasks() error { // and returns the number of tasks enqueued.
func (r *RDB) EnqueueAllRetryTasks() (int64, error) {
return r.removeAndEnqueueAll(retryQ) return r.removeAndEnqueueAll(retryQ)
} }
// EnqueueAllDeadTasks enqueues all tasks from dead queue. // EnqueueAllDeadTasks enqueues all tasks from dead queue
func (r *RDB) EnqueueAllDeadTasks() error { // and returns the number of tasks enqueued.
func (r *RDB) EnqueueAllDeadTasks() (int64, error) {
return r.removeAndEnqueueAll(deadQ) return r.removeAndEnqueueAll(deadQ)
} }
@ -311,7 +314,7 @@ func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) {
return n, nil return n, nil
} }
func (r *RDB) removeAndEnqueueAll(zset string) error { func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) {
script := redis.NewScript(` script := redis.NewScript(`
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1) local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do for _, msg in ipairs(msgs) do
@ -320,9 +323,13 @@ func (r *RDB) removeAndEnqueueAll(zset string) error {
end end
return table.getn(msgs) return table.getn(msgs)
`) `)
_, err := script.Run(r.client, []string{zset, defaultQ}).Result() res, err := script.Run(r.client, []string{zset, defaultQ}).Result()
if err != nil { if err != nil {
return err return 0, err
} }
return nil n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
}
return n, nil
} }

View File

@ -708,16 +708,19 @@ func TestEnqueueAllScheduledTasks(t *testing.T) {
tests := []struct { tests := []struct {
description string description string
scheduled []*TaskMessage scheduled []*TaskMessage
want int64
wantEnqueued []*TaskMessage wantEnqueued []*TaskMessage
}{ }{
{ {
description: "with tasks in scheduled queue", description: "with tasks in scheduled queue",
scheduled: []*TaskMessage{t1, t2, t3}, scheduled: []*TaskMessage{t1, t2, t3},
want: 3,
wantEnqueued: []*TaskMessage{t1, t2, t3}, wantEnqueued: []*TaskMessage{t1, t2, t3},
}, },
{ {
description: "with empty scheduled queue", description: "with empty scheduled queue",
scheduled: []*TaskMessage{}, scheduled: []*TaskMessage{},
want: 0,
wantEnqueued: []*TaskMessage{}, wantEnqueued: []*TaskMessage{},
}, },
} }
@ -737,9 +740,16 @@ func TestEnqueueAllScheduledTasks(t *testing.T) {
} }
} }
err := r.EnqueueAllScheduledTasks() got, err := r.EnqueueAllScheduledTasks()
if err != nil { if err != nil {
t.Errorf("%s; r.EnqueueAllScheduledTasks = %v, want nil", tc.description, err) t.Errorf("%s; r.EnqueueAllScheduledTasks = %v, %v; want %v, nil",
tc.description, got, err, tc.want)
continue
}
if got != tc.want {
t.Errorf("%s; r.EnqueueAllScheduledTasks = %v, %v; want %v, nil",
tc.description, got, err, tc.want)
} }
gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val()
@ -759,16 +769,19 @@ func TestEnqueueAllRetryTasks(t *testing.T) {
tests := []struct { tests := []struct {
description string description string
retry []*TaskMessage retry []*TaskMessage
want int64
wantEnqueued []*TaskMessage wantEnqueued []*TaskMessage
}{ }{
{ {
description: "with tasks in retry queue", description: "with tasks in retry queue",
retry: []*TaskMessage{t1, t2, t3}, retry: []*TaskMessage{t1, t2, t3},
want: 3,
wantEnqueued: []*TaskMessage{t1, t2, t3}, wantEnqueued: []*TaskMessage{t1, t2, t3},
}, },
{ {
description: "with empty retry queue", description: "with empty retry queue",
retry: []*TaskMessage{}, retry: []*TaskMessage{},
want: 0,
wantEnqueued: []*TaskMessage{}, wantEnqueued: []*TaskMessage{},
}, },
} }
@ -788,9 +801,16 @@ func TestEnqueueAllRetryTasks(t *testing.T) {
} }
} }
err := r.EnqueueAllRetryTasks() got, err := r.EnqueueAllRetryTasks()
if err != nil { if err != nil {
t.Errorf("%s; r.EnqueueAllRetryTasks = %v, want nil", tc.description, err) t.Errorf("%s; r.EnqueueAllRetryTasks = %v, %v; want %v, nil",
tc.description, got, err, tc.want)
continue
}
if got != tc.want {
t.Errorf("%s; r.EnqueueAllRetryTasks = %v, %v; want %v, nil",
tc.description, got, err, tc.want)
} }
gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val()
@ -810,16 +830,19 @@ func TestEnqueueAllDeadTasks(t *testing.T) {
tests := []struct { tests := []struct {
description string description string
dead []*TaskMessage dead []*TaskMessage
want int64
wantEnqueued []*TaskMessage wantEnqueued []*TaskMessage
}{ }{
{ {
description: "with tasks in dead queue", description: "with tasks in dead queue",
dead: []*TaskMessage{t1, t2, t3}, dead: []*TaskMessage{t1, t2, t3},
want: 3,
wantEnqueued: []*TaskMessage{t1, t2, t3}, wantEnqueued: []*TaskMessage{t1, t2, t3},
}, },
{ {
description: "with empty dead queue", description: "with empty dead queue",
dead: []*TaskMessage{}, dead: []*TaskMessage{},
want: 0,
wantEnqueued: []*TaskMessage{}, wantEnqueued: []*TaskMessage{},
}, },
} }
@ -839,9 +862,16 @@ func TestEnqueueAllDeadTasks(t *testing.T) {
} }
} }
err := r.EnqueueAllDeadTasks() got, err := r.EnqueueAllDeadTasks()
if err != nil { if err != nil {
t.Errorf("%s; r.EnqueueAllDeadTasks = %v, want nil", tc.description, err) t.Errorf("%s; r.EnqueueAllDeadTasks = %v, %v; want %v, nil",
tc.description, got, err, tc.want)
continue
}
if got != tc.want {
t.Errorf("%s; r.EnqueueAllDeadTasks = %v, %v; want %v, nil",
tc.description, got, err, tc.want)
} }
gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val()

View File

@ -11,9 +11,9 @@ import (
// enqCmd represents the enq command // enqCmd represents the enq command
var enqCmd = &cobra.Command{ var enqCmd = &cobra.Command{
Use: "enq", Use: "enq [task id]",
Short: "Enqueues a task given an identifier", Short: "Enqueues a task given an identifier",
Long: `The enq command enqueues a task given an identifier. Long: `Enq (asynqmon enq) will enqueue a task given an identifier.
The command takes one argument which specifies the task to enqueue. The command takes one argument which specifies the task to enqueue.
The task should be in either scheduled, retry or dead queue. The task should be in either scheduled, retry or dead queue.

View File

@ -0,0 +1,69 @@
package cmd
import (
"fmt"
"os"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
)
var enqallValidArgs = []string{"scheduled", "retry", "dead"}
// enqallCmd represents the enqall command
var enqallCmd = &cobra.Command{
Use: "enqall [queue name]",
Short: "Enqueues all tasks from the specified queue",
Long: `Enqall (asynqmon enqall) will enqueue all tasks from the specified queue.
The argument should be one of "scheduled", "retry", or "dead".
The tasks enqueued by this command will be processed as soon as it
gets dequeued by a processor.
Example: asynqmon enqall dead -> Enqueues all tasks from the dead queue`,
ValidArgs: enqallValidArgs,
Args: cobra.ExactValidArgs(1),
Run: enqall,
}
func init() {
rootCmd.AddCommand(enqallCmd)
// Here you will define your flags and configuration settings.
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// enqallCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// enqallCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}
func enqall(cmd *cobra.Command, args []string) {
c := redis.NewClient(&redis.Options{
Addr: uri,
DB: db,
})
r := rdb.NewRDB(c)
var n int64
var err error
switch args[0] {
case "scheduled":
n, err = r.EnqueueAllScheduledTasks()
case "retry":
n, err = r.EnqueueAllRetryTasks()
case "dead":
n, err = r.EnqueueAllDeadTasks()
default:
fmt.Printf("error: `asynqmon enqall <queue>` only accepts %v as the argument.\n", enqallValidArgs)
os.Exit(1)
}
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Printf("Enqueued %d tasks from %q queue\n", n, args[0])
}

View File

@ -15,20 +15,20 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
var validArgs = []string{"enqueued", "inprogress", "scheduled", "retry", "dead"} var lsValidArgs = []string{"enqueued", "inprogress", "scheduled", "retry", "dead"}
// lsCmd represents the ls command // lsCmd represents the ls command
var lsCmd = &cobra.Command{ var lsCmd = &cobra.Command{
Use: "ls", Use: "ls [queue name]",
Short: "Lists queue contents", Short: "Lists queue contents",
Long: `The ls command lists all tasks from the specified queue in a table format. Long: `Ls (asynqmon ls) will list all tasks from the specified queue in a table format.
The command takes one argument which specifies the queue to inspect. The value The command takes one argument which specifies the queue to inspect. The value
of the argument should be one of "enqueued", "inprogress", "scheduled", of the argument should be one of "enqueued", "inprogress", "scheduled",
"retry", or "dead". "retry", or "dead".
Example: asynqmon ls dead`, Example: asynqmon ls dead`,
ValidArgs: validArgs, ValidArgs: lsValidArgs,
Args: cobra.ExactValidArgs(1), Args: cobra.ExactValidArgs(1),
Run: ls, Run: ls,
} }
@ -65,8 +65,8 @@ func ls(cmd *cobra.Command, args []string) {
case "dead": case "dead":
listDead(r) listDead(r)
default: default:
fmt.Printf("error: `asynqmon ls <queue>` only accepts %v as the argument.\n", validArgs) fmt.Printf("error: `asynqmon ls <queue>` only accepts %v as the argument.\n", lsValidArgs)
return os.Exit(1)
} }
} }

View File

@ -50,6 +50,7 @@ func init() {
} }
// initConfig reads in config file and ENV variables if set. // initConfig reads in config file and ENV variables if set.
// TODO(hibiken): Remove this if not necessary.
func initConfig() { func initConfig() {
if cfgFile != "" { if cfgFile != "" {
// Use config file from the flag. // Use config file from the flag.

View File

@ -15,7 +15,7 @@ import (
var statsCmd = &cobra.Command{ var statsCmd = &cobra.Command{
Use: "stats", Use: "stats",
Short: "Shows current state of the queues", Short: "Shows current state of the queues",
Long: `The stats command shows the number of tasks in each queue at that instant. Long: `Stats (aysnqmon stats) will show the number of tasks in each queue at that instant.
To monitor the queues continuously, it's recommended that you run this To monitor the queues continuously, it's recommended that you run this
command in conjunction with the watch command. command in conjunction with the watch command.