From f9c06731169b741ee6837c9f2feea6350651206b Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 1 Dec 2020 07:37:09 -0800 Subject: [PATCH] Add SchedulerEntries method to Inspector --- inspector.go | 51 ++++++++++++++++++++++++++++ inspector_test.go | 79 +++++++++++++++++++++++++++++++++++++++++++ internal/base/base.go | 2 +- 3 files changed, 131 insertions(+), 1 deletion(-) diff --git a/inspector.go b/inspector.go index 94a26b1..2e92961 100644 --- a/inspector.go +++ b/inspector.go @@ -642,3 +642,54 @@ func (i *Inspector) ClusterNodes(qname string) ([]ClusterNode, error) { } return res, nil } + +// SchedulerEntry holds information about a periodic task registered with a scheduler. +type SchedulerEntry struct { + // Identifier of this entry. + ID string + + // Spec describes the schedule of this entry. + Spec string + + // Periodic Task registered for this entry. + Task *Task + + // Opts is the options for the periodic task. + Opts []Option + + // Next shows the next time the task will be enqueued. + Next time.Time + + // Prev shows the last time the task was enqueued. + // Zero time if task was never enqueued. + Prev time.Time +} + +// SchedulerEntries returns a list of all entries registered with +// currently running schedulers. +func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) { + var entries []*SchedulerEntry + res, err := i.rdb.ListSchedulerEntries() + if err != nil { + return nil, err + } + for _, e := range res { + task := NewTask(e.Type, e.Payload) + var opts []Option + for _, s := range e.Opts { + if o, err := parseOption(s); err == nil { + // ignore bad data + opts = append(opts, o) + } + } + entries = append(entries, &SchedulerEntry{ + ID: e.ID, + Spec: e.Spec, + Task: task, + Opts: opts, + Next: e.Next, + Prev: e.Prev, + }) + } + return entries, nil +} diff --git a/inspector_test.go b/inspector_test.go index ff91d5f..830c79b 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -7,6 +7,7 @@ package asynq import ( "fmt" "math" + "sort" "testing" "time" @@ -15,6 +16,7 @@ import ( "github.com/hibiken/asynq/internal/asynqtest" h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/rdb" ) func TestInspectorQueues(t *testing.T) { @@ -2141,3 +2143,80 @@ func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) { } } } + +var sortSchedulerEntry = cmp.Transformer("SortSchedulerEntry", func(in []*SchedulerEntry) []*SchedulerEntry { + out := append([]*SchedulerEntry(nil), in...) + sort.Slice(out, func(i, j int) bool { + return out[i].Spec < out[j].Spec + }) + return out +}) + +func TestInspectorSchedulerEntries(t *testing.T) { + r := setup(t) + rdbClient := rdb.NewRDB(r) + inspector := NewInspector(getRedisConnOpt(t)) + + now := time.Now().UTC() + schedulerID := "127.0.0.1:9876:abc123" + + tests := []struct { + data []*base.SchedulerEntry // data to seed redis + want []*SchedulerEntry + }{ + { + data: []*base.SchedulerEntry{ + &base.SchedulerEntry{ + Spec: "* * * * *", + Type: "foo", + Payload: nil, + Opts: nil, + Next: now.Add(5 * time.Hour), + Prev: now.Add(-2 * time.Hour), + }, + &base.SchedulerEntry{ + Spec: "@every 20m", + Type: "bar", + Payload: map[string]interface{}{"fiz": "baz"}, + Opts: []string{`Queue("bar")`, `MaxRetry(20)`}, + Next: now.Add(1 * time.Minute), + Prev: now.Add(-19 * time.Minute), + }, + }, + want: []*SchedulerEntry{ + &SchedulerEntry{ + Spec: "* * * * *", + Task: NewTask("foo", nil), + Opts: nil, + Next: now.Add(5 * time.Hour), + Prev: now.Add(-2 * time.Hour), + }, + &SchedulerEntry{ + Spec: "@every 20m", + Task: NewTask("bar", map[string]interface{}{"fiz": "baz"}), + Opts: []Option{Queue("bar"), MaxRetry(20)}, + Next: now.Add(1 * time.Minute), + Prev: now.Add(-19 * time.Minute), + }, + }, + }, + } + + for _, tc := range tests { + asynqtest.FlushDB(t, r) + err := rdbClient.WriteSchedulerEntries(schedulerID, tc.data, time.Minute) + if err != nil { + t.Fatalf("could not write data: %v", err) + } + got, err := inspector.SchedulerEntries() + if err != nil { + t.Errorf("SchedulerEntries() returned error: %v", err) + continue + } + ignoreOpt := cmpopts.IgnoreUnexported(Payload{}) + if diff := cmp.Diff(tc.want, got, sortSchedulerEntry, ignoreOpt); diff != "" { + t.Errorf("SchedulerEntries() = %v, want %v; (-want,+got)\n%s", + got, tc.want, diff) + } + } +} diff --git a/internal/base/base.go b/internal/base/base.go index 1cad7de..67ce26e 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -304,7 +304,7 @@ type SchedulerEntry struct { // Next shows the next time the task will be enqueued. Next time.Time - // Prev shows the last time the task was enqueued. + // Prev shows the last time the task was enqueued. // Zero time if task was never enqueued. Prev time.Time }