2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

SchedulerEntryID option for registered scheduled job

This commit is contained in:
ilkerkorkut 2023-03-01 01:35:10 +03:00
parent cc777ebdaa
commit 7c65581fd7
No known key found for this signature in database
3 changed files with 121 additions and 21 deletions

View File

@ -49,6 +49,7 @@ const (
TaskIDOpt
RetentionOpt
GroupOpt
SchedulerEntryIDOpt
)
// Option specifies the task processing behavior.
@ -65,16 +66,17 @@ type Option interface {
// Internal option representations.
type (
retryOption int
queueOption string
taskIDOption string
timeoutOption time.Duration
deadlineOption time.Time
uniqueOption time.Duration
processAtOption time.Time
processInOption time.Duration
retentionOption time.Duration
groupOption string
retryOption int
queueOption string
taskIDOption string
timeoutOption time.Duration
deadlineOption time.Time
uniqueOption time.Duration
processAtOption time.Time
processInOption time.Duration
retentionOption time.Duration
groupOption string
schedulerEntryIDOption string
)
// MaxRetry returns an option to specify the max number of times
@ -150,9 +152,9 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) }
// TTL duration must be greater than or equal to 1 second.
//
// Uniqueness of a task is based on the following properties:
// - Task Type
// - Task Payload
// - Queue Name
// - Task Type
// - Task Payload
// - Queue Name
func Unique(ttl time.Duration) Option {
return uniqueOption(ttl)
}
@ -206,6 +208,18 @@ func (name groupOption) String() string { return fmt.Sprintf("Group(%q)", st
func (name groupOption) Type() OptionType { return GroupOpt }
func (name groupOption) Value() interface{} { return string(name) }
// SchedulerEntryIDOpt returns an option to specify the scheduler entry ID.
// This option is only applicable to registered scheduled jobs.
func SchedulerEntryID(id string) Option {
return schedulerEntryIDOption(id)
}
func (id schedulerEntryIDOption) String() string {
return fmt.Sprintf("SchedulerEntryID(%q)", string(id))
}
func (id schedulerEntryIDOption) Type() OptionType { return SchedulerEntryIDOpt }
func (id schedulerEntryIDOption) Value() interface{} { return string(id) }
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.

View File

@ -125,7 +125,7 @@ type SchedulerOpts struct {
// enqueueJob encapsulates the job of enqueuing a task and recording the event.
type enqueueJob struct {
id uuid.UUID
id string
cronspec string
task *Task
opts []Option
@ -157,7 +157,7 @@ func (j *enqueueJob) Run() {
TaskID: info.ID,
EnqueuedAt: time.Now().In(j.location),
}
err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event)
err = j.rdb.RecordSchedulerEnqueueEvent(j.id, event)
if err != nil {
j.logger.Warnf("scheduler could not record enqueue event of enqueued task %s: %v", info.ID, err)
}
@ -166,8 +166,9 @@ func (j *enqueueJob) Run() {
// Register registers a task to be enqueued on the given schedule specified by the cronspec.
// It returns an ID of the newly registered entry.
func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) {
job := &enqueueJob{
id: uuid.New(),
id: generateEntryID(opts...),
cronspec: cronspec,
task: task,
opts: opts,
@ -184,9 +185,27 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry
return "", err
}
s.mu.Lock()
s.idmap[job.id.String()] = cronID
s.idmap[job.id] = cronID
s.mu.Unlock()
return job.id.String(), nil
return job.id, nil
}
func generateEntryID(opts ...Option) string {
var id string
if opts != nil {
for _, v := range opts {
if v.Type() == SchedulerEntryIDOpt {
id = v.Value().(string)
}
}
}
if id == "" {
id = uuid.New().String()
}
return id
}
// Unregister removes a registered entry by entry ID.
@ -288,7 +307,7 @@ func (s *Scheduler) beat() {
for _, entry := range s.cron.Entries() {
job := entry.Job.(*enqueueJob)
e := &base.SchedulerEntry{
ID: job.id.String(),
ID: job.id,
Spec: job.cronspec,
Type: job.task.Type(),
Payload: job.task.Payload(),
@ -315,8 +334,8 @@ func stringifyOptions(opts []Option) []string {
func (s *Scheduler) clearHistory() {
for _, entry := range s.cron.Entries() {
job := entry.Job.(*enqueueJob)
if err := s.rdb.ClearSchedulerHistory(job.id.String()); err != nil {
s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err)
if err := s.rdb.ClearSchedulerHistory(job.id); err != nil {
s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id, err)
}
}
}

View File

@ -208,3 +208,70 @@ func TestSchedulerPostAndPreEnqueueHandler(t *testing.T) {
}
postMu.Unlock()
}
func TestSchedulerWithCustomEntryIDGeneratorFunc(t *testing.T) {
tests := []struct {
cronspec string
task *Task
opts []Option
wait time.Duration
queue string
want []*base.TaskMessage
}{
{
cronspec: "@every 3s",
task: NewTask("task1", nil),
opts: []Option{MaxRetry(10),
SchedulerEntryID("entry1"),
},
wait: 10 * time.Second,
queue: "default",
want: []*base.TaskMessage{
{
Type: "task1",
Payload: nil,
Retry: 10,
Timeout: int64(defaultTimeout.Seconds()),
Queue: "default",
ID: "entry1",
},
{
Type: "task1",
Payload: nil,
Retry: 10,
Timeout: int64(defaultTimeout.Seconds()),
Queue: "default",
ID: "entry1",
},
{
Type: "task1",
Payload: nil,
Retry: 10,
Timeout: int64(defaultTimeout.Seconds()),
Queue: "default",
ID: "entry1",
},
},
},
}
r := setup(t)
for _, tc := range tests {
scheduler := NewScheduler(getRedisConnOpt(t), nil)
if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil {
t.Fatal(err)
}
if err := scheduler.Start(); err != nil {
t.Fatal(err)
}
time.Sleep(tc.wait)
scheduler.Shutdown()
got := testutil.GetPendingMessages(t, r, tc.queue)
if diff := cmp.Diff(tc.want, got, testutil.IgnoreIDOpt); diff != "" {
t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff)
}
}
}