mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-25 23:06:12 +08:00 
			
		
		
		
	Rename ps command to servers
This commit is contained in:
		| @@ -54,21 +54,21 @@ func TestHeartbeater(t *testing.T) { | |||||||
| 		// allow for heartbeater to write to redis | 		// allow for heartbeater to write to redis | ||||||
| 		time.Sleep(tc.interval * 2) | 		time.Sleep(tc.interval * 2) | ||||||
|  |  | ||||||
| 		ps, err := rdbClient.ListProcesses() | 		ss, err := rdbClient.ListServers() | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			t.Errorf("could not read process status from redis: %v", err) | 			t.Errorf("could not read server info from redis: %v", err) | ||||||
| 			hb.terminate() | 			hb.terminate() | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		if len(ps) != 1 { | 		if len(ss) != 1 { | ||||||
| 			t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ps)) | 			t.Errorf("(*RDB).ListServers returned %d process info, want 1", len(ss)) | ||||||
| 			hb.terminate() | 			hb.terminate() | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		if diff := cmp.Diff(want, ps[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" { | 		if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" { | ||||||
| 			t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ps[0], want, diff) | 			t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff) | ||||||
| 			hb.terminate() | 			hb.terminate() | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| @@ -80,21 +80,21 @@ func TestHeartbeater(t *testing.T) { | |||||||
| 		time.Sleep(tc.interval * 2) | 		time.Sleep(tc.interval * 2) | ||||||
|  |  | ||||||
| 		want.Status = "stopped" | 		want.Status = "stopped" | ||||||
| 		ps, err = rdbClient.ListProcesses() | 		ss, err = rdbClient.ListServers() | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			t.Errorf("could not read process status from redis: %v", err) | 			t.Errorf("could not read process status from redis: %v", err) | ||||||
| 			hb.terminate() | 			hb.terminate() | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		if len(ps) != 1 { | 		if len(ss) != 1 { | ||||||
| 			t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ps)) | 			t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ss)) | ||||||
| 			hb.terminate() | 			hb.terminate() | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		if diff := cmp.Diff(want, ps[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" { | 		if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" { | ||||||
| 			t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ps[0], want, diff) | 			t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff) | ||||||
| 			hb.terminate() | 			hb.terminate() | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|   | |||||||
| @@ -758,24 +758,23 @@ func (r *RDB) RemoveQueue(qname string, force bool) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // TODO: Rename this to listServerInfo. |  | ||||||
| // Note: Script also removes stale keys. | // Note: Script also removes stale keys. | ||||||
| var listProcessesCmd = redis.NewScript(` | var listServersCmd = redis.NewScript(` | ||||||
| local res = {} | local res = {} | ||||||
| local now = tonumber(ARGV[1]) | local now = tonumber(ARGV[1]) | ||||||
| local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") | local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") | ||||||
| for _, key in ipairs(keys) do | for _, key in ipairs(keys) do | ||||||
| 	local ps = redis.call("GET", key) | 	local s = redis.call("GET", key) | ||||||
| 	if ps then | 	if s then | ||||||
| 		table.insert(res, ps) | 		table.insert(res, s) | ||||||
| 	end   | 	end   | ||||||
| end | end | ||||||
| redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) | redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) | ||||||
| return res`) | return res`) | ||||||
|  |  | ||||||
| // ListProcesses returns the list of process statuses. | // ListServers returns the list of process statuses. | ||||||
| func (r *RDB) ListProcesses() ([]*base.ServerInfo, error) { | func (r *RDB) ListServers() ([]*base.ServerInfo, error) { | ||||||
| 	res, err := listProcessesCmd.Run(r.client, | 	res, err := listServersCmd.Run(r.client, | ||||||
| 		[]string{base.AllServers}, time.Now().UTC().Unix()).Result() | 		[]string{base.AllServers}, time.Now().UTC().Unix()).Result() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| @@ -784,16 +783,16 @@ func (r *RDB) ListProcesses() ([]*base.ServerInfo, error) { | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	var processes []*base.ServerInfo | 	var servers []*base.ServerInfo | ||||||
| 	for _, s := range data { | 	for _, s := range data { | ||||||
| 		var ps base.ServerInfo | 		var info base.ServerInfo | ||||||
| 		err := json.Unmarshal([]byte(s), &ps) | 		err := json.Unmarshal([]byte(s), &info) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			continue // skip bad data | 			continue // skip bad data | ||||||
| 		} | 		} | ||||||
| 		processes = append(processes, &ps) | 		servers = append(servers, &info) | ||||||
| 	} | 	} | ||||||
| 	return processes, nil | 	return servers, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // Note: Script also removes stale keys. | // Note: Script also removes stale keys. | ||||||
|   | |||||||
| @@ -2051,7 +2051,7 @@ func TestRemoveQueueError(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func TestListProcesses(t *testing.T) { | func TestListServers(t *testing.T) { | ||||||
| 	r := setup(t) | 	r := setup(t) | ||||||
|  |  | ||||||
| 	started1 := time.Now().Add(-time.Hour) | 	started1 := time.Now().Add(-time.Hour) | ||||||
| @@ -2113,12 +2113,12 @@ func TestListProcesses(t *testing.T) { | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		got, err := r.ListProcesses() | 		got, err := r.ListServers() | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			t.Errorf("r.ListProcesses returned an error: %v", err) | 			t.Errorf("r.ListServers returned an error: %v", err) | ||||||
| 		} | 		} | ||||||
| 		if diff := cmp.Diff(tc.want, got, h.SortServerInfoOpt, ignoreOpt, ignoreFieldOpt); diff != "" { | 		if diff := cmp.Diff(tc.want, got, h.SortServerInfoOpt, ignoreOpt, ignoreFieldOpt); diff != "" { | ||||||
| 			t.Errorf("r.ListProcesses returned %v, want %v; (-want,+got)\n%s", | 			t.Errorf("r.ListServers returned %v, want %v; (-want,+got)\n%s", | ||||||
| 				got, tc.serverStates, diff) | 				got, tc.serverStates, diff) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| @@ -2173,7 +2173,7 @@ func TestListWorkers(t *testing.T) { | |||||||
|  |  | ||||||
| 		err := r.WriteServerState(ss, time.Minute) | 		err := r.WriteServerState(ss, time.Minute) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			t.Errorf("could not write process state to redis: %v", err) | 			t.Errorf("could not write server state to redis: %v", err) | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -18,64 +18,64 @@ import ( | |||||||
| 	"github.com/spf13/viper" | 	"github.com/spf13/viper" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // psCmd represents the ps command | // serversCmd represents the servers command | ||||||
| var psCmd = &cobra.Command{ | var serversCmd = &cobra.Command{ | ||||||
| 	Use:   "ps", | 	Use:   "servers", | ||||||
| 	Short: "Shows all background worker processes", | 	Short: "Shows all running worker servers", | ||||||
| 	Long: `Ps (asynq ps) will show all background worker processes | 	Long: `Servers (asynq servers) will show all running worker servers | ||||||
| backed by the specified redis instance. | pulling tasks from the specified redis instance. | ||||||
| 
 | 
 | ||||||
| The command shows the following for each process: | The command shows the following for each server: | ||||||
| * Host and PID of the process | * Host and PID of the process in which the server is running | ||||||
| * Number of active workers out of worker pool | * Number of active workers out of worker pool | ||||||
| * Queue configuration | * Queue configuration | ||||||
| * State of the worker process ("running" | "stopped") | * State of the worker server ("running" | "quiet") | ||||||
| * Time the process was started | * Time the server was started | ||||||
| 
 | 
 | ||||||
| A "running" process is processing tasks in queues. | A "running" server is pulling tasks from queues and processing them. | ||||||
| A "stopped" process is no longer processing new tasks.`, | A "quiet" server is no longer pulling new tasks from queues`, | ||||||
| 	Args: cobra.NoArgs, | 	Args: cobra.NoArgs, | ||||||
| 	Run:  ps, | 	Run:  servers, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func init() { | func init() { | ||||||
| 	rootCmd.AddCommand(psCmd) | 	rootCmd.AddCommand(serversCmd) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func ps(cmd *cobra.Command, args []string) { | func servers(cmd *cobra.Command, args []string) { | ||||||
| 	r := rdb.NewRDB(redis.NewClient(&redis.Options{ | 	r := rdb.NewRDB(redis.NewClient(&redis.Options{ | ||||||
| 		Addr:     viper.GetString("uri"), | 		Addr:     viper.GetString("uri"), | ||||||
| 		DB:       viper.GetInt("db"), | 		DB:       viper.GetInt("db"), | ||||||
| 		Password: viper.GetString("password"), | 		Password: viper.GetString("password"), | ||||||
| 	})) | 	})) | ||||||
| 
 | 
 | ||||||
| 	processes, err := r.ListProcesses() | 	servers, err := r.ListServers() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		fmt.Println(err) | 		fmt.Println(err) | ||||||
| 		os.Exit(1) | 		os.Exit(1) | ||||||
| 	} | 	} | ||||||
| 	if len(processes) == 0 { | 	if len(servers) == 0 { | ||||||
| 		fmt.Println("No processes") | 		fmt.Println("No running servers") | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// sort by hostname and pid | 	// sort by hostname and pid | ||||||
| 	sort.Slice(processes, func(i, j int) bool { | 	sort.Slice(servers, func(i, j int) bool { | ||||||
| 		x, y := processes[i], processes[j] | 		x, y := servers[i], servers[j] | ||||||
| 		if x.Host != y.Host { | 		if x.Host != y.Host { | ||||||
| 			return x.Host < y.Host | 			return x.Host < y.Host | ||||||
| 		} | 		} | ||||||
| 		return x.PID < y.PID | 		return x.PID < y.PID | ||||||
| 	}) | 	}) | ||||||
| 
 | 
 | ||||||
| 	// print processes | 	// print server info | ||||||
| 	cols := []string{"Host", "PID", "State", "Active Workers", "Queues", "Started"} | 	cols := []string{"Host", "PID", "State", "Active Workers", "Queues", "Started"} | ||||||
| 	printRows := func(w io.Writer, tmpl string) { | 	printRows := func(w io.Writer, tmpl string) { | ||||||
| 		for _, ps := range processes { | 		for _, info := range servers { | ||||||
| 			fmt.Fprintf(w, tmpl, | 			fmt.Fprintf(w, tmpl, | ||||||
| 				ps.Host, ps.PID, ps.Status, | 				info.Host, info.PID, info.Status, | ||||||
| 				fmt.Sprintf("%d/%d", ps.ActiveWorkerCount, ps.Concurrency), | 				fmt.Sprintf("%d/%d", info.ActiveWorkerCount, info.Concurrency), | ||||||
| 				formatQueues(ps.Queues), timeAgo(ps.Started)) | 				formatQueues(info.Queues), timeAgo(info.Started)) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	printTable(cols, printRows) | 	printTable(cols, printRows) | ||||||
		Reference in New Issue
	
	Block a user