From b086e88a47884c22cdbb520a645f6909fa13afa1 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 12 Apr 2020 17:09:58 -0700 Subject: [PATCH] Rename ps command to servers --- heartbeat_test.go | 22 ++++++------ internal/rdb/inspect.go | 25 +++++++------- internal/rdb/inspect_test.go | 10 +++--- tools/asynq/cmd/{ps.go => servers.go} | 50 +++++++++++++-------------- 4 files changed, 53 insertions(+), 54 deletions(-) rename tools/asynq/cmd/{ps.go => servers.go} (62%) diff --git a/heartbeat_test.go b/heartbeat_test.go index a1b4127..75c9940 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -54,21 +54,21 @@ func TestHeartbeater(t *testing.T) { // allow for heartbeater to write to redis time.Sleep(tc.interval * 2) - ps, err := rdbClient.ListProcesses() + ss, err := rdbClient.ListServers() if err != nil { - t.Errorf("could not read process status from redis: %v", err) + t.Errorf("could not read server info from redis: %v", err) hb.terminate() continue } - if len(ps) != 1 { - t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ps)) + if len(ss) != 1 { + t.Errorf("(*RDB).ListServers returned %d process info, want 1", len(ss)) hb.terminate() continue } - if diff := cmp.Diff(want, ps[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" { - t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ps[0], want, diff) + if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" { + t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff) hb.terminate() continue } @@ -80,21 +80,21 @@ func TestHeartbeater(t *testing.T) { time.Sleep(tc.interval * 2) want.Status = "stopped" - ps, err = rdbClient.ListProcesses() + ss, err = rdbClient.ListServers() if err != nil { t.Errorf("could not read process status from redis: %v", err) hb.terminate() continue } - if len(ps) != 1 { - t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ps)) + if len(ss) != 1 { + t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ss)) hb.terminate() continue } - if diff := cmp.Diff(want, ps[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" { - t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ps[0], want, diff) + if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" { + t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff) hb.terminate() continue } diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 8fc8a8d..dd81cae 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -758,24 +758,23 @@ func (r *RDB) RemoveQueue(qname string, force bool) error { return nil } -// TODO: Rename this to listServerInfo. // Note: Script also removes stale keys. -var listProcessesCmd = redis.NewScript(` +var listServersCmd = redis.NewScript(` local res = {} local now = tonumber(ARGV[1]) local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") for _, key in ipairs(keys) do - local ps = redis.call("GET", key) - if ps then - table.insert(res, ps) + local s = redis.call("GET", key) + if s then + table.insert(res, s) end end redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) return res`) -// ListProcesses returns the list of process statuses. -func (r *RDB) ListProcesses() ([]*base.ServerInfo, error) { - res, err := listProcessesCmd.Run(r.client, +// ListServers returns the list of process statuses. +func (r *RDB) ListServers() ([]*base.ServerInfo, error) { + res, err := listServersCmd.Run(r.client, []string{base.AllServers}, time.Now().UTC().Unix()).Result() if err != nil { return nil, err @@ -784,16 +783,16 @@ func (r *RDB) ListProcesses() ([]*base.ServerInfo, error) { if err != nil { return nil, err } - var processes []*base.ServerInfo + var servers []*base.ServerInfo for _, s := range data { - var ps base.ServerInfo - err := json.Unmarshal([]byte(s), &ps) + var info base.ServerInfo + err := json.Unmarshal([]byte(s), &info) if err != nil { continue // skip bad data } - processes = append(processes, &ps) + servers = append(servers, &info) } - return processes, nil + return servers, nil } // Note: Script also removes stale keys. diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 5c50810..1a1e931 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2051,7 +2051,7 @@ func TestRemoveQueueError(t *testing.T) { } } -func TestListProcesses(t *testing.T) { +func TestListServers(t *testing.T) { r := setup(t) started1 := time.Now().Add(-time.Hour) @@ -2113,12 +2113,12 @@ func TestListProcesses(t *testing.T) { } } - got, err := r.ListProcesses() + got, err := r.ListServers() if err != nil { - t.Errorf("r.ListProcesses returned an error: %v", err) + t.Errorf("r.ListServers returned an error: %v", err) } if diff := cmp.Diff(tc.want, got, h.SortServerInfoOpt, ignoreOpt, ignoreFieldOpt); diff != "" { - t.Errorf("r.ListProcesses returned %v, want %v; (-want,+got)\n%s", + t.Errorf("r.ListServers returned %v, want %v; (-want,+got)\n%s", got, tc.serverStates, diff) } } @@ -2173,7 +2173,7 @@ func TestListWorkers(t *testing.T) { err := r.WriteServerState(ss, time.Minute) if err != nil { - t.Errorf("could not write process state to redis: %v", err) + t.Errorf("could not write server state to redis: %v", err) continue } diff --git a/tools/asynq/cmd/ps.go b/tools/asynq/cmd/servers.go similarity index 62% rename from tools/asynq/cmd/ps.go rename to tools/asynq/cmd/servers.go index 08f409c..9431b41 100644 --- a/tools/asynq/cmd/ps.go +++ b/tools/asynq/cmd/servers.go @@ -18,64 +18,64 @@ import ( "github.com/spf13/viper" ) -// psCmd represents the ps command -var psCmd = &cobra.Command{ - Use: "ps", - Short: "Shows all background worker processes", - Long: `Ps (asynq ps) will show all background worker processes -backed by the specified redis instance. +// serversCmd represents the servers command +var serversCmd = &cobra.Command{ + Use: "servers", + Short: "Shows all running worker servers", + Long: `Servers (asynq servers) will show all running worker servers +pulling tasks from the specified redis instance. -The command shows the following for each process: -* Host and PID of the process +The command shows the following for each server: +* Host and PID of the process in which the server is running * Number of active workers out of worker pool * Queue configuration -* State of the worker process ("running" | "stopped") -* Time the process was started +* State of the worker server ("running" | "quiet") +* Time the server was started -A "running" process is processing tasks in queues. -A "stopped" process is no longer processing new tasks.`, +A "running" server is pulling tasks from queues and processing them. +A "quiet" server is no longer pulling new tasks from queues`, Args: cobra.NoArgs, - Run: ps, + Run: servers, } func init() { - rootCmd.AddCommand(psCmd) + rootCmd.AddCommand(serversCmd) } -func ps(cmd *cobra.Command, args []string) { +func servers(cmd *cobra.Command, args []string) { r := rdb.NewRDB(redis.NewClient(&redis.Options{ Addr: viper.GetString("uri"), DB: viper.GetInt("db"), Password: viper.GetString("password"), })) - processes, err := r.ListProcesses() + servers, err := r.ListServers() if err != nil { fmt.Println(err) os.Exit(1) } - if len(processes) == 0 { - fmt.Println("No processes") + if len(servers) == 0 { + fmt.Println("No running servers") return } // sort by hostname and pid - sort.Slice(processes, func(i, j int) bool { - x, y := processes[i], processes[j] + sort.Slice(servers, func(i, j int) bool { + x, y := servers[i], servers[j] if x.Host != y.Host { return x.Host < y.Host } return x.PID < y.PID }) - // print processes + // print server info cols := []string{"Host", "PID", "State", "Active Workers", "Queues", "Started"} printRows := func(w io.Writer, tmpl string) { - for _, ps := range processes { + for _, info := range servers { fmt.Fprintf(w, tmpl, - ps.Host, ps.PID, ps.Status, - fmt.Sprintf("%d/%d", ps.ActiveWorkerCount, ps.Concurrency), - formatQueues(ps.Queues), timeAgo(ps.Started)) + info.Host, info.PID, info.Status, + fmt.Sprintf("%d/%d", info.ActiveWorkerCount, info.Concurrency), + formatQueues(info.Queues), timeAgo(info.Started)) } } printTable(cols, printRows)