From d95f487befda2153c5547388b99f4c68bb8423b1 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 11 Mar 2021 06:41:18 -0800 Subject: [PATCH] Simplify ListWorkers implementation --- internal/base/base.go | 1 - internal/rdb/inspect.go | 27 ++++++++++++++------------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/internal/base/base.go b/internal/base/base.go index a5fcead..77d6dbb 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -381,7 +381,6 @@ type WorkerInfo struct { Host string PID int ServerID string - // TODO: Can we have Task field (*TaskMessage)? ID string Type string Payload map[string]interface{} diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 96cfa55..36effb1 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -1022,36 +1022,37 @@ func (r *RDB) ListServers() ([]*base.ServerInfo, error) { } // Note: Script also removes stale keys. -var listWorkerKeysCmd = redis.NewScript(` +var listWorkersCmd = redis.NewScript(` local now = tonumber(ARGV[1]) local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) -return keys`) +local res = {} +for _, key in ipairs(keys) do + local vals = redis.call("HVALS", key) + for _, v in ipairs(vals) do + table.insert(res, v) + end +end +return res`) // ListWorkers returns the list of worker stats. func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) { now := time.Now() - res, err := listWorkerKeysCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result() + res, err := listWorkersCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result() if err != nil { return nil, err } - keys, err := cast.ToStringSliceE(res) + data, err := cast.ToStringSliceE(res) if err != nil { return nil, err } var workers []*base.WorkerInfo - for _, key := range keys { - data, err := r.client.HVals(key).Result() + for _, s := range data { + w, err := base.DecodeWorkerInfo([]byte(s)) if err != nil { continue // skip bad data } - for _, s := range data { - w, err := base.DecodeWorkerInfo([]byte(s)) - if err != nil { - continue // skip bad data - } - workers = append(workers, w) - } + workers = append(workers, w) } return workers, nil }