mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-26 15:52:18 +08:00
Add task enqueue command to cli (#918)
This commit is contained in:
parent
1e102a5392
commit
013190b824
@ -11,6 +11,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"text/tabwriter"
|
"text/tabwriter"
|
||||||
|
"time"
|
||||||
"unicode"
|
"unicode"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
|
|
||||||
@ -369,6 +370,11 @@ func createRDB() *rdb.RDB {
|
|||||||
return rdb.NewRDB(c)
|
return rdb.NewRDB(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// createClient creates a Client instance using flag values and returns it.
|
||||||
|
func createClient() *asynq.Client {
|
||||||
|
return asynq.NewClient(getRedisConnOpt())
|
||||||
|
}
|
||||||
|
|
||||||
// createInspector creates a Inspector instance using flag values and returns it.
|
// createInspector creates a Inspector instance using flag values and returns it.
|
||||||
func createInspector() *asynq.Inspector {
|
func createInspector() *asynq.Inspector {
|
||||||
return asynq.NewInspector(getRedisConnOpt())
|
return asynq.NewInspector(getRedisConnOpt())
|
||||||
@ -456,3 +462,37 @@ func isPrintable(data []byte) bool {
|
|||||||
}
|
}
|
||||||
return !isAllSpace
|
return !isAllSpace
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Helper to turn a command line flag into a duration
|
||||||
|
func getDuration(cmd *cobra.Command, arg string) time.Duration {
|
||||||
|
durationStr, err := cmd.Flags().GetString(arg)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
duration, err := time.ParseDuration(durationStr)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper to turn a command line flag into a time
|
||||||
|
func getTime(cmd *cobra.Command, arg string) time.Time {
|
||||||
|
timeStr, err := cmd.Flags().GetString(arg)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
timeVal, err := time.Parse(time.RFC3339, timeStr)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return timeVal
|
||||||
|
}
|
||||||
|
@ -53,6 +53,24 @@ func init() {
|
|||||||
taskRunCmd.MarkFlagRequired("queue")
|
taskRunCmd.MarkFlagRequired("queue")
|
||||||
taskRunCmd.MarkFlagRequired("id")
|
taskRunCmd.MarkFlagRequired("id")
|
||||||
|
|
||||||
|
taskCmd.AddCommand(taskEnqueueCmd)
|
||||||
|
taskEnqueueCmd.Flags().StringP("type_name", "t", "", "type name to enqueue the task as (required)")
|
||||||
|
taskEnqueueCmd.Flags().StringP("payload", "l", "", "payload to enqueue (required)")
|
||||||
|
// The following are the various OptionTypes; if not specified we won't pass them so that composeOptions()
|
||||||
|
// can apply its own defaults
|
||||||
|
taskEnqueueCmd.Flags().Int("retry", 0, "maximum retries")
|
||||||
|
taskEnqueueCmd.Flags().String("queue", "", "queue to enqueue the task to")
|
||||||
|
taskEnqueueCmd.Flags().String("id", "", "id to enqueue the task as")
|
||||||
|
taskEnqueueCmd.Flags().String("timeout", "", "timeout for the task (how long it can run); must be parseable as a time.Duration")
|
||||||
|
taskEnqueueCmd.Flags().String("deadline", "", "deadline for the task; must be in RFC3339 format")
|
||||||
|
taskEnqueueCmd.Flags().String("unique", "", "unique period for the task (duration within which it is guaranteed to be unique); must be parseable as a time.Duration")
|
||||||
|
taskEnqueueCmd.Flags().String("process_at", "", "process at time for the task; must be in RFC3339 format")
|
||||||
|
taskEnqueueCmd.Flags().String("process_in", "", "process in window for the task; must be parseable as a time.Duration")
|
||||||
|
taskEnqueueCmd.Flags().String("retention", "", "retention window for the task; must be parseable as a time.Duration")
|
||||||
|
taskEnqueueCmd.Flags().String("group", "", "group for the task")
|
||||||
|
taskEnqueueCmd.MarkFlagRequired("type_name")
|
||||||
|
taskEnqueueCmd.MarkFlagRequired("payload")
|
||||||
|
|
||||||
taskCmd.AddCommand(taskArchiveAllCmd)
|
taskCmd.AddCommand(taskArchiveAllCmd)
|
||||||
taskArchiveAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong (required)")
|
taskArchiveAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong (required)")
|
||||||
taskArchiveAllCmd.Flags().StringP("state", "s", "", "state of the tasks; one of { pending | aggregating | scheduled | retry } (required)")
|
taskArchiveAllCmd.Flags().StringP("state", "s", "", "state of the tasks; one of { pending | aggregating | scheduled | retry } (required)")
|
||||||
@ -151,6 +169,16 @@ var taskRunCmd = &cobra.Command{
|
|||||||
$ asynq task run --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
|
$ asynq task run --queue=myqueue --id=f1720682-f5a6-4db1-8953-4f48ae541d0f`),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var taskEnqueueCmd = &cobra.Command{
|
||||||
|
Use: "enqueue --type_name=footype --payload=barpayload",
|
||||||
|
Short: "Enqueue a task",
|
||||||
|
Args: cobra.NoArgs,
|
||||||
|
Run: taskEnqueue,
|
||||||
|
Example: heredoc.Doc(`
|
||||||
|
$ asynq task enqueue -t footype -l barpayload
|
||||||
|
$ asynq task enqueue -t footask -l barpayload --retry 3 --id f1720682-f5a6-4db1-8953-4f48ae541d0f --queue bazqueue --timeout 100s --deadline 2024-12-14T01:23:45Z --unique 100s --process_at 2024-12-14T01:22:05Z --process_in 100s --retention 5h --group baygroup`),
|
||||||
|
}
|
||||||
|
|
||||||
var taskArchiveAllCmd = &cobra.Command{
|
var taskArchiveAllCmd = &cobra.Command{
|
||||||
Use: "archiveall --queue=<queue> --state=<state>",
|
Use: "archiveall --queue=<queue> --state=<state>",
|
||||||
Short: "Archive all tasks in the given state",
|
Short: "Archive all tasks in the given state",
|
||||||
@ -521,6 +549,95 @@ func taskRun(cmd *cobra.Command, args []string) {
|
|||||||
fmt.Println("task is now pending")
|
fmt.Println("task is now pending")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func taskEnqueue(cmd *cobra.Command, args []string) {
|
||||||
|
typeName, err := cmd.Flags().GetString("type_name")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
payload, err := cmd.Flags().GetString("payload")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// For all of the optional flags, we need to explicitly check whether they were set or
|
||||||
|
// not; for consistency we want to use the defaults set in composeOptions() rather than
|
||||||
|
// the ones in the flag definitions.
|
||||||
|
opts := []asynq.Option{}
|
||||||
|
if cmd.Flags().Changed("retry") {
|
||||||
|
retry, err := cmd.Flags().GetInt("retry")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
opts = append(opts, asynq.MaxRetry(retry))
|
||||||
|
}
|
||||||
|
|
||||||
|
if cmd.Flags().Changed("queue") {
|
||||||
|
queue, err := cmd.Flags().GetString("queue")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
opts = append(opts, asynq.Queue(queue))
|
||||||
|
}
|
||||||
|
|
||||||
|
if cmd.Flags().Changed("id") {
|
||||||
|
id, err := cmd.Flags().GetString("id")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
opts = append(opts, asynq.TaskID(id))
|
||||||
|
}
|
||||||
|
|
||||||
|
if cmd.Flags().Changed("timeout") {
|
||||||
|
opts = append(opts, asynq.Timeout(getDuration(cmd, "timeout")))
|
||||||
|
}
|
||||||
|
|
||||||
|
if cmd.Flags().Changed("deadline") {
|
||||||
|
opts = append(opts, asynq.Deadline(getTime(cmd, "deadline")))
|
||||||
|
}
|
||||||
|
|
||||||
|
if cmd.Flags().Changed("unique") {
|
||||||
|
opts = append(opts, asynq.Unique(getDuration(cmd, "unique")))
|
||||||
|
}
|
||||||
|
|
||||||
|
if cmd.Flags().Changed("process_at") {
|
||||||
|
opts = append(opts, asynq.ProcessAt(getTime(cmd, "process_at")))
|
||||||
|
}
|
||||||
|
|
||||||
|
if cmd.Flags().Changed("process_in") {
|
||||||
|
opts = append(opts, asynq.ProcessIn(getDuration(cmd, "process_in")))
|
||||||
|
}
|
||||||
|
|
||||||
|
if cmd.Flags().Changed("retention") {
|
||||||
|
opts = append(opts, asynq.Retention(getDuration(cmd, "retention")))
|
||||||
|
}
|
||||||
|
|
||||||
|
if cmd.Flags().Changed("group") {
|
||||||
|
group, err := cmd.Flags().GetString("group")
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
opts = append(opts, asynq.Group(group))
|
||||||
|
}
|
||||||
|
|
||||||
|
c := createClient()
|
||||||
|
task := asynq.NewTask(typeName, []byte(payload), opts...)
|
||||||
|
|
||||||
|
taskInfo, err := c.Enqueue(task)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("error: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Enqueued task %s to queue %s\n", taskInfo.ID, taskInfo.Queue)
|
||||||
|
}
|
||||||
|
|
||||||
func taskArchiveAll(cmd *cobra.Command, args []string) {
|
func taskArchiveAll(cmd *cobra.Command, args []string) {
|
||||||
qname, err := cmd.Flags().GetString("queue")
|
qname, err := cmd.Flags().GetString("queue")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -653,3 +770,4 @@ func taskRunAll(cmd *cobra.Command, args []string) {
|
|||||||
}
|
}
|
||||||
fmt.Printf("%d tasks are now pending\n", n)
|
fmt.Printf("%d tasks are now pending\n", n)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user