2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Simplify ListWorkers implementation

This commit is contained in:
Ken Hibino 2021-03-11 06:41:18 -08:00
parent ea456741b6
commit d95f487bef
2 changed files with 14 additions and 14 deletions

View File

@ -381,7 +381,6 @@ type WorkerInfo struct {
Host string
PID int
ServerID string
// TODO: Can we have Task field (*TaskMessage)?
ID string
Type string
Payload map[string]interface{}

View File

@ -1022,29 +1022,31 @@ func (r *RDB) ListServers() ([]*base.ServerInfo, error) {
}
// Note: Script also removes stale keys.
var listWorkerKeysCmd = redis.NewScript(`
var listWorkersCmd = redis.NewScript(`
local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return keys`)
local res = {}
for _, key in ipairs(keys) do
local vals = redis.call("HVALS", key)
for _, v in ipairs(vals) do
table.insert(res, v)
end
end
return res`)
// ListWorkers returns the list of worker stats.
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
now := time.Now()
res, err := listWorkerKeysCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result()
res, err := listWorkersCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result()
if err != nil {
return nil, err
}
keys, err := cast.ToStringSliceE(res)
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
var workers []*base.WorkerInfo
for _, key := range keys {
data, err := r.client.HVals(key).Result()
if err != nil {
continue // skip bad data
}
for _, s := range data {
w, err := base.DecodeWorkerInfo([]byte(s))
if err != nil {
@ -1052,7 +1054,6 @@ func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
}
workers = append(workers, w)
}
}
return workers, nil
}