From e55a693fa0569b2fefd346085afa885c3dce21c3 Mon Sep 17 00:00:00 2001 From: pacinochen Date: Thu, 24 Feb 2022 14:47:04 +0800 Subject: [PATCH] =?UTF-8?q?chore():=20=E4=BB=8E=E5=86=85=E5=AD=98=E9=87=8C?= =?UTF-8?q?=E9=9D=A2=E8=AF=BB=E5=8F=96=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- inspector.go | 65 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 17 deletions(-) diff --git a/inspector.go b/inspector.go index d636b34..3a10431 100644 --- a/inspector.go +++ b/inspector.go @@ -6,6 +6,7 @@ package asynq import ( "fmt" + "strconv" "strings" "time" @@ -14,12 +15,14 @@ import ( "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/rdb" + "github.com/robfig/cron/v3" ) // Inspector is a client interface to inspect and mutate the state of // queues and tasks. type Inspector struct { - rdb *rdb.RDB + rdb *rdb.RDB + cron *cron.Cron } // New returns a new instance of Inspector. @@ -29,7 +32,8 @@ func NewInspector(r RedisConnOpt) *Inspector { panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r)) } return &Inspector{ - rdb: rdb.NewRDB(c), + rdb: rdb.NewRDB(c), + cron: cron.New(cron.WithLocation(time.Local)), } } @@ -795,27 +799,54 @@ type SchedulerEntry struct { // currently running schedulers. func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) { var entries []*SchedulerEntry + fmt.Printf("start to record log file") res, err := i.rdb.ListSchedulerEntries() if err != nil { + fmt.Errorf("ListSchedulerEntries err:%s", err.Error()) return nil, err } - for _, e := range res { - task := NewTask(e.Type, e.Payload) - var opts []Option - for _, s := range e.Opts { - if o, err := parseOption(s); err == nil { - // ignore bad data - opts = append(opts, o) + // 如果查询不到数据 从内存里面获取数据 + if len(res) == 0 { + fmt.Println("search entries from mem") + for _, entry := range i.cron.Entries() { + job := entry.Job.(*enqueueJob) + var opts []Option + opt := stringifyOptions(job.opts) + for _, s := range opt { + if o, err := parseOption(s); err == nil { + // ignore bad data + opts = append(opts, o) + } } + entries = append(entries, &SchedulerEntry{ + ID: job.id.String(), + Spec: job.cronspec, + Task: job.task, + Opts: opts, + Next: entry.Next, + Prev: entry.Prev, + }) + } + fmt.Printf("entries lens is %d \n", len(res)) + } else { + for _, e := range res { + task := NewTask(e.Type, e.Payload) + var opts []Option + for _, s := range e.Opts { + if o, err := parseOption(s); err == nil { + // ignore bad data + opts = append(opts, o) + } + } + entries = append(entries, &SchedulerEntry{ + ID: e.ID, + Spec: e.Spec, + Task: task, + Opts: opts, + Next: e.Next, + Prev: e.Prev, + }) } - entries = append(entries, &SchedulerEntry{ - ID: e.ID, - Spec: e.Spec, - Task: task, - Opts: opts, - Next: e.Next, - Prev: e.Prev, - }) } return entries, nil }