mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	Merge pull request #11 from hibiken/feature/enqall
Add enqall command to asynqmon CLI
This commit is contained in:
		
							
								
								
									
										5
									
								
								asynq.go
									
									
									
									
									
								
							
							
						
						
									
										5
									
								
								asynq.go
									
									
									
									
									
								
							| @@ -5,7 +5,12 @@ import "github.com/go-redis/redis/v7" | ||||
| /* | ||||
| TODOs: | ||||
| - [P0] enqall command to enq all tasks from "scheduled" "retry", "dead" queue | ||||
| - [P0] asynqmon del <taskID>, asynqmon delall <qname> | ||||
| - [P0] asynqmon kill <taskID>, asynqmon killall <qname> | ||||
| - [P0] Redis Memory Usage, Connection info in stats | ||||
| - [P0] Processed, Failed count for today | ||||
| - [P0] Go docs + CONTRIBUTION.md + Github issue template | ||||
| - [P0] Redis Sentinel support | ||||
| - [P1] Add Support for multiple queues and priority | ||||
| - [P1] User defined max-retry count | ||||
| */ | ||||
|   | ||||
| @@ -272,6 +272,24 @@ func (r *RDB) EnqueueScheduledTask(id uuid.UUID, score int64) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // EnqueueAllScheduledTasks enqueues all tasks from scheduled queue | ||||
| // and returns the number of tasks enqueued. | ||||
| func (r *RDB) EnqueueAllScheduledTasks() (int64, error) { | ||||
| 	return r.removeAndEnqueueAll(scheduledQ) | ||||
| } | ||||
|  | ||||
| // EnqueueAllRetryTasks enqueues all tasks from retry queue | ||||
| // and returns the number of tasks enqueued. | ||||
| func (r *RDB) EnqueueAllRetryTasks() (int64, error) { | ||||
| 	return r.removeAndEnqueueAll(retryQ) | ||||
| } | ||||
|  | ||||
| // EnqueueAllDeadTasks enqueues all tasks from dead queue | ||||
| // and returns the number of tasks enqueued. | ||||
| func (r *RDB) EnqueueAllDeadTasks() (int64, error) { | ||||
| 	return r.removeAndEnqueueAll(deadQ) | ||||
| } | ||||
|  | ||||
| func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) { | ||||
| 	script := redis.NewScript(` | ||||
| 	local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) | ||||
| @@ -295,3 +313,23 @@ func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) { | ||||
| 	} | ||||
| 	return n, nil | ||||
| } | ||||
|  | ||||
| func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) { | ||||
| 	script := redis.NewScript(` | ||||
| 	local msgs = redis.call("ZRANGE", KEYS[1], 0, -1) | ||||
| 	for _, msg in ipairs(msgs) do | ||||
| 		redis.call("ZREM", KEYS[1], msg) | ||||
| 		redis.call("LPUSH", KEYS[2], msg) | ||||
| 	end | ||||
| 	return table.getn(msgs) | ||||
| 	`) | ||||
| 	res, err := script.Run(r.client, []string{zset, defaultQ}).Result() | ||||
| 	if err != nil { | ||||
| 		return 0, err | ||||
| 	} | ||||
| 	n, ok := res.(int64) | ||||
| 	if !ok { | ||||
| 		return 0, fmt.Errorf("could not cast %v to int64", res) | ||||
| 	} | ||||
| 	return n, nil | ||||
| } | ||||
|   | ||||
| @@ -698,3 +698,186 @@ func TestEnqueueScheduledTask(t *testing.T) { | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestEnqueueAllScheduledTasks(t *testing.T) { | ||||
| 	r := setup(t) | ||||
| 	t1 := randomTask("send_email", "default", nil) | ||||
| 	t2 := randomTask("gen_thumbnail", "default", nil) | ||||
| 	t3 := randomTask("reindex", "default", nil) | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		description  string | ||||
| 		scheduled    []*TaskMessage | ||||
| 		want         int64 | ||||
| 		wantEnqueued []*TaskMessage | ||||
| 	}{ | ||||
| 		{ | ||||
| 			description:  "with tasks in scheduled queue", | ||||
| 			scheduled:    []*TaskMessage{t1, t2, t3}, | ||||
| 			want:         3, | ||||
| 			wantEnqueued: []*TaskMessage{t1, t2, t3}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			description:  "with empty scheduled queue", | ||||
| 			scheduled:    []*TaskMessage{}, | ||||
| 			want:         0, | ||||
| 			wantEnqueued: []*TaskMessage{}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		// clean up db before each test case. | ||||
| 		if err := r.client.FlushDB().Err(); err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 		// initialize scheduled queue | ||||
| 		for _, msg := range tc.scheduled { | ||||
| 			err := r.client.ZAdd(scheduledQ, &redis.Z{ | ||||
| 				Member: mustMarshal(t, msg), | ||||
| 				Score:  float64(time.Now().Add(time.Hour).Unix())}).Err() | ||||
| 			if err != nil { | ||||
| 				t.Fatal(err) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		got, err := r.EnqueueAllScheduledTasks() | ||||
| 		if err != nil { | ||||
| 			t.Errorf("%s; r.EnqueueAllScheduledTasks = %v, %v; want %v, nil", | ||||
| 				tc.description, got, err, tc.want) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		if got != tc.want { | ||||
| 			t.Errorf("%s; r.EnqueueAllScheduledTasks = %v, %v; want %v, nil", | ||||
| 				tc.description, got, err, tc.want) | ||||
| 		} | ||||
|  | ||||
| 		gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() | ||||
| 		gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) | ||||
| 		if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { | ||||
| 			t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestEnqueueAllRetryTasks(t *testing.T) { | ||||
| 	r := setup(t) | ||||
| 	t1 := randomTask("send_email", "default", nil) | ||||
| 	t2 := randomTask("gen_thumbnail", "default", nil) | ||||
| 	t3 := randomTask("reindex", "default", nil) | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		description  string | ||||
| 		retry        []*TaskMessage | ||||
| 		want         int64 | ||||
| 		wantEnqueued []*TaskMessage | ||||
| 	}{ | ||||
| 		{ | ||||
| 			description:  "with tasks in retry queue", | ||||
| 			retry:        []*TaskMessage{t1, t2, t3}, | ||||
| 			want:         3, | ||||
| 			wantEnqueued: []*TaskMessage{t1, t2, t3}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			description:  "with empty retry queue", | ||||
| 			retry:        []*TaskMessage{}, | ||||
| 			want:         0, | ||||
| 			wantEnqueued: []*TaskMessage{}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		// clean up db before each test case. | ||||
| 		if err := r.client.FlushDB().Err(); err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 		// initialize retry queue | ||||
| 		for _, msg := range tc.retry { | ||||
| 			err := r.client.ZAdd(retryQ, &redis.Z{ | ||||
| 				Member: mustMarshal(t, msg), | ||||
| 				Score:  float64(time.Now().Add(time.Hour).Unix())}).Err() | ||||
| 			if err != nil { | ||||
| 				t.Fatal(err) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		got, err := r.EnqueueAllRetryTasks() | ||||
| 		if err != nil { | ||||
| 			t.Errorf("%s; r.EnqueueAllRetryTasks = %v, %v; want %v, nil", | ||||
| 				tc.description, got, err, tc.want) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		if got != tc.want { | ||||
| 			t.Errorf("%s; r.EnqueueAllRetryTasks = %v, %v; want %v, nil", | ||||
| 				tc.description, got, err, tc.want) | ||||
| 		} | ||||
|  | ||||
| 		gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() | ||||
| 		gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) | ||||
| 		if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { | ||||
| 			t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestEnqueueAllDeadTasks(t *testing.T) { | ||||
| 	r := setup(t) | ||||
| 	t1 := randomTask("send_email", "default", nil) | ||||
| 	t2 := randomTask("gen_thumbnail", "default", nil) | ||||
| 	t3 := randomTask("reindex", "default", nil) | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		description  string | ||||
| 		dead         []*TaskMessage | ||||
| 		want         int64 | ||||
| 		wantEnqueued []*TaskMessage | ||||
| 	}{ | ||||
| 		{ | ||||
| 			description:  "with tasks in dead queue", | ||||
| 			dead:         []*TaskMessage{t1, t2, t3}, | ||||
| 			want:         3, | ||||
| 			wantEnqueued: []*TaskMessage{t1, t2, t3}, | ||||
| 		}, | ||||
| 		{ | ||||
| 			description:  "with empty dead queue", | ||||
| 			dead:         []*TaskMessage{}, | ||||
| 			want:         0, | ||||
| 			wantEnqueued: []*TaskMessage{}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		// clean up db before each test case. | ||||
| 		if err := r.client.FlushDB().Err(); err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 		// initialize dead queue | ||||
| 		for _, msg := range tc.dead { | ||||
| 			err := r.client.ZAdd(deadQ, &redis.Z{ | ||||
| 				Member: mustMarshal(t, msg), | ||||
| 				Score:  float64(time.Now().Add(time.Hour).Unix())}).Err() | ||||
| 			if err != nil { | ||||
| 				t.Fatal(err) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		got, err := r.EnqueueAllDeadTasks() | ||||
| 		if err != nil { | ||||
| 			t.Errorf("%s; r.EnqueueAllDeadTasks = %v, %v; want %v, nil", | ||||
| 				tc.description, got, err, tc.want) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		if got != tc.want { | ||||
| 			t.Errorf("%s; r.EnqueueAllDeadTasks = %v, %v; want %v, nil", | ||||
| 				tc.description, got, err, tc.want) | ||||
| 		} | ||||
|  | ||||
| 		gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() | ||||
| 		gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) | ||||
| 		if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { | ||||
| 			t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -11,9 +11,9 @@ import ( | ||||
|  | ||||
| // enqCmd represents the enq command | ||||
| var enqCmd = &cobra.Command{ | ||||
| 	Use:   "enq", | ||||
| 	Use:   "enq [task id]", | ||||
| 	Short: "Enqueues a task given an identifier", | ||||
| 	Long: `The enq command enqueues a task given an identifier. | ||||
| 	Long: `Enq (asynqmon enq) will enqueue a task given an identifier. | ||||
|  | ||||
| The command takes one argument which specifies the task to enqueue. | ||||
| The task should be in either scheduled, retry or dead queue. | ||||
|   | ||||
							
								
								
									
										69
									
								
								tools/asynqmon/cmd/enqall.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								tools/asynqmon/cmd/enqall.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,69 @@ | ||||
| package cmd | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"os" | ||||
|  | ||||
| 	"github.com/go-redis/redis/v7" | ||||
| 	"github.com/hibiken/asynq/internal/rdb" | ||||
| 	"github.com/spf13/cobra" | ||||
| ) | ||||
|  | ||||
| var enqallValidArgs = []string{"scheduled", "retry", "dead"} | ||||
|  | ||||
| // enqallCmd represents the enqall command | ||||
| var enqallCmd = &cobra.Command{ | ||||
| 	Use:   "enqall [queue name]", | ||||
| 	Short: "Enqueues all tasks from the specified queue", | ||||
| 	Long: `Enqall (asynqmon enqall) will enqueue all tasks from the specified queue. | ||||
|  | ||||
| The argument should be one of "scheduled", "retry", or "dead". | ||||
|  | ||||
| The tasks enqueued by this command will be processed as soon as it | ||||
| gets dequeued by a processor. | ||||
|  | ||||
| Example: asynqmon enqall dead -> Enqueues all tasks from the dead queue`, | ||||
| 	ValidArgs: enqallValidArgs, | ||||
| 	Args:      cobra.ExactValidArgs(1), | ||||
| 	Run:       enqall, | ||||
| } | ||||
|  | ||||
| func init() { | ||||
| 	rootCmd.AddCommand(enqallCmd) | ||||
|  | ||||
| 	// Here you will define your flags and configuration settings. | ||||
|  | ||||
| 	// Cobra supports Persistent Flags which will work for this command | ||||
| 	// and all subcommands, e.g.: | ||||
| 	// enqallCmd.PersistentFlags().String("foo", "", "A help for foo") | ||||
|  | ||||
| 	// Cobra supports local flags which will only run when this command | ||||
| 	// is called directly, e.g.: | ||||
| 	// enqallCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") | ||||
| } | ||||
|  | ||||
| func enqall(cmd *cobra.Command, args []string) { | ||||
| 	c := redis.NewClient(&redis.Options{ | ||||
| 		Addr: uri, | ||||
| 		DB:   db, | ||||
| 	}) | ||||
| 	r := rdb.NewRDB(c) | ||||
| 	var n int64 | ||||
| 	var err error | ||||
| 	switch args[0] { | ||||
| 	case "scheduled": | ||||
| 		n, err = r.EnqueueAllScheduledTasks() | ||||
| 	case "retry": | ||||
| 		n, err = r.EnqueueAllRetryTasks() | ||||
| 	case "dead": | ||||
| 		n, err = r.EnqueueAllDeadTasks() | ||||
| 	default: | ||||
| 		fmt.Printf("error: `asynqmon enqall [queue name]` only accepts %v as the argument.\n", enqallValidArgs) | ||||
| 		os.Exit(1) | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		fmt.Println(err) | ||||
| 		os.Exit(1) | ||||
| 	} | ||||
| 	fmt.Printf("Enqueued %d tasks from %q queue\n", n, args[0]) | ||||
| } | ||||
| @@ -15,20 +15,20 @@ import ( | ||||
| 	"github.com/spf13/cobra" | ||||
| ) | ||||
|  | ||||
| var validArgs = []string{"enqueued", "inprogress", "scheduled", "retry", "dead"} | ||||
| var lsValidArgs = []string{"enqueued", "inprogress", "scheduled", "retry", "dead"} | ||||
|  | ||||
| // lsCmd represents the ls command | ||||
| var lsCmd = &cobra.Command{ | ||||
| 	Use:   "ls", | ||||
| 	Use:   "ls [queue name]", | ||||
| 	Short: "Lists queue contents", | ||||
| 	Long: `The ls command lists all tasks from the specified queue in a table format. | ||||
| 	Long: `Ls (asynqmon ls) will list all tasks from the specified queue in a table format. | ||||
|  | ||||
| The command takes one argument which specifies the queue to inspect. The value | ||||
| of the argument should be one of "enqueued", "inprogress", "scheduled", | ||||
| "retry", or "dead". | ||||
|  | ||||
| Example: asynqmon ls dead`, | ||||
| 	ValidArgs: validArgs, | ||||
| 	ValidArgs: lsValidArgs, | ||||
| 	Args:      cobra.ExactValidArgs(1), | ||||
| 	Run:       ls, | ||||
| } | ||||
| @@ -65,8 +65,8 @@ func ls(cmd *cobra.Command, args []string) { | ||||
| 	case "dead": | ||||
| 		listDead(r) | ||||
| 	default: | ||||
| 		fmt.Printf("error: `asynqmon ls <queue>` only accepts %v as the argument.\n", validArgs) | ||||
| 		return | ||||
| 		fmt.Printf("error: `asynqmon ls [queue name]` only accepts %v as the argument.\n", lsValidArgs) | ||||
| 		os.Exit(1) | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -50,6 +50,7 @@ func init() { | ||||
| } | ||||
|  | ||||
| // initConfig reads in config file and ENV variables if set. | ||||
| // TODO(hibiken): Remove this if not necessary. | ||||
| func initConfig() { | ||||
| 	if cfgFile != "" { | ||||
| 		// Use config file from the flag. | ||||
|   | ||||
| @@ -15,7 +15,7 @@ import ( | ||||
| var statsCmd = &cobra.Command{ | ||||
| 	Use:   "stats", | ||||
| 	Short: "Shows current state of the queues", | ||||
| 	Long: `The stats command shows the number of tasks in each queue at that instant. | ||||
| 	Long: `Stats (aysnqmon stats) will show the number of tasks in each queue at that instant. | ||||
|  | ||||
| To monitor the queues continuously, it's recommended that you run this | ||||
| command in conjunction with the watch command. | ||||
|   | ||||
		Reference in New Issue
	
	Block a user