diff --git a/client.go b/client.go index 4a0ba39..8dad9c7 100644 --- a/client.go +++ b/client.go @@ -60,6 +60,7 @@ const ( TaskIDOpt RetentionOpt GroupOpt + SchedulerEntryIDOpt ) // Option specifies the task processing behavior. @@ -76,16 +77,17 @@ type Option interface { // Internal option representations. type ( - retryOption int - queueOption string - taskIDOption string - timeoutOption time.Duration - deadlineOption time.Time - uniqueOption time.Duration - processAtOption time.Time - processInOption time.Duration - retentionOption time.Duration - groupOption string + retryOption int + queueOption string + taskIDOption string + timeoutOption time.Duration + deadlineOption time.Time + uniqueOption time.Duration + processAtOption time.Time + processInOption time.Duration + retentionOption time.Duration + groupOption string + schedulerEntryIDOption string ) // MaxRetry returns an option to specify the max number of times @@ -217,6 +219,18 @@ func (name groupOption) String() string { return fmt.Sprintf("Group(%q)", st func (name groupOption) Type() OptionType { return GroupOpt } func (name groupOption) Value() interface{} { return string(name) } +// SchedulerEntryIDOpt returns an option to specify the scheduler entry ID. +// This option is only applicable to registered scheduled jobs. +func SchedulerEntryID(id string) Option { + return schedulerEntryIDOption(id) +} + +func (id schedulerEntryIDOption) String() string { + return fmt.Sprintf("SchedulerEntryID(%q)", string(id)) +} +func (id schedulerEntryIDOption) Type() OptionType { return SchedulerEntryIDOpt } +func (id schedulerEntryIDOption) Value() interface{} { return string(id) } + // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task. // // ErrDuplicateTask error only applies to tasks enqueued with a Unique option. diff --git a/scheduler.go b/scheduler.go index 9dbfa1b..f914ab8 100644 --- a/scheduler.go +++ b/scheduler.go @@ -165,7 +165,7 @@ type SchedulerOpts struct { // enqueueJob encapsulates the job of enqueuing a task and recording the event. type enqueueJob struct { - id uuid.UUID + id string cronspec string task *Task opts []Option @@ -197,7 +197,7 @@ func (j *enqueueJob) Run() { TaskID: info.ID, EnqueuedAt: time.Now().In(j.location), } - err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event) + err = j.rdb.RecordSchedulerEnqueueEvent(j.id, event) if err != nil { j.logger.Warnf("scheduler could not record enqueue event of enqueued task %s: %v", info.ID, err) } @@ -206,8 +206,9 @@ func (j *enqueueJob) Run() { // Register registers a task to be enqueued on the given schedule specified by the cronspec. // It returns an ID of the newly registered entry. func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) { + job := &enqueueJob{ - id: uuid.New(), + id: generateEntryID(opts...), cronspec: cronspec, task: task, opts: opts, @@ -224,9 +225,27 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry return "", err } s.mu.Lock() - s.idmap[job.id.String()] = cronID + s.idmap[job.id] = cronID s.mu.Unlock() - return job.id.String(), nil + return job.id, nil +} + +func generateEntryID(opts ...Option) string { + var id string + + if opts != nil { + for _, v := range opts { + if v.Type() == SchedulerEntryIDOpt { + id = v.Value().(string) + } + } + } + + if id == "" { + id = uuid.New().String() + } + + return id } // Unregister removes a registered entry by entry ID. @@ -331,7 +350,7 @@ func (s *Scheduler) beat() { for _, entry := range s.cron.Entries() { job := entry.Job.(*enqueueJob) e := &base.SchedulerEntry{ - ID: job.id.String(), + ID: job.id, Spec: job.cronspec, Type: job.task.Type(), Payload: job.task.Payload(), @@ -357,8 +376,8 @@ func stringifyOptions(opts []Option) []string { func (s *Scheduler) clearHistory() { for _, entry := range s.cron.Entries() { job := entry.Job.(*enqueueJob) - if err := s.rdb.ClearSchedulerHistory(job.id.String()); err != nil { - s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err) + if err := s.rdb.ClearSchedulerHistory(job.id); err != nil { + s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id, err) } } } diff --git a/scheduler_test.go b/scheduler_test.go index 619a1b7..4e393c5 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -232,3 +232,50 @@ func TestSchedulerPostAndPreEnqueueHandler(t *testing.T) { } postMu.Unlock() } + +func TestSchedulerWithCustomEntryIDOpt(t *testing.T) { + tests := []struct { + cronspec string + task *Task + opts []Option + wait time.Duration + queue string + want []*base.TaskMessage + }{ + { + cronspec: "@every 3s", + task: NewTask("task1", nil), + opts: []Option{ + MaxRetry(10), + SchedulerEntryID("entry1"), + }, + wait: 10 * time.Second, + queue: "default", + want: []*base.TaskMessage{ + { + Type: "task1", + Payload: nil, + Retry: 10, + Timeout: int64(defaultTimeout.Seconds()), + Queue: "default", + ID: "entry1", + }, + }, + }, + } + + for _, tc := range tests { + scheduler := NewScheduler(getRedisConnOpt(t), nil) + entryID, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...) + if err != nil { + t.Fatal(err) + } + + time.Sleep(tc.wait) + scheduler.Shutdown() + + if entryID != "entry1" { + t.Errorf("entryID = %q, want %q", entryID, "entry1") + } + } +}