diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 11501ac..042f574 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -5,6 +5,7 @@ package rdb import ( + "encoding/json" "fmt" "strings" "time" @@ -673,80 +674,76 @@ func (r *RDB) RemoveQueue(qname string, force bool) error { } return nil } +*/ // Note: Script also removes stale keys. -var listServersCmd = redis.NewScript(` -local res = {} +var listServerKeysCmd = redis.NewScript(` local now = tonumber(ARGV[1]) local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") -for _, key in ipairs(keys) do - local s = redis.call("GET", key) - if s then - table.insert(res, s) - end -end redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) -return res`) +return keys`) // ListServers returns the list of server info. func (r *RDB) ListServers() ([]*base.ServerInfo, error) { - res, err := listServersCmd.Run(r.client, - []string{base.AllServers}, time.Now().UTC().Unix()).Result() + now := time.Now().UTC() + res, err := listServerKeysCmd.Run(r.client, []string{base.AllServers}, now.Unix()).Result() if err != nil { return nil, err } - data, err := cast.ToStringSliceE(res) + keys, err := cast.ToStringSliceE(res) if err != nil { return nil, err } var servers []*base.ServerInfo - for _, s := range data { - var info base.ServerInfo - err := json.Unmarshal([]byte(s), &info) + for _, key := range keys { + data, err := r.client.Get(key).Result() if err != nil { continue // skip bad data } + var info base.ServerInfo + if err := json.Unmarshal([]byte(data), &info); err != nil { + continue // skip bad data + } servers = append(servers, &info) } return servers, nil } // Note: Script also removes stale keys. -var listWorkersCmd = redis.NewScript(` -local res = {} +var listWorkerKeysCmd = redis.NewScript(` local now = tonumber(ARGV[1]) local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") -for _, key in ipairs(keys) do - local workers = redis.call("HVALS", key) - for _, w in ipairs(workers) do - table.insert(res, w) - end -end redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) -return res`) +return keys`) // ListWorkers returns the list of worker stats. func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) { - res, err := listWorkersCmd.Run(r.client, []string{base.AllWorkers}, time.Now().UTC().Unix()).Result() + now := time.Now().UTC() + res, err := listWorkerKeysCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result() if err != nil { return nil, err } - data, err := cast.ToStringSliceE(res) + keys, err := cast.ToStringSliceE(res) if err != nil { return nil, err } var workers []*base.WorkerInfo - for _, s := range data { - var w base.WorkerInfo - err := json.Unmarshal([]byte(s), &w) + for _, key := range keys { + data, err := r.client.HVals(key).Result() if err != nil { continue // skip bad data } - workers = append(workers, &w) + for _, s := range data { + var w base.WorkerInfo + if err := json.Unmarshal([]byte(s), &w); err != nil { + continue // skip bad data + } + workers = append(workers, &w) + + } } return workers, nil } -*/ // Pause pauses processing of tasks from the given queue. func (r *RDB) Pause(qname string) error { diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 2be8138..318bdd0 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2717,6 +2717,7 @@ func TestRemoveQueueError(t *testing.T) { } } } +*/ func TestListServers(t *testing.T) { r := setup(t) @@ -2823,7 +2824,6 @@ func TestListWorkers(t *testing.T) { } } } -*/ func TestPause(t *testing.T) { r := setup(t)