mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-25 23:06:12 +08:00 
			
		
		
		
	Merge pull request #2 from easyops-cn/xuy/export-scheduler-funcs
chore(): export Scheduler.ID() Scheduler.Entries()
This commit is contained in:
		
							
								
								
									
										41
									
								
								scheduler.go
									
									
									
									
									
								
							
							
						
						
									
										41
									
								
								scheduler.go
									
									
									
									
									
								
							| @@ -12,10 +12,11 @@ import ( | ||||
|  | ||||
| 	"github.com/go-redis/redis/v8" | ||||
| 	"github.com/google/uuid" | ||||
| 	"github.com/robfig/cron/v3" | ||||
|  | ||||
| 	"github.com/hibiken/asynq/internal/base" | ||||
| 	"github.com/hibiken/asynq/internal/log" | ||||
| 	"github.com/hibiken/asynq/internal/rdb" | ||||
| 	"github.com/robfig/cron/v3" | ||||
| ) | ||||
|  | ||||
| // A Scheduler kicks off tasks at regular intervals based on the user defined schedule. | ||||
| @@ -163,6 +164,10 @@ func (j *enqueueJob) Run() { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s *Scheduler) ID() string { | ||||
| 	return s.id | ||||
| } | ||||
|  | ||||
| // Register registers a task to be enqueued on the given schedule specified by the cronspec. | ||||
| // It returns an ID of the newly registered entry. | ||||
| func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) { | ||||
| @@ -282,8 +287,32 @@ func (s *Scheduler) runHeartbeater() { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // beat writes a snapshot of entries to redis. | ||||
| func (s *Scheduler) beat() { | ||||
| func (s *Scheduler) Entries() []*SchedulerEntry { | ||||
| 	var entries []*SchedulerEntry | ||||
|  | ||||
| 	for _, e := range s.entries() { | ||||
| 		task := NewTask(e.Type, e.Payload) | ||||
| 		var opts []Option | ||||
| 		for _, s := range e.Opts { | ||||
| 			if o, err := parseOption(s); err == nil { | ||||
| 				// ignore bad data | ||||
| 				opts = append(opts, o) | ||||
| 			} | ||||
| 		} | ||||
| 		entries = append(entries, &SchedulerEntry{ | ||||
| 			ID:   e.ID, | ||||
| 			Spec: e.Spec, | ||||
| 			Task: task, | ||||
| 			Opts: opts, | ||||
| 			Next: e.Next, | ||||
| 			Prev: e.Prev, | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| 	return entries | ||||
| } | ||||
|  | ||||
| func (s *Scheduler) entries() []*base.SchedulerEntry { | ||||
| 	var entries []*base.SchedulerEntry | ||||
| 	for _, entry := range s.cron.Entries() { | ||||
| 		job := entry.Job.(*enqueueJob) | ||||
| @@ -298,6 +327,12 @@ func (s *Scheduler) beat() { | ||||
| 		} | ||||
| 		entries = append(entries, e) | ||||
| 	} | ||||
| 	return entries | ||||
| } | ||||
|  | ||||
| // beat writes a snapshot of entries to redis. | ||||
| func (s *Scheduler) beat() { | ||||
| 	entries := s.entries() | ||||
| 	s.logger.Debugf("Writing entries %v", entries) | ||||
| 	if err := s.rdb.WriteSchedulerEntries(s.id, entries, 5*time.Second); err != nil { | ||||
| 		s.logger.Warnf("Scheduler could not write heartbeat data: %v", err) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user