2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Add SchedulerEntries method to Inspector

This commit is contained in:
Ken Hibino 2020-12-01 07:37:09 -08:00
parent b604d25937
commit f9c0673116
3 changed files with 131 additions and 1 deletions

View File

@ -642,3 +642,54 @@ func (i *Inspector) ClusterNodes(qname string) ([]ClusterNode, error) {
}
return res, nil
}
// SchedulerEntry holds information about a periodic task registered with a scheduler.
type SchedulerEntry struct {
// Identifier of this entry.
ID string
// Spec describes the schedule of this entry.
Spec string
// Periodic Task registered for this entry.
Task *Task
// Opts is the options for the periodic task.
Opts []Option
// Next shows the next time the task will be enqueued.
Next time.Time
// Prev shows the last time the task was enqueued.
// Zero time if task was never enqueued.
Prev time.Time
}
// SchedulerEntries returns a list of all entries registered with
// currently running schedulers.
func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
var entries []*SchedulerEntry
res, err := i.rdb.ListSchedulerEntries()
if err != nil {
return nil, err
}
for _, e := range res {
task := NewTask(e.Type, e.Payload)
var opts []Option
for _, s := range e.Opts {
if o, err := parseOption(s); err == nil {
// ignore bad data
opts = append(opts, o)
}
}
entries = append(entries, &SchedulerEntry{
ID: e.ID,
Spec: e.Spec,
Task: task,
Opts: opts,
Next: e.Next,
Prev: e.Prev,
})
}
return entries, nil
}

View File

@ -7,6 +7,7 @@ package asynq
import (
"fmt"
"math"
"sort"
"testing"
"time"
@ -15,6 +16,7 @@ import (
"github.com/hibiken/asynq/internal/asynqtest"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
)
func TestInspectorQueues(t *testing.T) {
@ -2141,3 +2143,80 @@ func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) {
}
}
}
var sortSchedulerEntry = cmp.Transformer("SortSchedulerEntry", func(in []*SchedulerEntry) []*SchedulerEntry {
out := append([]*SchedulerEntry(nil), in...)
sort.Slice(out, func(i, j int) bool {
return out[i].Spec < out[j].Spec
})
return out
})
func TestInspectorSchedulerEntries(t *testing.T) {
r := setup(t)
rdbClient := rdb.NewRDB(r)
inspector := NewInspector(getRedisConnOpt(t))
now := time.Now().UTC()
schedulerID := "127.0.0.1:9876:abc123"
tests := []struct {
data []*base.SchedulerEntry // data to seed redis
want []*SchedulerEntry
}{
{
data: []*base.SchedulerEntry{
&base.SchedulerEntry{
Spec: "* * * * *",
Type: "foo",
Payload: nil,
Opts: nil,
Next: now.Add(5 * time.Hour),
Prev: now.Add(-2 * time.Hour),
},
&base.SchedulerEntry{
Spec: "@every 20m",
Type: "bar",
Payload: map[string]interface{}{"fiz": "baz"},
Opts: []string{`Queue("bar")`, `MaxRetry(20)`},
Next: now.Add(1 * time.Minute),
Prev: now.Add(-19 * time.Minute),
},
},
want: []*SchedulerEntry{
&SchedulerEntry{
Spec: "* * * * *",
Task: NewTask("foo", nil),
Opts: nil,
Next: now.Add(5 * time.Hour),
Prev: now.Add(-2 * time.Hour),
},
&SchedulerEntry{
Spec: "@every 20m",
Task: NewTask("bar", map[string]interface{}{"fiz": "baz"}),
Opts: []Option{Queue("bar"), MaxRetry(20)},
Next: now.Add(1 * time.Minute),
Prev: now.Add(-19 * time.Minute),
},
},
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
err := rdbClient.WriteSchedulerEntries(schedulerID, tc.data, time.Minute)
if err != nil {
t.Fatalf("could not write data: %v", err)
}
got, err := inspector.SchedulerEntries()
if err != nil {
t.Errorf("SchedulerEntries() returned error: %v", err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(Payload{})
if diff := cmp.Diff(tc.want, got, sortSchedulerEntry, ignoreOpt); diff != "" {
t.Errorf("SchedulerEntries() = %v, want %v; (-want,+got)\n%s",
got, tc.want, diff)
}
}
}

View File

@ -304,7 +304,7 @@ type SchedulerEntry struct {
// Next shows the next time the task will be enqueued.
Next time.Time
// Prev shows the last time the task was enqueued.
// Prev shows the last time the task was enqueued.
// Zero time if task was never enqueued.
Prev time.Time
}