diff --git a/CHANGELOG.md b/CHANGELOG.md index 3cec214..b45e93a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- `Servers` method is added to `Inspector` - `CancelActiveTask` method is added to `Inspector`. - `ListSchedulerEnqueueEvents` method is added to `Inspector`. - `SchedulerEntries` method is added to `Inspector`. diff --git a/heartbeat.go b/heartbeat.go index d504541..eeb08b5 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -147,13 +147,14 @@ func (h *heartbeater) beat() { var ws []*base.WorkerInfo for id, stat := range h.workers { ws = append(ws, &base.WorkerInfo{ - Host: h.host, - PID: h.pid, - ID: id, - Type: stat.msg.Type, - Queue: stat.msg.Queue, - Payload: stat.msg.Payload, - Started: stat.started, + Host: h.host, + PID: h.pid, + ServerID: h.serverID, + ID: id, + Type: stat.msg.Type, + Queue: stat.msg.Queue, + Payload: stat.msg.Payload, + Started: stat.started, }) } diff --git a/inspector.go b/inspector.go index 5f9b8e4..eacc99d 100644 --- a/inspector.go +++ b/inspector.go @@ -624,6 +624,84 @@ func (i *Inspector) UnpauseQueue(qname string) error { return i.rdb.Unpause(qname) } +// Servers return a list of running servers' information. +func (i *Inspector) Servers() ([]*ServerInfo, error) { + servers, err := i.rdb.ListServers() + if err != nil { + return nil, err + } + workers, err := i.rdb.ListWorkers() + if err != nil { + return nil, err + } + m := make(map[string]*ServerInfo) // ServerInfo keyed by serverID + for _, s := range servers { + m[s.ServerID] = &ServerInfo{ + ID: s.ServerID, + Host: s.Host, + PID: s.PID, + Concurrency: s.Concurrency, + Queues: s.Queues, + StrictPriority: s.StrictPriority, + Started: s.Started, + Status: s.Status, + ActiveWorkers: make([]*WorkerInfo, 0), + } + } + for _, w := range workers { + srvInfo, ok := m[w.ServerID] + if !ok { + continue + } + wrkInfo := &WorkerInfo{ + Started: w.Started, + Task: &ActiveTask{ + Task: NewTask(w.Type, w.Payload), + ID: w.ID, + Queue: w.Queue, + }, + } + srvInfo.ActiveWorkers = append(srvInfo.ActiveWorkers, wrkInfo) + } + var out []*ServerInfo + for _, srvInfo := range m { + out = append(out, srvInfo) + } + return out, nil +} + +// ServerInfo describes a running Server instance. +type ServerInfo struct { + // Unique Identifier for the server. + ID string + // Host machine on which the server is running. + Host string + // PID of the process in which the server is running. + PID int + + // Server configuration details. + // See Config doc for field descriptions. + Concurrency int + Queues map[string]int + StrictPriority bool + + // Time the server started. + Started time.Time + // Status indicates the status of the server. + // TODO: Update comment with more details. + Status string + // A List of active workers currently processing tasks. + ActiveWorkers []*WorkerInfo +} + +// WorkerInfo describes a running worker processing a task. +type WorkerInfo struct { + // The task the worker is processing. + Task *ActiveTask + // Time the worker started processing the task. + Started time.Time +} + // ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to. func (i *Inspector) ClusterKeySlot(qname string) (int64, error) { return i.rdb.ClusterKeySlot(qname) diff --git a/internal/base/base.go b/internal/base/base.go index 67ce26e..44559cb 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -275,13 +275,14 @@ type ServerInfo struct { // WorkerInfo holds information about a running worker. type WorkerInfo struct { - Host string - PID int - ID string - Type string - Queue string - Payload map[string]interface{} - Started time.Time + Host string + PID int + ServerID string + ID string + Type string + Queue string + Payload map[string]interface{} + Started time.Time } // SchedulerEntry holds information about a periodic task registered with a scheduler. diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index c274c8d..552b88c 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2942,8 +2942,9 @@ func TestListWorkers(t *testing.T) { defer r.Close() var ( - host = "127.0.0.1" - pid = 4567 + host = "127.0.0.1" + pid = 4567 + serverID = "server123" m1 = h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "abc123"}) m2 = h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/image/file"}) @@ -2955,9 +2956,36 @@ func TestListWorkers(t *testing.T) { }{ { data: []*base.WorkerInfo{ - {Host: host, PID: pid, ID: m1.ID.String(), Type: m1.Type, Queue: m1.Queue, Payload: m1.Payload, Started: time.Now().Add(-1 * time.Second)}, - {Host: host, PID: pid, ID: m2.ID.String(), Type: m2.Type, Queue: m2.Queue, Payload: m2.Payload, Started: time.Now().Add(-5 * time.Second)}, - {Host: host, PID: pid, ID: m3.ID.String(), Type: m3.Type, Queue: m3.Queue, Payload: m3.Payload, Started: time.Now().Add(-30 * time.Second)}, + { + Host: host, + PID: pid, + ServerID: serverID, + ID: m1.ID.String(), + Type: m1.Type, + Queue: m1.Queue, + Payload: m1.Payload, + Started: time.Now().Add(-1 * time.Second), + }, + { + Host: host, + PID: pid, + ServerID: serverID, + ID: m2.ID.String(), + Type: m2.Type, + Queue: m2.Queue, + Payload: m2.Payload, + Started: time.Now().Add(-5 * time.Second), + }, + { + Host: host, + PID: pid, + ServerID: serverID, + ID: m3.ID.String(), + Type: m3.Type, + Queue: m3.Queue, + Payload: m3.Payload, + Started: time.Now().Add(-30 * time.Second), + }, }, }, }