mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 07:12:17 +08:00
Add ListWorkers to RDB
This commit is contained in:
parent
37c6c73d9b
commit
e21fe3bd79
@ -15,7 +15,7 @@ import (
|
|||||||
|
|
||||||
func TestClient(t *testing.T) {
|
func TestClient(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
client := NewClient(&RedisClientOpt{
|
client := NewClient(RedisClientOpt{
|
||||||
Addr: "localhost:6379",
|
Addr: "localhost:6379",
|
||||||
DB: 14,
|
DB: 14,
|
||||||
})
|
})
|
||||||
|
@ -53,6 +53,15 @@ var SortProcessInfoOpt = cmp.Transformer("SortProcessInfo", func(in []*base.Proc
|
|||||||
return out
|
return out
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// SortWorkerInfoOpt is a cmp.Option to sort base.WorkerInfo for comparing slice of worker info.
|
||||||
|
var SortWorkerInfoOpt = cmp.Transformer("SortWorkerInfo", func(in []*base.WorkerInfo) []*base.WorkerInfo {
|
||||||
|
out := append([]*base.WorkerInfo(nil), in...) // Copy input to avoid mutating it
|
||||||
|
sort.Slice(out, func(i, j int) bool {
|
||||||
|
return out[i].ID.String() < out[j].ID.String()
|
||||||
|
})
|
||||||
|
return out
|
||||||
|
})
|
||||||
|
|
||||||
// SortStringSliceOpt is a cmp.Option to sort string slice.
|
// SortStringSliceOpt is a cmp.Option to sort string slice.
|
||||||
var SortStringSliceOpt = cmp.Transformer("SortStringSlice", func(in []string) []string {
|
var SortStringSliceOpt = cmp.Transformer("SortStringSlice", func(in []string) []string {
|
||||||
out := append([]string(nil), in...)
|
out := append([]string(nil), in...)
|
||||||
|
@ -794,3 +794,39 @@ func (r *RDB) ListProcesses() ([]*base.ProcessInfo, error) {
|
|||||||
}
|
}
|
||||||
return processes, nil
|
return processes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Note: Script also removes stale keys.
|
||||||
|
var listWorkersCmd = redis.NewScript(`
|
||||||
|
local res = {}
|
||||||
|
local now = tonumber(ARGV[1])
|
||||||
|
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
|
||||||
|
for _, key in ipairs(keys) do
|
||||||
|
local workers = redis.call("HVALS", key)
|
||||||
|
for _, w in ipairs(workers) do
|
||||||
|
table.insert(res, w)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
|
||||||
|
return res`)
|
||||||
|
|
||||||
|
// ListWorkers returns the list of worker stats.
|
||||||
|
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
|
||||||
|
res, err := listWorkersCmd.Run(r.client, []string{base.AllWorkers}, time.Now().UTC().Unix()).Result()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
data, err := cast.ToStringSliceE(res)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var workers []*base.WorkerInfo
|
||||||
|
for _, s := range data {
|
||||||
|
var w base.WorkerInfo
|
||||||
|
err := json.Unmarshal([]byte(s), &w)
|
||||||
|
if err != nil {
|
||||||
|
continue // skip bad data
|
||||||
|
}
|
||||||
|
workers = append(workers, &w)
|
||||||
|
}
|
||||||
|
return workers, nil
|
||||||
|
}
|
||||||
|
@ -2122,3 +2122,68 @@ func TestListProcesses(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListWorkers(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
|
||||||
|
const (
|
||||||
|
host = "127.0.0.1"
|
||||||
|
pid = 4567
|
||||||
|
)
|
||||||
|
|
||||||
|
m1 := h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "abc123"})
|
||||||
|
m2 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/image/file"})
|
||||||
|
m3 := h.NewTaskMessage("reindex", map[string]interface{}{})
|
||||||
|
t1 := time.Now().Add(-time.Second)
|
||||||
|
t2 := time.Now().Add(-10 * time.Second)
|
||||||
|
t3 := time.Now().Add(-time.Minute)
|
||||||
|
|
||||||
|
type workerStats struct {
|
||||||
|
msg *base.TaskMessage
|
||||||
|
started time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
workers []*workerStats
|
||||||
|
want []*base.WorkerInfo
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
workers: []*workerStats{
|
||||||
|
{m1, t1},
|
||||||
|
{m2, t2},
|
||||||
|
{m3, t3},
|
||||||
|
},
|
||||||
|
want: []*base.WorkerInfo{
|
||||||
|
{Host: host, PID: pid, ID: m1.ID, Type: m1.Type, Queue: m1.Queue, Payload: m1.Payload, Started: t1},
|
||||||
|
{Host: host, PID: pid, ID: m2.ID, Type: m2.Type, Queue: m2.Queue, Payload: m2.Payload, Started: t2},
|
||||||
|
{Host: host, PID: pid, ID: m3.ID, Type: m3.Type, Queue: m3.Queue, Payload: m3.Payload, Started: t3},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
h.FlushDB(t, r.client)
|
||||||
|
|
||||||
|
ps := base.NewProcessState(host, pid, 10, map[string]int{"default": 1}, false)
|
||||||
|
|
||||||
|
for _, w := range tc.workers {
|
||||||
|
ps.AddWorkerStats(w.msg, w.started)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.WriteProcessState(ps, time.Minute)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("could not write process state to redis: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := r.ListWorkers()
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("(*RDB).ListWorkers() returned an error: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if diff := cmp.Diff(tc.want, got, h.SortWorkerInfoOpt); diff != "" {
|
||||||
|
t.Errorf("(*RDB).ListWorkers() = %v, want = %v; (-want,+got)\n%s", got, tc.want, diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user