2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00

Fix lua script for redis-cluster mode

This commit is contained in:
Ken Hibino 2021-06-13 15:03:46 -07:00
parent a0df047f71
commit e01c6379c8

View File

@ -942,7 +942,6 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) {
keys := []string{ keys := []string{
src, src,
dst, dst,
base.AllQueues,
} }
now := time.Now() now := time.Now()
argv := []interface{}{ argv := []interface{}{
@ -1107,15 +1106,11 @@ func (r *RDB) deleteAll(key, qname string) (int64, error) {
if err := r.checkQueueExists(qname); err != nil { if err := r.checkQueueExists(qname); err != nil {
return 0, err return 0, err
} }
keys := []string{
key,
base.AllQueues,
}
argv := []interface{}{ argv := []interface{}{
base.TaskKeyPrefix(qname), base.TaskKeyPrefix(qname),
qname, qname,
} }
res, err := deleteAllCmd.Run(r.client, keys, argv...).Result() res, err := deleteAllCmd.Run(r.client, []string{key}, argv...).Result()
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -1369,33 +1364,33 @@ var listWorkersCmd = redis.NewScript(`
local now = tonumber(ARGV[1]) local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
local res = {} return keys`)
for _, key in ipairs(keys) do
local vals = redis.call("HVALS", key)
for _, v in ipairs(vals) do
table.insert(res, v)
end
end
return res`)
// ListWorkers returns the list of worker stats. // ListWorkers returns the list of worker stats.
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) { func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
var op errors.Op = "rdb.ListWorkers"
now := time.Now() now := time.Now()
res, err := listWorkersCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result() res, err := listWorkersCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result()
if err != nil { if err != nil {
return nil, err return nil, errors.E(op, errors.Unknown, err)
} }
data, err := cast.ToStringSliceE(res) keys, err := cast.ToStringSliceE(res)
if err != nil { if err != nil {
return nil, err return nil, errors.E(op, errors.Internal, fmt.Sprintf("unexpeced return value from Lua script: %v", res))
} }
var workers []*base.WorkerInfo var workers []*base.WorkerInfo
for _, s := range data { for _, key := range keys {
w, err := base.DecodeWorkerInfo([]byte(s)) data, err := r.client.HVals(key).Result()
if err != nil { if err != nil {
continue // skip bad data continue // skip bad data
} }
workers = append(workers, w) for _, s := range data {
w, err := base.DecodeWorkerInfo([]byte(s))
if err != nil {
continue // skip bad data
}
workers = append(workers, w)
}
} }
return workers, nil return workers, nil
} }