diff --git a/example_test.go b/example_test.go index 333236d..8e6664a 100644 --- a/example_test.go +++ b/example_test.go @@ -86,10 +86,10 @@ func ExampleScheduler() { &asynq.SchedulerOpts{Location: time.Local}, ) - if _, err := scheduler.Register("* * * * *", asynq.NewTask("task1", nil)); err != nil { + if _, err := scheduler.Register("1", "* * * * *", asynq.NewTask("task1", nil)); err != nil { log.Fatal(err) } - if _, err := scheduler.Register("@every 30s", asynq.NewTask("task2", nil)); err != nil { + if _, err := scheduler.Register("2", "@every 30s", asynq.NewTask("task2", nil)); err != nil { log.Fatal(err) } diff --git a/periodic_task_manager.go b/periodic_task_manager.go index bc60dba..28f9f4e 100644 --- a/periodic_task_manager.go +++ b/periodic_task_manager.go @@ -22,7 +22,7 @@ type PeriodicTaskManager struct { syncInterval time.Duration done chan (struct{}) wg sync.WaitGroup - m map[string]string // map[hash]entryID + m map[string]struct{} // map[hash]entryID } type PeriodicTaskManagerOpts struct { @@ -69,7 +69,7 @@ func NewPeriodicTaskManager(opts PeriodicTaskManagerOpts) (*PeriodicTaskManager, p: opts.PeriodicTaskConfigProvider, syncInterval: syncInterval, done: make(chan struct{}), - m: make(map[string]string), + m: make(map[string]struct{}), }, nil } @@ -82,6 +82,7 @@ type PeriodicTaskConfigProvider interface { // PeriodicTaskConfig specifies the details of a periodic task. type PeriodicTaskConfig struct { + ID string Cronspec string // required: must be non empty string Task *Task // required: must be non nil Opts []Option // optional: can be nil @@ -181,25 +182,25 @@ func (mgr *PeriodicTaskManager) initialSync() error { func (mgr *PeriodicTaskManager) add(configs []*PeriodicTaskConfig) { for _, c := range configs { - entryID, err := mgr.s.Register(c.Cronspec, c.Task, c.Opts...) + entryID, err := mgr.s.Register(c.ID, c.Cronspec, c.Task, c.Opts...) if err != nil { mgr.s.logger.Errorf("Failed to register periodic task: cronspec=%q task=%q err=%v", c.Cronspec, c.Task.Type(), err) continue } - mgr.m[c.hash()] = entryID + mgr.m[entryID] = struct{}{} // ? m[string]struct{} mgr.s.logger.Infof("Successfully registered periodic task: cronspec=%q task=%q, entryID=%s", c.Cronspec, c.Task.Type(), entryID) } } -func (mgr *PeriodicTaskManager) remove(removed map[string]string) { - for hash, entryID := range removed { +func (mgr *PeriodicTaskManager) remove(removed map[string]struct{}) { + for entryID, _ := range removed { if err := mgr.s.Unregister(entryID); err != nil { mgr.s.logger.Errorf("Failed to unregister periodic task: %v", err) continue } - delete(mgr.m, hash) + delete(mgr.m, entryID) mgr.s.logger.Infof("Successfully unregistered periodic task: entryID=%s", entryID) } } @@ -225,16 +226,16 @@ func (mgr *PeriodicTaskManager) sync() { // diffRemoved diffs the incoming configs with the registered config and returns // a map containing hash and entryID of each config that was removed. -func (mgr *PeriodicTaskManager) diffRemoved(configs []*PeriodicTaskConfig) map[string]string { - newConfigs := make(map[string]string) +func (mgr *PeriodicTaskManager) diffRemoved(configs []*PeriodicTaskConfig) map[string]struct{} { + newConfigs := make(map[string]struct{}) for _, c := range configs { - newConfigs[c.hash()] = "" // empty value since we don't have entryID yet + newConfigs[c.ID] = struct{}{} // empty value since we don't have entryID yet } - removed := make(map[string]string) - for k, v := range mgr.m { + removed := make(map[string]struct{}) + for k, _ := range mgr.m { // test whether existing config is present in the incoming configs if _, found := newConfigs[k]; !found { - removed[k] = v + removed[k] = struct{}{} } } return removed @@ -245,7 +246,7 @@ func (mgr *PeriodicTaskManager) diffRemoved(configs []*PeriodicTaskConfig) map[s func (mgr *PeriodicTaskManager) diffAdded(configs []*PeriodicTaskConfig) []*PeriodicTaskConfig { var added []*PeriodicTaskConfig for _, c := range configs { - if _, found := mgr.m[c.hash()]; !found { + if _, found := mgr.m[c.ID]; !found { added = append(added, c) } } diff --git a/scheduler.go b/scheduler.go index 9dbfa1b..ab24c91 100644 --- a/scheduler.go +++ b/scheduler.go @@ -165,7 +165,7 @@ type SchedulerOpts struct { // enqueueJob encapsulates the job of enqueuing a task and recording the event. type enqueueJob struct { - id uuid.UUID + id string cronspec string task *Task opts []Option @@ -197,7 +197,7 @@ func (j *enqueueJob) Run() { TaskID: info.ID, EnqueuedAt: time.Now().In(j.location), } - err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event) + err = j.rdb.RecordSchedulerEnqueueEvent(j.id, event) if err != nil { j.logger.Warnf("scheduler could not record enqueue event of enqueued task %s: %v", info.ID, err) } @@ -205,9 +205,9 @@ func (j *enqueueJob) Run() { // Register registers a task to be enqueued on the given schedule specified by the cronspec. // It returns an ID of the newly registered entry. -func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) { +func (s *Scheduler) Register(ID, cronspec string, task *Task, opts ...Option) (entryID string, err error) { job := &enqueueJob{ - id: uuid.New(), + id: ID, cronspec: cronspec, task: task, opts: opts, @@ -224,9 +224,9 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry return "", err } s.mu.Lock() - s.idmap[job.id.String()] = cronID + s.idmap[job.id] = cronID s.mu.Unlock() - return job.id.String(), nil + return job.id, nil } // Unregister removes a registered entry by entry ID. @@ -331,7 +331,7 @@ func (s *Scheduler) beat() { for _, entry := range s.cron.Entries() { job := entry.Job.(*enqueueJob) e := &base.SchedulerEntry{ - ID: job.id.String(), + ID: job.id, Spec: job.cronspec, Type: job.task.Type(), Payload: job.task.Payload(), @@ -357,8 +357,8 @@ func stringifyOptions(opts []Option) []string { func (s *Scheduler) clearHistory() { for _, entry := range s.cron.Entries() { job := entry.Job.(*enqueueJob) - if err := s.rdb.ClearSchedulerHistory(job.id.String()); err != nil { - s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err) + if err := s.rdb.ClearSchedulerHistory(job.id); err != nil { + s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id, err) } } } diff --git a/scheduler_test.go b/scheduler_test.go index 619a1b7..0f6e1d4 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -62,7 +62,7 @@ func TestSchedulerRegister(t *testing.T) { // Tests for new redis connection. for _, tc := range tests { scheduler := NewScheduler(getRedisConnOpt(t), nil) - if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil { + if _, err := scheduler.Register("1", tc.cronspec, tc.task, tc.opts...); err != nil { t.Fatal(err) } @@ -84,7 +84,7 @@ func TestSchedulerRegister(t *testing.T) { for _, tc := range tests { redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient) scheduler := NewSchedulerFromRedisClient(redisClient, nil) - if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil { + if _, err := scheduler.Register("1", tc.cronspec, tc.task, tc.opts...); err != nil { t.Fatal(err) } @@ -120,7 +120,7 @@ func TestSchedulerWhenRedisDown(t *testing.T) { task := NewTask("test", nil) - if _, err := scheduler.Register("@every 3s", task); err != nil { + if _, err := scheduler.Register("1", "@every 3s", task); err != nil { t.Fatal(err) } @@ -159,7 +159,7 @@ func TestSchedulerUnregister(t *testing.T) { for _, tc := range tests { scheduler := NewScheduler(getRedisConnOpt(t), nil) - entryID, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...) + entryID, err := scheduler.Register("1", tc.cronspec, tc.task, tc.opts...) if err != nil { t.Fatal(err) } @@ -209,7 +209,7 @@ func TestSchedulerPostAndPreEnqueueHandler(t *testing.T) { task := NewTask("test", nil) - if _, err := scheduler.Register("@every 3s", task); err != nil { + if _, err := scheduler.Register("1", "@every 3s", task); err != nil { t.Fatal(err) }