mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-25 23:06:12 +08:00
chore(): export Scheduler.ID() Scheduler.Entries()
This commit is contained in:
41
scheduler.go
41
scheduler.go
@@ -12,10 +12,11 @@ import (
|
|||||||
|
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
"github.com/robfig/cron/v3"
|
||||||
|
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
"github.com/hibiken/asynq/internal/log"
|
"github.com/hibiken/asynq/internal/log"
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
"github.com/robfig/cron/v3"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
|
// A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
|
||||||
@@ -163,6 +164,10 @@ func (j *enqueueJob) Run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) ID() string {
|
||||||
|
return s.id
|
||||||
|
}
|
||||||
|
|
||||||
// Register registers a task to be enqueued on the given schedule specified by the cronspec.
|
// Register registers a task to be enqueued on the given schedule specified by the cronspec.
|
||||||
// It returns an ID of the newly registered entry.
|
// It returns an ID of the newly registered entry.
|
||||||
func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) {
|
func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) {
|
||||||
@@ -282,8 +287,32 @@ func (s *Scheduler) runHeartbeater() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// beat writes a snapshot of entries to redis.
|
func (s *Scheduler) Entries() []*SchedulerEntry {
|
||||||
func (s *Scheduler) beat() {
|
var entries []*SchedulerEntry
|
||||||
|
|
||||||
|
for _, e := range s.entries() {
|
||||||
|
task := NewTask(e.Type, e.Payload)
|
||||||
|
var opts []Option
|
||||||
|
for _, s := range e.Opts {
|
||||||
|
if o, err := parseOption(s); err == nil {
|
||||||
|
// ignore bad data
|
||||||
|
opts = append(opts, o)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
entries = append(entries, &SchedulerEntry{
|
||||||
|
ID: e.ID,
|
||||||
|
Spec: e.Spec,
|
||||||
|
Task: task,
|
||||||
|
Opts: opts,
|
||||||
|
Next: e.Next,
|
||||||
|
Prev: e.Prev,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return entries
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) entries() []*base.SchedulerEntry {
|
||||||
var entries []*base.SchedulerEntry
|
var entries []*base.SchedulerEntry
|
||||||
for _, entry := range s.cron.Entries() {
|
for _, entry := range s.cron.Entries() {
|
||||||
job := entry.Job.(*enqueueJob)
|
job := entry.Job.(*enqueueJob)
|
||||||
@@ -298,6 +327,12 @@ func (s *Scheduler) beat() {
|
|||||||
}
|
}
|
||||||
entries = append(entries, e)
|
entries = append(entries, e)
|
||||||
}
|
}
|
||||||
|
return entries
|
||||||
|
}
|
||||||
|
|
||||||
|
// beat writes a snapshot of entries to redis.
|
||||||
|
func (s *Scheduler) beat() {
|
||||||
|
entries := s.entries()
|
||||||
s.logger.Debugf("Writing entries %v", entries)
|
s.logger.Debugf("Writing entries %v", entries)
|
||||||
if err := s.rdb.WriteSchedulerEntries(s.id, entries, 5*time.Second); err != nil {
|
if err := s.rdb.WriteSchedulerEntries(s.id, entries, 5*time.Second); err != nil {
|
||||||
s.logger.Warnf("Scheduler could not write heartbeat data: %v", err)
|
s.logger.Warnf("Scheduler could not write heartbeat data: %v", err)
|
||||||
|
|||||||
Reference in New Issue
Block a user