From e21fe3bd7975b7fa0f80dd37d010b2fe36e30436 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 22 Feb 2020 20:42:53 -0800 Subject: [PATCH] Add ListWorkers to RDB --- client_test.go | 2 +- internal/asynqtest/asynqtest.go | 9 +++++ internal/rdb/inspect.go | 36 ++++++++++++++++++ internal/rdb/inspect_test.go | 65 +++++++++++++++++++++++++++++++++ 4 files changed, 111 insertions(+), 1 deletion(-) diff --git a/client_test.go b/client_test.go index 1bd19ef..f0b8710 100644 --- a/client_test.go +++ b/client_test.go @@ -15,7 +15,7 @@ import ( func TestClient(t *testing.T) { r := setup(t) - client := NewClient(&RedisClientOpt{ + client := NewClient(RedisClientOpt{ Addr: "localhost:6379", DB: 14, }) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 7b8a0ae..07de078 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -53,6 +53,15 @@ var SortProcessInfoOpt = cmp.Transformer("SortProcessInfo", func(in []*base.Proc return out }) +// SortWorkerInfoOpt is a cmp.Option to sort base.WorkerInfo for comparing slice of worker info. +var SortWorkerInfoOpt = cmp.Transformer("SortWorkerInfo", func(in []*base.WorkerInfo) []*base.WorkerInfo { + out := append([]*base.WorkerInfo(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].ID.String() < out[j].ID.String() + }) + return out +}) + // SortStringSliceOpt is a cmp.Option to sort string slice. var SortStringSliceOpt = cmp.Transformer("SortStringSlice", func(in []string) []string { out := append([]string(nil), in...) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index d0f4be7..1678136 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -794,3 +794,39 @@ func (r *RDB) ListProcesses() ([]*base.ProcessInfo, error) { } return processes, nil } + +// Note: Script also removes stale keys. +var listWorkersCmd = redis.NewScript(` +local res = {} +local now = tonumber(ARGV[1]) +local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") +for _, key in ipairs(keys) do + local workers = redis.call("HVALS", key) + for _, w in ipairs(workers) do + table.insert(res, w) + end +end +redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) +return res`) + +// ListWorkers returns the list of worker stats. +func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) { + res, err := listWorkersCmd.Run(r.client, []string{base.AllWorkers}, time.Now().UTC().Unix()).Result() + if err != nil { + return nil, err + } + data, err := cast.ToStringSliceE(res) + if err != nil { + return nil, err + } + var workers []*base.WorkerInfo + for _, s := range data { + var w base.WorkerInfo + err := json.Unmarshal([]byte(s), &w) + if err != nil { + continue // skip bad data + } + workers = append(workers, &w) + } + return workers, nil +} diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 48bcb56..c3b774e 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2122,3 +2122,68 @@ func TestListProcesses(t *testing.T) { } } } + +func TestListWorkers(t *testing.T) { + r := setup(t) + + const ( + host = "127.0.0.1" + pid = 4567 + ) + + m1 := h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "abc123"}) + m2 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/image/file"}) + m3 := h.NewTaskMessage("reindex", map[string]interface{}{}) + t1 := time.Now().Add(-time.Second) + t2 := time.Now().Add(-10 * time.Second) + t3 := time.Now().Add(-time.Minute) + + type workerStats struct { + msg *base.TaskMessage + started time.Time + } + + tests := []struct { + workers []*workerStats + want []*base.WorkerInfo + }{ + { + workers: []*workerStats{ + {m1, t1}, + {m2, t2}, + {m3, t3}, + }, + want: []*base.WorkerInfo{ + {Host: host, PID: pid, ID: m1.ID, Type: m1.Type, Queue: m1.Queue, Payload: m1.Payload, Started: t1}, + {Host: host, PID: pid, ID: m2.ID, Type: m2.Type, Queue: m2.Queue, Payload: m2.Payload, Started: t2}, + {Host: host, PID: pid, ID: m3.ID, Type: m3.Type, Queue: m3.Queue, Payload: m3.Payload, Started: t3}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + + ps := base.NewProcessState(host, pid, 10, map[string]int{"default": 1}, false) + + for _, w := range tc.workers { + ps.AddWorkerStats(w.msg, w.started) + } + + err := r.WriteProcessState(ps, time.Minute) + if err != nil { + t.Errorf("could not write process state to redis: %v", err) + continue + } + + got, err := r.ListWorkers() + if err != nil { + t.Errorf("(*RDB).ListWorkers() returned an error: %v", err) + continue + } + + if diff := cmp.Diff(tc.want, got, h.SortWorkerInfoOpt); diff != "" { + t.Errorf("(*RDB).ListWorkers() = %v, want = %v; (-want,+got)\n%s", got, tc.want, diff) + } + } +}