From 7c65581fd7ac89ecb6ab5c4a59fccd205e61fe6d Mon Sep 17 00:00:00 2001 From: ilkerkorkut Date: Wed, 1 Mar 2023 01:35:10 +0300 Subject: [PATCH] SchedulerEntryID option for registered scheduled job --- client.go | 40 +++++++++++++++++++--------- scheduler.go | 35 +++++++++++++++++++------ scheduler_test.go | 67 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 121 insertions(+), 21 deletions(-) diff --git a/client.go b/client.go index 7948036..00a8273 100644 --- a/client.go +++ b/client.go @@ -49,6 +49,7 @@ const ( TaskIDOpt RetentionOpt GroupOpt + SchedulerEntryIDOpt ) // Option specifies the task processing behavior. @@ -65,16 +66,17 @@ type Option interface { // Internal option representations. type ( - retryOption int - queueOption string - taskIDOption string - timeoutOption time.Duration - deadlineOption time.Time - uniqueOption time.Duration - processAtOption time.Time - processInOption time.Duration - retentionOption time.Duration - groupOption string + retryOption int + queueOption string + taskIDOption string + timeoutOption time.Duration + deadlineOption time.Time + uniqueOption time.Duration + processAtOption time.Time + processInOption time.Duration + retentionOption time.Duration + groupOption string + schedulerEntryIDOption string ) // MaxRetry returns an option to specify the max number of times @@ -150,9 +152,9 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) } // TTL duration must be greater than or equal to 1 second. // // Uniqueness of a task is based on the following properties: -// - Task Type -// - Task Payload -// - Queue Name +// - Task Type +// - Task Payload +// - Queue Name func Unique(ttl time.Duration) Option { return uniqueOption(ttl) } @@ -206,6 +208,18 @@ func (name groupOption) String() string { return fmt.Sprintf("Group(%q)", st func (name groupOption) Type() OptionType { return GroupOpt } func (name groupOption) Value() interface{} { return string(name) } +// SchedulerEntryIDOpt returns an option to specify the scheduler entry ID. +// This option is only applicable to registered scheduled jobs. +func SchedulerEntryID(id string) Option { + return schedulerEntryIDOption(id) +} + +func (id schedulerEntryIDOption) String() string { + return fmt.Sprintf("SchedulerEntryID(%q)", string(id)) +} +func (id schedulerEntryIDOption) Type() OptionType { return SchedulerEntryIDOpt } +func (id schedulerEntryIDOption) Value() interface{} { return string(id) } + // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task. // // ErrDuplicateTask error only applies to tasks enqueued with a Unique option. diff --git a/scheduler.go b/scheduler.go index 12c702c..b65efa3 100644 --- a/scheduler.go +++ b/scheduler.go @@ -125,7 +125,7 @@ type SchedulerOpts struct { // enqueueJob encapsulates the job of enqueuing a task and recording the event. type enqueueJob struct { - id uuid.UUID + id string cronspec string task *Task opts []Option @@ -157,7 +157,7 @@ func (j *enqueueJob) Run() { TaskID: info.ID, EnqueuedAt: time.Now().In(j.location), } - err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event) + err = j.rdb.RecordSchedulerEnqueueEvent(j.id, event) if err != nil { j.logger.Warnf("scheduler could not record enqueue event of enqueued task %s: %v", info.ID, err) } @@ -166,8 +166,9 @@ func (j *enqueueJob) Run() { // Register registers a task to be enqueued on the given schedule specified by the cronspec. // It returns an ID of the newly registered entry. func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) { + job := &enqueueJob{ - id: uuid.New(), + id: generateEntryID(opts...), cronspec: cronspec, task: task, opts: opts, @@ -184,9 +185,27 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry return "", err } s.mu.Lock() - s.idmap[job.id.String()] = cronID + s.idmap[job.id] = cronID s.mu.Unlock() - return job.id.String(), nil + return job.id, nil +} + +func generateEntryID(opts ...Option) string { + var id string + + if opts != nil { + for _, v := range opts { + if v.Type() == SchedulerEntryIDOpt { + id = v.Value().(string) + } + } + } + + if id == "" { + id = uuid.New().String() + } + + return id } // Unregister removes a registered entry by entry ID. @@ -288,7 +307,7 @@ func (s *Scheduler) beat() { for _, entry := range s.cron.Entries() { job := entry.Job.(*enqueueJob) e := &base.SchedulerEntry{ - ID: job.id.String(), + ID: job.id, Spec: job.cronspec, Type: job.task.Type(), Payload: job.task.Payload(), @@ -315,8 +334,8 @@ func stringifyOptions(opts []Option) []string { func (s *Scheduler) clearHistory() { for _, entry := range s.cron.Entries() { job := entry.Job.(*enqueueJob) - if err := s.rdb.ClearSchedulerHistory(job.id.String()); err != nil { - s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err) + if err := s.rdb.ClearSchedulerHistory(job.id); err != nil { + s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id, err) } } } diff --git a/scheduler_test.go b/scheduler_test.go index fea048e..9078ed0 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -208,3 +208,70 @@ func TestSchedulerPostAndPreEnqueueHandler(t *testing.T) { } postMu.Unlock() } + +func TestSchedulerWithCustomEntryIDGeneratorFunc(t *testing.T) { + tests := []struct { + cronspec string + task *Task + opts []Option + wait time.Duration + queue string + want []*base.TaskMessage + }{ + { + cronspec: "@every 3s", + task: NewTask("task1", nil), + opts: []Option{MaxRetry(10), + SchedulerEntryID("entry1"), + }, + wait: 10 * time.Second, + queue: "default", + want: []*base.TaskMessage{ + { + Type: "task1", + Payload: nil, + Retry: 10, + Timeout: int64(defaultTimeout.Seconds()), + Queue: "default", + ID: "entry1", + }, + { + Type: "task1", + Payload: nil, + Retry: 10, + Timeout: int64(defaultTimeout.Seconds()), + Queue: "default", + ID: "entry1", + }, + { + Type: "task1", + Payload: nil, + Retry: 10, + Timeout: int64(defaultTimeout.Seconds()), + Queue: "default", + ID: "entry1", + }, + }, + }, + } + + r := setup(t) + + for _, tc := range tests { + scheduler := NewScheduler(getRedisConnOpt(t), nil) + if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil { + t.Fatal(err) + } + + if err := scheduler.Start(); err != nil { + t.Fatal(err) + } + time.Sleep(tc.wait) + scheduler.Shutdown() + + got := testutil.GetPendingMessages(t, r, tc.queue) + if diff := cmp.Diff(tc.want, got, testutil.IgnoreIDOpt); diff != "" { + t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff) + } + } +}