From 94a4b7d126234aadf028c3579666cf668a330695 Mon Sep 17 00:00:00 2001 From: youngxu Date: Fri, 8 Jul 2022 11:49:44 +0800 Subject: [PATCH] chore(): export Scheduler.ID() Scheduler.Entries() --- scheduler.go | 41 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/scheduler.go b/scheduler.go index 12c702c..98153fd 100644 --- a/scheduler.go +++ b/scheduler.go @@ -12,10 +12,11 @@ import ( "github.com/go-redis/redis/v8" "github.com/google/uuid" + "github.com/robfig/cron/v3" + "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" - "github.com/robfig/cron/v3" ) // A Scheduler kicks off tasks at regular intervals based on the user defined schedule. @@ -163,6 +164,10 @@ func (j *enqueueJob) Run() { } } +func (s *Scheduler) ID() string { + return s.id +} + // Register registers a task to be enqueued on the given schedule specified by the cronspec. // It returns an ID of the newly registered entry. func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) { @@ -282,8 +287,32 @@ func (s *Scheduler) runHeartbeater() { } } -// beat writes a snapshot of entries to redis. -func (s *Scheduler) beat() { +func (s *Scheduler) Entries() []*SchedulerEntry { + var entries []*SchedulerEntry + + for _, e := range s.entries() { + task := NewTask(e.Type, e.Payload) + var opts []Option + for _, s := range e.Opts { + if o, err := parseOption(s); err == nil { + // ignore bad data + opts = append(opts, o) + } + } + entries = append(entries, &SchedulerEntry{ + ID: e.ID, + Spec: e.Spec, + Task: task, + Opts: opts, + Next: e.Next, + Prev: e.Prev, + }) + } + + return entries +} + +func (s *Scheduler) entries() []*base.SchedulerEntry { var entries []*base.SchedulerEntry for _, entry := range s.cron.Entries() { job := entry.Job.(*enqueueJob) @@ -298,6 +327,12 @@ func (s *Scheduler) beat() { } entries = append(entries, e) } + return entries +} + +// beat writes a snapshot of entries to redis. +func (s *Scheduler) beat() { + entries := s.entries() s.logger.Debugf("Writing entries %v", entries) if err := s.rdb.WriteSchedulerEntries(s.id, entries, 5*time.Second); err != nil { s.logger.Warnf("Scheduler could not write heartbeat data: %v", err)