2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-26 11:16:12 +08:00

Merge pull request #10 from hibiken/feature/enq

Add enq command to asynqmon CLI
This commit is contained in:
Ken Hibino
2019-12-09 20:43:17 -08:00
committed by GitHub
9 changed files with 200 additions and 81 deletions

View File

@@ -4,7 +4,7 @@ import "github.com/go-redis/redis/v7"
/*
TODOs:
- [P0] command to retry tasks from "retry", "dead" queue
- [P0] enqall command to enq all tasks from "scheduled" "retry", "dead" queue
- [P0] Go docs + CONTRIBUTION.md + Github issue template
- [P1] Add Support for multiple queues and priority
- [P1] User defined max-retry count
@@ -24,6 +24,7 @@ type Task struct {
}
// RedisConfig specifies redis configurations.
// TODO(hibiken): Support more configuration.
type RedisConfig struct {
Addr string
Password string
@@ -32,10 +33,10 @@ type RedisConfig struct {
DB int
}
func newRedisClient(config *RedisConfig) *redis.Client {
func newRedisClient(cfg *RedisConfig) *redis.Client {
return redis.NewClient(&redis.Options{
Addr: config.Addr,
Password: config.Password,
DB: config.DB,
Addr: cfg.Addr,
Password: cfg.Password,
DB: cfg.DB,
})
}

View File

@@ -33,8 +33,8 @@ type Background struct {
// NewBackground returns a new Background with the specified number of workers
// given a redis configuration .
func NewBackground(numWorkers int, config *RedisConfig) *Background {
r := rdb.NewRDB(newRedisClient(config))
func NewBackground(numWorkers int, cfg *RedisConfig) *Background {
r := rdb.NewRDB(newRedisClient(cfg))
poller := newPoller(r, 5*time.Second)
processor := newProcessor(r, numWorkers, nil)
return &Background{

View File

@@ -9,7 +9,7 @@ import (
// A Client is responsible for scheduling tasks.
//
// A Client is used to register task that should be processed
// A Client is used to register tasks that should be processed
// immediately or some time in the future.
//
// Clients are safe for concurrent use by multiple goroutines.
@@ -18,14 +18,14 @@ type Client struct {
}
// NewClient and returns a new Client given a redis configuration.
func NewClient(config *RedisConfig) *Client {
r := rdb.NewRDB(newRedisClient(config))
func NewClient(cfg *RedisConfig) *Client {
r := rdb.NewRDB(newRedisClient(cfg))
return &Client{r}
}
// Process registers a task to be processed at the specified time.
//
// Process returns nil if the task was registered successfully,
// Process returns nil if the task is registered successfully,
// otherwise returns non-nil error.
func (c *Client) Process(task *Task, processAt time.Time) error {
msg := &rdb.TaskMessage{

View File

@@ -42,6 +42,7 @@ type ScheduledTask struct {
Type string
Payload map[string]interface{}
ProcessAt time.Time
Score int64
}
// RetryTask is a task that's in retry queue because worker failed to process the task.
@@ -55,6 +56,7 @@ type RetryTask struct {
ErrorMsg string
Retried int
Retry int
Score int64
}
// DeadTask is a task in that has exhausted all retries.
@@ -65,6 +67,7 @@ type DeadTask struct {
Payload map[string]interface{}
LastFailedAt time.Time
ErrorMsg string
Score int64
}
// CurrentStats returns a current state of the queues.
@@ -158,6 +161,7 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
Type: msg.Type,
Payload: msg.Payload,
ProcessAt: processAt,
Score: int64(z.Score),
})
}
return tasks, nil
@@ -190,6 +194,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) {
Retry: msg.Retry,
Retried: msg.Retried,
ProcessAt: processAt,
Score: int64(z.Score),
})
}
return tasks, nil
@@ -219,16 +224,17 @@ func (r *RDB) ListDead() ([]*DeadTask, error) {
Payload: msg.Payload,
ErrorMsg: msg.ErrorMsg,
LastFailedAt: lastFailedAt,
Score: int64(z.Score),
})
}
return tasks, nil
}
// Rescue finds a task that matches the given id and score from dead queue
// EnqueueDeadTask finds a task that matches the given id and score from dead queue
// and enqueues it for processing. If a task that matches the id and score
// does not exist, it returns ErrTaskNotFound.
func (r *RDB) Rescue(id string, score float64) error {
n, err := r.removeAndEnqueue(deadQ, id, score)
func (r *RDB) EnqueueDeadTask(id uuid.UUID, score int64) error {
n, err := r.removeAndEnqueue(deadQ, id.String(), float64(score))
if err != nil {
return err
}
@@ -238,11 +244,11 @@ func (r *RDB) Rescue(id string, score float64) error {
return nil
}
// RetryNow finds a task that matches the given id and score from retry queue
// EnqueueRetryTask finds a task that matches the given id and score from retry queue
// and enqueues it for processing. If a task that matches the id and score
// does not exist, it returns ErrTaskNotFound.
func (r *RDB) RetryNow(id string, score float64) error {
n, err := r.removeAndEnqueue(retryQ, id, score)
func (r *RDB) EnqueueRetryTask(id uuid.UUID, score int64) error {
n, err := r.removeAndEnqueue(retryQ, id.String(), float64(score))
if err != nil {
return err
}
@@ -252,11 +258,11 @@ func (r *RDB) RetryNow(id string, score float64) error {
return nil
}
// ProcessNow finds a task that matches the given id and score from scheduled queue
// EnqueueScheduledTask finds a task that matches the given id and score from scheduled queue
// and enqueues it for processing. If a task that matches the id and score does not
// exist, it returns ErrTaskNotFound.
func (r *RDB) ProcessNow(id string, score float64) error {
n, err := r.removeAndEnqueue(scheduledQ, id, score)
func (r *RDB) EnqueueScheduledTask(id uuid.UUID, score int64) error {
n, err := r.removeAndEnqueue(scheduledQ, id.String(), float64(score))
if err != nil {
return err
}

View File

@@ -249,8 +249,8 @@ func TestListScheduled(t *testing.T) {
m2 := randomTask("reindex", "default", nil)
p1 := time.Now().Add(30 * time.Minute)
p2 := time.Now().Add(24 * time.Hour)
t1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: p1}
t2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: p2}
t1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: p1, Score: p1.Unix()}
t2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: p2, Score: p2.Unix()}
type scheduledEntry struct {
msg *TaskMessage
@@ -330,9 +330,11 @@ func TestListRetry(t *testing.T) {
p1 := time.Now().Add(5 * time.Minute)
p2 := time.Now().Add(24 * time.Hour)
t1 := &RetryTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload,
ProcessAt: p1, ErrorMsg: m1.ErrorMsg, Retried: m1.Retried, Retry: m1.Retry}
ProcessAt: p1, ErrorMsg: m1.ErrorMsg, Retried: m1.Retried,
Retry: m1.Retry, Score: p1.Unix()}
t2 := &RetryTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload,
ProcessAt: p2, ErrorMsg: m2.ErrorMsg, Retried: m2.Retried, Retry: m2.Retry}
ProcessAt: p2, ErrorMsg: m2.ErrorMsg, Retried: m2.Retried,
Retry: m2.Retry, Score: p2.Unix()}
type retryEntry struct {
msg *TaskMessage
@@ -407,8 +409,10 @@ func TestListDead(t *testing.T) {
}
f1 := time.Now().Add(-5 * time.Minute)
f2 := time.Now().Add(-24 * time.Hour)
t1 := &DeadTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, LastFailedAt: f1, ErrorMsg: m1.ErrorMsg}
t2 := &DeadTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, LastFailedAt: f2, ErrorMsg: m2.ErrorMsg}
t1 := &DeadTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload,
LastFailedAt: f1, ErrorMsg: m1.ErrorMsg, Score: f1.Unix()}
t2 := &DeadTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload,
LastFailedAt: f2, ErrorMsg: m2.ErrorMsg, Score: f2.Unix()}
type deadEntry struct {
msg *TaskMessage
@@ -467,22 +471,22 @@ func TestListDead(t *testing.T) {
var timeCmpOpt = EquateApproxTime(time.Second)
func TestRescue(t *testing.T) {
func TestEnqueueDeadTask(t *testing.T) {
r := setup(t)
t1 := randomTask("send_email", "default", nil)
t2 := randomTask("gen_thumbnail", "default", nil)
s1 := float64(time.Now().Add(-5 * time.Minute).Unix())
s2 := float64(time.Now().Add(-time.Hour).Unix())
s1 := time.Now().Add(-5 * time.Minute).Unix()
s2 := time.Now().Add(-time.Hour).Unix()
type deadEntry struct {
msg *TaskMessage
score float64
score int64
}
tests := []struct {
dead []deadEntry
score float64
id string
want error // expected return value from calling Rescue
score int64
id uuid.UUID
want error // expected return value from calling EnqueueDeadTask
wantDead []*TaskMessage
wantEnqueued []*TaskMessage
}{
@@ -492,7 +496,7 @@ func TestRescue(t *testing.T) {
{t2, s2},
},
score: s2,
id: t2.ID.String(),
id: t2.ID,
want: nil,
wantDead: []*TaskMessage{t1},
wantEnqueued: []*TaskMessage{t2},
@@ -502,8 +506,8 @@ func TestRescue(t *testing.T) {
{t1, s1},
{t2, s2},
},
score: 123.0,
id: t2.ID.String(),
score: 123,
id: t2.ID,
want: ErrTaskNotFound,
wantDead: []*TaskMessage{t1, t2},
wantEnqueued: []*TaskMessage{},
@@ -517,15 +521,15 @@ func TestRescue(t *testing.T) {
}
// initialize dead queue
for _, d := range tc.dead {
err := r.client.ZAdd(deadQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: d.score}).Err()
err := r.client.ZAdd(deadQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: float64(d.score)}).Err()
if err != nil {
t.Fatal(err)
}
}
got := r.Rescue(tc.id, tc.score)
got := r.EnqueueDeadTask(tc.id, tc.score)
if got != tc.want {
t.Errorf("r.Rescue(%s, %0.f) = %v, want %v", tc.id, tc.score, got, tc.want)
t.Errorf("r.EnqueueDeadTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want)
continue
}
@@ -543,22 +547,22 @@ func TestRescue(t *testing.T) {
}
}
func TestRetryNow(t *testing.T) {
func TestEnqueueRetryTask(t *testing.T) {
r := setup(t)
t1 := randomTask("send_email", "default", nil)
t2 := randomTask("gen_thumbnail", "default", nil)
s1 := float64(time.Now().Add(-5 * time.Minute).Unix())
s2 := float64(time.Now().Add(-time.Hour).Unix())
s1 := time.Now().Add(-5 * time.Minute).Unix()
s2 := time.Now().Add(-time.Hour).Unix()
type retryEntry struct {
msg *TaskMessage
score float64
score int64
}
tests := []struct {
dead []retryEntry
score float64
id string
want error // expected return value from calling RetryNow
score int64
id uuid.UUID
want error // expected return value from calling EnqueueRetryTask
wantRetry []*TaskMessage
wantEnqueued []*TaskMessage
}{
@@ -568,7 +572,7 @@ func TestRetryNow(t *testing.T) {
{t2, s2},
},
score: s2,
id: t2.ID.String(),
id: t2.ID,
want: nil,
wantRetry: []*TaskMessage{t1},
wantEnqueued: []*TaskMessage{t2},
@@ -578,8 +582,8 @@ func TestRetryNow(t *testing.T) {
{t1, s1},
{t2, s2},
},
score: 123.0,
id: t2.ID.String(),
score: 123,
id: t2.ID,
want: ErrTaskNotFound,
wantRetry: []*TaskMessage{t1, t2},
wantEnqueued: []*TaskMessage{},
@@ -593,15 +597,15 @@ func TestRetryNow(t *testing.T) {
}
// initialize retry queue
for _, d := range tc.dead {
err := r.client.ZAdd(retryQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: d.score}).Err()
err := r.client.ZAdd(retryQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: float64(d.score)}).Err()
if err != nil {
t.Fatal(err)
}
}
got := r.RetryNow(tc.id, tc.score)
got := r.EnqueueRetryTask(tc.id, tc.score)
if got != tc.want {
t.Errorf("r.RetryNow(%s, %0.f) = %v, want %v", tc.id, tc.score, got, tc.want)
t.Errorf("r.EnqueueRetryTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want)
continue
}
@@ -619,22 +623,22 @@ func TestRetryNow(t *testing.T) {
}
}
func TestProcessNow(t *testing.T) {
func TestEnqueueScheduledTask(t *testing.T) {
r := setup(t)
t1 := randomTask("send_email", "default", nil)
t2 := randomTask("gen_thumbnail", "default", nil)
s1 := float64(time.Now().Add(-5 * time.Minute).Unix())
s2 := float64(time.Now().Add(-time.Hour).Unix())
s1 := time.Now().Add(-5 * time.Minute).Unix()
s2 := time.Now().Add(-time.Hour).Unix()
type scheduledEntry struct {
msg *TaskMessage
score float64
score int64
}
tests := []struct {
dead []scheduledEntry
score float64
id string
want error // expected return value from calling ProcessNow
score int64
id uuid.UUID
want error // expected return value from calling EnqueueScheduledTask
wantScheduled []*TaskMessage
wantEnqueued []*TaskMessage
}{
@@ -644,7 +648,7 @@ func TestProcessNow(t *testing.T) {
{t2, s2},
},
score: s2,
id: t2.ID.String(),
id: t2.ID,
want: nil,
wantScheduled: []*TaskMessage{t1},
wantEnqueued: []*TaskMessage{t2},
@@ -654,8 +658,8 @@ func TestProcessNow(t *testing.T) {
{t1, s1},
{t2, s2},
},
score: 123.0,
id: t2.ID.String(),
score: 123,
id: t2.ID,
want: ErrTaskNotFound,
wantScheduled: []*TaskMessage{t1, t2},
wantEnqueued: []*TaskMessage{},
@@ -669,15 +673,15 @@ func TestProcessNow(t *testing.T) {
}
// initialize scheduled queue
for _, d := range tc.dead {
err := r.client.ZAdd(scheduledQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: d.score}).Err()
err := r.client.ZAdd(scheduledQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: float64(d.score)}).Err()
if err != nil {
t.Fatal(err)
}
}
got := r.ProcessNow(tc.id, tc.score)
got := r.EnqueueScheduledTask(tc.id, tc.score)
if got != tc.want {
t.Errorf("r.RetryNow(%s, %0.f) = %v, want %v", tc.id, tc.score, got, tc.want)
t.Errorf("r.EnqueueRetryTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want)
continue
}

70
tools/asynqmon/cmd/enq.go Normal file
View File

@@ -0,0 +1,70 @@
package cmd
import (
"fmt"
"os"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
)
// enqCmd represents the enq command
var enqCmd = &cobra.Command{
Use: "enq",
Short: "Enqueues a task given an identifier",
Long: `The enq command enqueues a task given an identifier.
The command takes one argument which specifies the task to enqueue.
The task should be in either scheduled, retry or dead queue.
Identifier for a task should be obtained by running "asynqmon ls" command.
The task enqueued by this command will be processed as soon as the task
gets dequeued by a processor.
Example: asynqmon enq d:1575732274:b0415aa2-fd33-4b63-87c4-2f1a954ea4bf`,
Args: cobra.ExactArgs(1),
Run: enq,
}
func init() {
rootCmd.AddCommand(enqCmd)
// Here you will define your flags and configuration settings.
// Cobra supports Persistent Flags which will work for this command
// and all subcommands, e.g.:
// enqCmd.PersistentFlags().String("foo", "", "A help for foo")
// Cobra supports local flags which will only run when this command
// is called directly, e.g.:
// enqCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
}
func enq(cmd *cobra.Command, args []string) {
id, score, qtype, err := parseQueryID(args[0])
if err != nil {
fmt.Println(err)
os.Exit(1)
}
r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: uri,
DB: db,
}))
switch qtype {
case "s":
err = r.EnqueueScheduledTask(id, score)
case "r":
err = r.EnqueueRetryTask(id, score)
case "d":
err = r.EnqueueDeadTask(id, score)
default:
fmt.Println("invalid argument")
os.Exit(1)
}
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Printf("Successfully enqueued %v\n", args[0])
}

View File

@@ -3,13 +3,14 @@ package cmd
import (
"fmt"
"io"
"log"
"os"
"strconv"
"strings"
"text/tabwriter"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
)
@@ -19,7 +20,7 @@ var validArgs = []string{"enqueued", "inprogress", "scheduled", "retry", "dead"}
// lsCmd represents the ls command
var lsCmd = &cobra.Command{
Use: "ls",
Short: "lists queue contents",
Short: "Lists queue contents",
Long: `The ls command lists all tasks from the specified queue in a table format.
The command takes one argument which specifies the queue to inspect. The value
@@ -69,10 +70,42 @@ func ls(cmd *cobra.Command, args []string) {
}
}
// queryID returns an identifier used for "enq" command.
// score is the zset score and queryType should be one
// of "s", "r" or "d" (scheduled, retry, dead respectively).
func queryID(id uuid.UUID, score int64, qtype string) string {
const format = "%v:%v:%v"
return fmt.Sprintf(format, qtype, score, id)
}
// parseQueryID is a reverse operation of queryID function.
// It takes a queryID and return each part of id with proper
// type if valid, otherwise it reports an error.
func parseQueryID(queryID string) (id uuid.UUID, score int64, qtype string, err error) {
parts := strings.Split(queryID, ":")
if len(parts) != 3 {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
id, err = uuid.Parse(parts[2])
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
score, err = strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
qtype = parts[0]
if len(qtype) != 1 || !strings.Contains("srd", qtype) {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
return id, score, qtype, nil
}
func listEnqueued(r *rdb.RDB) {
tasks, err := r.ListEnqueued()
if err != nil {
log.Fatal(err)
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Println("No enqueued tasks")
@@ -90,7 +123,8 @@ func listEnqueued(r *rdb.RDB) {
func listInProgress(r *rdb.RDB) {
tasks, err := r.ListInProgress()
if err != nil {
log.Fatal(err)
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Println("No in-progress tasks")
@@ -108,7 +142,8 @@ func listInProgress(r *rdb.RDB) {
func listScheduled(r *rdb.RDB) {
tasks, err := r.ListScheduled()
if err != nil {
log.Fatal(err)
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Println("No scheduled tasks")
@@ -118,7 +153,7 @@ func listScheduled(r *rdb.RDB) {
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
processIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds())
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, processIn)
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "s"), t.Type, t.Payload, processIn)
}
}
printTable(cols, printRows)
@@ -127,7 +162,8 @@ func listScheduled(r *rdb.RDB) {
func listRetry(r *rdb.RDB) {
tasks, err := r.ListRetry()
if err != nil {
log.Fatal(err)
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Println("No retry tasks")
@@ -137,7 +173,7 @@ func listRetry(r *rdb.RDB) {
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
retryIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds())
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, retryIn, t.ErrorMsg, t.Retried, t.Retry)
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "r"), t.Type, t.Payload, retryIn, t.ErrorMsg, t.Retried, t.Retry)
}
}
printTable(cols, printRows)
@@ -146,7 +182,8 @@ func listRetry(r *rdb.RDB) {
func listDead(r *rdb.RDB) {
tasks, err := r.ListDead()
if err != nil {
log.Fatal(err)
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Println("No dead tasks")
@@ -155,7 +192,7 @@ func listDead(r *rdb.RDB) {
cols := []string{"ID", "Type", "Payload", "Last Failed", "Last Error"}
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg)
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "d"), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg)
}
}
printTable(cols, printRows)

View File

@@ -21,9 +21,10 @@ var rootCmd = &cobra.Command{
Short: "A monitoring tool for asynq queues",
Long: `Asynqmon is a CLI tool to inspect and monitor queues managed by asynq package.
Asynqmon has a few subcommands to show the current state of the queues, while others were
used to make manual changes to the queues. Monitoring commands can be used in conjunction
with the "watch" command to continuously run the command at a certain interval.
Asynqmon has a few subcommands to query and mutate the current state of the queues.
Monitoring commands such as "stats" and "ls" can be used in conjunction with the
"watch" command to continuously run the command at a certain interval.
Example: watch -n 5 asynqmon stats`,
// Uncomment the following line if your bare application

View File

@@ -2,7 +2,6 @@ package cmd
import (
"fmt"
"log"
"os"
"strings"
"text/tabwriter"
@@ -15,7 +14,7 @@ import (
// statsCmd represents the stats command
var statsCmd = &cobra.Command{
Use: "stats",
Short: "shows current state of the queues",
Short: "Shows current state of the queues",
Long: `The stats command shows the number of tasks in each queue at that instant.
To monitor the queues continuously, it's recommended that you run this
@@ -49,7 +48,8 @@ func stats(cmd *cobra.Command, args []string) {
stats, err := r.CurrentStats()
if err != nil {
log.Fatal(err)
fmt.Println(err)
os.Exit(1)
}
printStats(stats)
fmt.Println()