2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 16:13:40 +08:00

Update ListServers and ListWorkers methods in RDB

This commit is contained in:
Ken Hibino 2020-08-18 06:30:15 -07:00
parent 3f26122ac0
commit 2f226dfb84
2 changed files with 29 additions and 32 deletions

View File

@ -5,6 +5,7 @@
package rdb package rdb
import ( import (
"encoding/json"
"fmt" "fmt"
"strings" "strings"
"time" "time"
@ -673,80 +674,76 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
} }
return nil return nil
} }
*/
// Note: Script also removes stale keys. // Note: Script also removes stale keys.
var listServersCmd = redis.NewScript(` var listServerKeysCmd = redis.NewScript(`
local res = {}
local now = tonumber(ARGV[1]) local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
for _, key in ipairs(keys) do
local s = redis.call("GET", key)
if s then
table.insert(res, s)
end
end
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return res`) return keys`)
// ListServers returns the list of server info. // ListServers returns the list of server info.
func (r *RDB) ListServers() ([]*base.ServerInfo, error) { func (r *RDB) ListServers() ([]*base.ServerInfo, error) {
res, err := listServersCmd.Run(r.client, now := time.Now().UTC()
[]string{base.AllServers}, time.Now().UTC().Unix()).Result() res, err := listServerKeysCmd.Run(r.client, []string{base.AllServers}, now.Unix()).Result()
if err != nil { if err != nil {
return nil, err return nil, err
} }
data, err := cast.ToStringSliceE(res) keys, err := cast.ToStringSliceE(res)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var servers []*base.ServerInfo var servers []*base.ServerInfo
for _, s := range data { for _, key := range keys {
var info base.ServerInfo data, err := r.client.Get(key).Result()
err := json.Unmarshal([]byte(s), &info)
if err != nil { if err != nil {
continue // skip bad data continue // skip bad data
} }
var info base.ServerInfo
if err := json.Unmarshal([]byte(data), &info); err != nil {
continue // skip bad data
}
servers = append(servers, &info) servers = append(servers, &info)
} }
return servers, nil return servers, nil
} }
// Note: Script also removes stale keys. // Note: Script also removes stale keys.
var listWorkersCmd = redis.NewScript(` var listWorkerKeysCmd = redis.NewScript(`
local res = {}
local now = tonumber(ARGV[1]) local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
for _, key in ipairs(keys) do
local workers = redis.call("HVALS", key)
for _, w in ipairs(workers) do
table.insert(res, w)
end
end
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return res`) return keys`)
// ListWorkers returns the list of worker stats. // ListWorkers returns the list of worker stats.
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) { func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
res, err := listWorkersCmd.Run(r.client, []string{base.AllWorkers}, time.Now().UTC().Unix()).Result() now := time.Now().UTC()
res, err := listWorkerKeysCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result()
if err != nil { if err != nil {
return nil, err return nil, err
} }
data, err := cast.ToStringSliceE(res) keys, err := cast.ToStringSliceE(res)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var workers []*base.WorkerInfo var workers []*base.WorkerInfo
for _, s := range data { for _, key := range keys {
var w base.WorkerInfo data, err := r.client.HVals(key).Result()
err := json.Unmarshal([]byte(s), &w)
if err != nil { if err != nil {
continue // skip bad data continue // skip bad data
} }
workers = append(workers, &w) for _, s := range data {
var w base.WorkerInfo
if err := json.Unmarshal([]byte(s), &w); err != nil {
continue // skip bad data
}
workers = append(workers, &w)
}
} }
return workers, nil return workers, nil
} }
*/
// Pause pauses processing of tasks from the given queue. // Pause pauses processing of tasks from the given queue.
func (r *RDB) Pause(qname string) error { func (r *RDB) Pause(qname string) error {

View File

@ -2717,6 +2717,7 @@ func TestRemoveQueueError(t *testing.T) {
} }
} }
} }
*/
func TestListServers(t *testing.T) { func TestListServers(t *testing.T) {
r := setup(t) r := setup(t)
@ -2823,7 +2824,6 @@ func TestListWorkers(t *testing.T) {
} }
} }
} }
*/
func TestPause(t *testing.T) { func TestPause(t *testing.T) {
r := setup(t) r := setup(t)