From 2f7d7a88dd4d837f4aa3538a8ed42feeabf7a6ba Mon Sep 17 00:00:00 2001 From: pacinochen Date: Thu, 24 Feb 2022 20:13:34 +0800 Subject: [PATCH] =?UTF-8?q?chore():=20=E4=BB=8E=E5=86=85=E5=AD=98=E9=87=8C?= =?UTF-8?q?=E9=9D=A2=E8=AF=BB=E5=8F=96=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- inspector.go | 66 +++++++++++++++------------------------------------- scheduler.go | 25 ++++++++++++++++++++ 2 files changed, 44 insertions(+), 47 deletions(-) diff --git a/inspector.go b/inspector.go index 3a10431..2c8498c 100644 --- a/inspector.go +++ b/inspector.go @@ -15,14 +15,12 @@ import ( "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/rdb" - "github.com/robfig/cron/v3" ) // Inspector is a client interface to inspect and mutate the state of // queues and tasks. type Inspector struct { - rdb *rdb.RDB - cron *cron.Cron + rdb *rdb.RDB } // New returns a new instance of Inspector. @@ -32,8 +30,7 @@ func NewInspector(r RedisConnOpt) *Inspector { panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r)) } return &Inspector{ - rdb: rdb.NewRDB(c), - cron: cron.New(cron.WithLocation(time.Local)), + rdb: rdb.NewRDB(c), } } @@ -799,55 +796,30 @@ type SchedulerEntry struct { // currently running schedulers. func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) { var entries []*SchedulerEntry - fmt.Printf("start to record log file") res, err := i.rdb.ListSchedulerEntries() if err != nil { - fmt.Errorf("ListSchedulerEntries err:%s", err.Error()) return nil, err } - // 如果查询不到数据 从内存里面获取数据 - if len(res) == 0 { - fmt.Println("search entries from mem") - for _, entry := range i.cron.Entries() { - job := entry.Job.(*enqueueJob) - var opts []Option - opt := stringifyOptions(job.opts) - for _, s := range opt { - if o, err := parseOption(s); err == nil { - // ignore bad data - opts = append(opts, o) - } + + for _, e := range res { + task := NewTask(e.Type, e.Payload) + var opts []Option + for _, s := range e.Opts { + if o, err := parseOption(s); err == nil { + // ignore bad data + opts = append(opts, o) } - entries = append(entries, &SchedulerEntry{ - ID: job.id.String(), - Spec: job.cronspec, - Task: job.task, - Opts: opts, - Next: entry.Next, - Prev: entry.Prev, - }) - } - fmt.Printf("entries lens is %d \n", len(res)) - } else { - for _, e := range res { - task := NewTask(e.Type, e.Payload) - var opts []Option - for _, s := range e.Opts { - if o, err := parseOption(s); err == nil { - // ignore bad data - opts = append(opts, o) - } - } - entries = append(entries, &SchedulerEntry{ - ID: e.ID, - Spec: e.Spec, - Task: task, - Opts: opts, - Next: e.Next, - Prev: e.Prev, - }) } + entries = append(entries, &SchedulerEntry{ + ID: e.ID, + Spec: e.Spec, + Task: task, + Opts: opts, + Next: e.Next, + Prev: e.Prev, + }) } + return entries, nil } diff --git a/scheduler.go b/scheduler.go index dcb0aa0..9f74f9d 100644 --- a/scheduler.go +++ b/scheduler.go @@ -297,3 +297,28 @@ func (s *Scheduler) clearHistory() { } } } + +func (s *Scheduler) GetEntries() ([]*SchedulerEntry, error) { + var entries []*SchedulerEntry + + for _, entry := range s.cron.Entries() { + job := entry.Job.(*enqueueJob) + var opts []Option + opt := stringifyOptions(job.opts) + for _, s := range opt { + if o, err := parseOption(s); err == nil { + // ignore bad data + opts = append(opts, o) + } + } + entries = append(entries, &SchedulerEntry{ + ID: job.id.String(), + Spec: job.cronspec, + Task: job.task, + Opts: opts, + Next: entry.Next, + Prev: entry.Prev, + }) + } + return entries, nil +}