From e55a693fa0569b2fefd346085afa885c3dce21c3 Mon Sep 17 00:00:00 2001 From: pacinochen Date: Thu, 24 Feb 2022 14:47:04 +0800 Subject: [PATCH 1/3] =?UTF-8?q?chore():=20=E4=BB=8E=E5=86=85=E5=AD=98?= =?UTF-8?q?=E9=87=8C=E9=9D=A2=E8=AF=BB=E5=8F=96=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- inspector.go | 65 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 17 deletions(-) diff --git a/inspector.go b/inspector.go index d636b34..3a10431 100644 --- a/inspector.go +++ b/inspector.go @@ -6,6 +6,7 @@ package asynq import ( "fmt" + "strconv" "strings" "time" @@ -14,12 +15,14 @@ import ( "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/rdb" + "github.com/robfig/cron/v3" ) // Inspector is a client interface to inspect and mutate the state of // queues and tasks. type Inspector struct { - rdb *rdb.RDB + rdb *rdb.RDB + cron *cron.Cron } // New returns a new instance of Inspector. @@ -29,7 +32,8 @@ func NewInspector(r RedisConnOpt) *Inspector { panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r)) } return &Inspector{ - rdb: rdb.NewRDB(c), + rdb: rdb.NewRDB(c), + cron: cron.New(cron.WithLocation(time.Local)), } } @@ -795,27 +799,54 @@ type SchedulerEntry struct { // currently running schedulers. func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) { var entries []*SchedulerEntry + fmt.Printf("start to record log file") res, err := i.rdb.ListSchedulerEntries() if err != nil { + fmt.Errorf("ListSchedulerEntries err:%s", err.Error()) return nil, err } - for _, e := range res { - task := NewTask(e.Type, e.Payload) - var opts []Option - for _, s := range e.Opts { - if o, err := parseOption(s); err == nil { - // ignore bad data - opts = append(opts, o) + // 如果查询不到数据 从内存里面获取数据 + if len(res) == 0 { + fmt.Println("search entries from mem") + for _, entry := range i.cron.Entries() { + job := entry.Job.(*enqueueJob) + var opts []Option + opt := stringifyOptions(job.opts) + for _, s := range opt { + if o, err := parseOption(s); err == nil { + // ignore bad data + opts = append(opts, o) + } } + entries = append(entries, &SchedulerEntry{ + ID: job.id.String(), + Spec: job.cronspec, + Task: job.task, + Opts: opts, + Next: entry.Next, + Prev: entry.Prev, + }) + } + fmt.Printf("entries lens is %d \n", len(res)) + } else { + for _, e := range res { + task := NewTask(e.Type, e.Payload) + var opts []Option + for _, s := range e.Opts { + if o, err := parseOption(s); err == nil { + // ignore bad data + opts = append(opts, o) + } + } + entries = append(entries, &SchedulerEntry{ + ID: e.ID, + Spec: e.Spec, + Task: task, + Opts: opts, + Next: e.Next, + Prev: e.Prev, + }) } - entries = append(entries, &SchedulerEntry{ - ID: e.ID, - Spec: e.Spec, - Task: task, - Opts: opts, - Next: e.Next, - Prev: e.Prev, - }) } return entries, nil } From 2f7d7a88dd4d837f4aa3538a8ed42feeabf7a6ba Mon Sep 17 00:00:00 2001 From: pacinochen Date: Thu, 24 Feb 2022 20:13:34 +0800 Subject: [PATCH 2/3] =?UTF-8?q?chore():=20=E4=BB=8E=E5=86=85=E5=AD=98?= =?UTF-8?q?=E9=87=8C=E9=9D=A2=E8=AF=BB=E5=8F=96=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- inspector.go | 66 +++++++++++++++------------------------------------- scheduler.go | 25 ++++++++++++++++++++ 2 files changed, 44 insertions(+), 47 deletions(-) diff --git a/inspector.go b/inspector.go index 3a10431..2c8498c 100644 --- a/inspector.go +++ b/inspector.go @@ -15,14 +15,12 @@ import ( "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/rdb" - "github.com/robfig/cron/v3" ) // Inspector is a client interface to inspect and mutate the state of // queues and tasks. type Inspector struct { - rdb *rdb.RDB - cron *cron.Cron + rdb *rdb.RDB } // New returns a new instance of Inspector. @@ -32,8 +30,7 @@ func NewInspector(r RedisConnOpt) *Inspector { panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r)) } return &Inspector{ - rdb: rdb.NewRDB(c), - cron: cron.New(cron.WithLocation(time.Local)), + rdb: rdb.NewRDB(c), } } @@ -799,55 +796,30 @@ type SchedulerEntry struct { // currently running schedulers. func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) { var entries []*SchedulerEntry - fmt.Printf("start to record log file") res, err := i.rdb.ListSchedulerEntries() if err != nil { - fmt.Errorf("ListSchedulerEntries err:%s", err.Error()) return nil, err } - // 如果查询不到数据 从内存里面获取数据 - if len(res) == 0 { - fmt.Println("search entries from mem") - for _, entry := range i.cron.Entries() { - job := entry.Job.(*enqueueJob) - var opts []Option - opt := stringifyOptions(job.opts) - for _, s := range opt { - if o, err := parseOption(s); err == nil { - // ignore bad data - opts = append(opts, o) - } + + for _, e := range res { + task := NewTask(e.Type, e.Payload) + var opts []Option + for _, s := range e.Opts { + if o, err := parseOption(s); err == nil { + // ignore bad data + opts = append(opts, o) } - entries = append(entries, &SchedulerEntry{ - ID: job.id.String(), - Spec: job.cronspec, - Task: job.task, - Opts: opts, - Next: entry.Next, - Prev: entry.Prev, - }) - } - fmt.Printf("entries lens is %d \n", len(res)) - } else { - for _, e := range res { - task := NewTask(e.Type, e.Payload) - var opts []Option - for _, s := range e.Opts { - if o, err := parseOption(s); err == nil { - // ignore bad data - opts = append(opts, o) - } - } - entries = append(entries, &SchedulerEntry{ - ID: e.ID, - Spec: e.Spec, - Task: task, - Opts: opts, - Next: e.Next, - Prev: e.Prev, - }) } + entries = append(entries, &SchedulerEntry{ + ID: e.ID, + Spec: e.Spec, + Task: task, + Opts: opts, + Next: e.Next, + Prev: e.Prev, + }) } + return entries, nil } diff --git a/scheduler.go b/scheduler.go index dcb0aa0..9f74f9d 100644 --- a/scheduler.go +++ b/scheduler.go @@ -297,3 +297,28 @@ func (s *Scheduler) clearHistory() { } } } + +func (s *Scheduler) GetEntries() ([]*SchedulerEntry, error) { + var entries []*SchedulerEntry + + for _, entry := range s.cron.Entries() { + job := entry.Job.(*enqueueJob) + var opts []Option + opt := stringifyOptions(job.opts) + for _, s := range opt { + if o, err := parseOption(s); err == nil { + // ignore bad data + opts = append(opts, o) + } + } + entries = append(entries, &SchedulerEntry{ + ID: job.id.String(), + Spec: job.cronspec, + Task: job.task, + Opts: opts, + Next: entry.Next, + Prev: entry.Prev, + }) + } + return entries, nil +} From 311477639e678b30f639a21301001684a599526b Mon Sep 17 00:00:00 2001 From: pacinochen Date: Sun, 27 Feb 2022 21:21:54 +0800 Subject: [PATCH 3/3] =?UTF-8?q?chore():=20=E8=B0=83=E5=BA=A6=E8=80=85?= =?UTF-8?q?=E9=98=9F=E5=88=97=E4=BB=BB=E5=8A=A1=E8=BF=87=E6=9C=9F=E6=97=B6?= =?UTF-8?q?=E9=97=B4=E8=AE=BE=E7=BD=AE=E4=B8=BA60s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scheduler.go | 27 +-------------------------- 1 file changed, 1 insertion(+), 26 deletions(-) diff --git a/scheduler.go b/scheduler.go index 9f74f9d..6a87e66 100644 --- a/scheduler.go +++ b/scheduler.go @@ -276,7 +276,7 @@ func (s *Scheduler) beat() { entries = append(entries, e) } s.logger.Debugf("Writing entries %v", entries) - if err := s.rdb.WriteSchedulerEntries(s.id, entries, 5*time.Second); err != nil { + if err := s.rdb.WriteSchedulerEntries(s.id, entries, 60*time.Second); err != nil { s.logger.Warnf("Scheduler could not write heartbeat data: %v", err) } } @@ -297,28 +297,3 @@ func (s *Scheduler) clearHistory() { } } } - -func (s *Scheduler) GetEntries() ([]*SchedulerEntry, error) { - var entries []*SchedulerEntry - - for _, entry := range s.cron.Entries() { - job := entry.Job.(*enqueueJob) - var opts []Option - opt := stringifyOptions(job.opts) - for _, s := range opt { - if o, err := parseOption(s); err == nil { - // ignore bad data - opts = append(opts, o) - } - } - entries = append(entries, &SchedulerEntry{ - ID: job.id.String(), - Spec: job.cronspec, - Task: job.task, - Opts: opts, - Next: entry.Next, - Prev: entry.Prev, - }) - } - return entries, nil -}