2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-18 23:00:20 +08:00

Merge f75581d4e27b7c5d5ac7d0c12e6bff5a12abeffe into c327bc40a28e4db45195cfe082d88faa808ce87d

This commit is contained in:
ilkerkorkut 2025-04-11 16:31:43 +08:00 committed by GitHub
commit 085f9d836a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 98 additions and 18 deletions

View File

@ -60,6 +60,7 @@ const (
TaskIDOpt
RetentionOpt
GroupOpt
SchedulerEntryIDOpt
)
// Option specifies the task processing behavior.
@ -76,16 +77,17 @@ type Option interface {
// Internal option representations.
type (
retryOption int
queueOption string
taskIDOption string
timeoutOption time.Duration
deadlineOption time.Time
uniqueOption time.Duration
processAtOption time.Time
processInOption time.Duration
retentionOption time.Duration
groupOption string
retryOption int
queueOption string
taskIDOption string
timeoutOption time.Duration
deadlineOption time.Time
uniqueOption time.Duration
processAtOption time.Time
processInOption time.Duration
retentionOption time.Duration
groupOption string
schedulerEntryIDOption string
)
// MaxRetry returns an option to specify the max number of times
@ -217,6 +219,18 @@ func (name groupOption) String() string { return fmt.Sprintf("Group(%q)", st
func (name groupOption) Type() OptionType { return GroupOpt }
func (name groupOption) Value() interface{} { return string(name) }
// SchedulerEntryIDOpt returns an option to specify the scheduler entry ID.
// This option is only applicable to registered scheduled jobs.
func SchedulerEntryID(id string) Option {
return schedulerEntryIDOption(id)
}
func (id schedulerEntryIDOption) String() string {
return fmt.Sprintf("SchedulerEntryID(%q)", string(id))
}
func (id schedulerEntryIDOption) Type() OptionType { return SchedulerEntryIDOpt }
func (id schedulerEntryIDOption) Value() interface{} { return string(id) }
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.

View File

@ -165,7 +165,7 @@ type SchedulerOpts struct {
// enqueueJob encapsulates the job of enqueuing a task and recording the event.
type enqueueJob struct {
id uuid.UUID
id string
cronspec string
task *Task
opts []Option
@ -197,7 +197,7 @@ func (j *enqueueJob) Run() {
TaskID: info.ID,
EnqueuedAt: time.Now().In(j.location),
}
err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event)
err = j.rdb.RecordSchedulerEnqueueEvent(j.id, event)
if err != nil {
j.logger.Warnf("scheduler could not record enqueue event of enqueued task %s: %v", info.ID, err)
}
@ -206,8 +206,9 @@ func (j *enqueueJob) Run() {
// Register registers a task to be enqueued on the given schedule specified by the cronspec.
// It returns an ID of the newly registered entry.
func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) {
job := &enqueueJob{
id: uuid.New(),
id: generateEntryID(opts...),
cronspec: cronspec,
task: task,
opts: opts,
@ -224,9 +225,27 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry
return "", err
}
s.mu.Lock()
s.idmap[job.id.String()] = cronID
s.idmap[job.id] = cronID
s.mu.Unlock()
return job.id.String(), nil
return job.id, nil
}
func generateEntryID(opts ...Option) string {
var id string
if opts != nil {
for _, v := range opts {
if v.Type() == SchedulerEntryIDOpt {
id = v.Value().(string)
}
}
}
if id == "" {
id = uuid.New().String()
}
return id
}
// Unregister removes a registered entry by entry ID.
@ -331,7 +350,7 @@ func (s *Scheduler) beat() {
for _, entry := range s.cron.Entries() {
job := entry.Job.(*enqueueJob)
e := &base.SchedulerEntry{
ID: job.id.String(),
ID: job.id,
Spec: job.cronspec,
Type: job.task.Type(),
Payload: job.task.Payload(),
@ -357,8 +376,8 @@ func stringifyOptions(opts []Option) []string {
func (s *Scheduler) clearHistory() {
for _, entry := range s.cron.Entries() {
job := entry.Job.(*enqueueJob)
if err := s.rdb.ClearSchedulerHistory(job.id.String()); err != nil {
s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err)
if err := s.rdb.ClearSchedulerHistory(job.id); err != nil {
s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id, err)
}
}
}

View File

@ -232,3 +232,50 @@ func TestSchedulerPostAndPreEnqueueHandler(t *testing.T) {
}
postMu.Unlock()
}
func TestSchedulerWithCustomEntryIDOpt(t *testing.T) {
tests := []struct {
cronspec string
task *Task
opts []Option
wait time.Duration
queue string
want []*base.TaskMessage
}{
{
cronspec: "@every 3s",
task: NewTask("task1", nil),
opts: []Option{
MaxRetry(10),
SchedulerEntryID("entry1"),
},
wait: 10 * time.Second,
queue: "default",
want: []*base.TaskMessage{
{
Type: "task1",
Payload: nil,
Retry: 10,
Timeout: int64(defaultTimeout.Seconds()),
Queue: "default",
ID: "entry1",
},
},
},
}
for _, tc := range tests {
scheduler := NewScheduler(getRedisConnOpt(t), nil)
entryID, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...)
if err != nil {
t.Fatal(err)
}
time.Sleep(tc.wait)
scheduler.Shutdown()
if entryID != "entry1" {
t.Errorf("entryID = %q, want %q", entryID, "entry1")
}
}
}