2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

PeriodicTask 注册任务的时候主动带上id,方便排查和更新数据从db中

This commit is contained in:
xiaohan 2024-12-10 22:36:21 +08:00
parent 02907551b4
commit 1e87f20b06
4 changed files with 31 additions and 30 deletions

View File

@ -86,10 +86,10 @@ func ExampleScheduler() {
&asynq.SchedulerOpts{Location: time.Local}, &asynq.SchedulerOpts{Location: time.Local},
) )
if _, err := scheduler.Register("* * * * *", asynq.NewTask("task1", nil)); err != nil { if _, err := scheduler.Register("1", "* * * * *", asynq.NewTask("task1", nil)); err != nil {
log.Fatal(err) log.Fatal(err)
} }
if _, err := scheduler.Register("@every 30s", asynq.NewTask("task2", nil)); err != nil { if _, err := scheduler.Register("2", "@every 30s", asynq.NewTask("task2", nil)); err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -22,7 +22,7 @@ type PeriodicTaskManager struct {
syncInterval time.Duration syncInterval time.Duration
done chan (struct{}) done chan (struct{})
wg sync.WaitGroup wg sync.WaitGroup
m map[string]string // map[hash]entryID m map[string]struct{} // map[hash]entryID
} }
type PeriodicTaskManagerOpts struct { type PeriodicTaskManagerOpts struct {
@ -69,7 +69,7 @@ func NewPeriodicTaskManager(opts PeriodicTaskManagerOpts) (*PeriodicTaskManager,
p: opts.PeriodicTaskConfigProvider, p: opts.PeriodicTaskConfigProvider,
syncInterval: syncInterval, syncInterval: syncInterval,
done: make(chan struct{}), done: make(chan struct{}),
m: make(map[string]string), m: make(map[string]struct{}),
}, nil }, nil
} }
@ -82,6 +82,7 @@ type PeriodicTaskConfigProvider interface {
// PeriodicTaskConfig specifies the details of a periodic task. // PeriodicTaskConfig specifies the details of a periodic task.
type PeriodicTaskConfig struct { type PeriodicTaskConfig struct {
ID string
Cronspec string // required: must be non empty string Cronspec string // required: must be non empty string
Task *Task // required: must be non nil Task *Task // required: must be non nil
Opts []Option // optional: can be nil Opts []Option // optional: can be nil
@ -181,25 +182,25 @@ func (mgr *PeriodicTaskManager) initialSync() error {
func (mgr *PeriodicTaskManager) add(configs []*PeriodicTaskConfig) { func (mgr *PeriodicTaskManager) add(configs []*PeriodicTaskConfig) {
for _, c := range configs { for _, c := range configs {
entryID, err := mgr.s.Register(c.Cronspec, c.Task, c.Opts...) entryID, err := mgr.s.Register(c.ID, c.Cronspec, c.Task, c.Opts...)
if err != nil { if err != nil {
mgr.s.logger.Errorf("Failed to register periodic task: cronspec=%q task=%q err=%v", mgr.s.logger.Errorf("Failed to register periodic task: cronspec=%q task=%q err=%v",
c.Cronspec, c.Task.Type(), err) c.Cronspec, c.Task.Type(), err)
continue continue
} }
mgr.m[c.hash()] = entryID mgr.m[entryID] = struct{}{} // ? m[string]struct{}
mgr.s.logger.Infof("Successfully registered periodic task: cronspec=%q task=%q, entryID=%s", mgr.s.logger.Infof("Successfully registered periodic task: cronspec=%q task=%q, entryID=%s",
c.Cronspec, c.Task.Type(), entryID) c.Cronspec, c.Task.Type(), entryID)
} }
} }
func (mgr *PeriodicTaskManager) remove(removed map[string]string) { func (mgr *PeriodicTaskManager) remove(removed map[string]struct{}) {
for hash, entryID := range removed { for entryID, _ := range removed {
if err := mgr.s.Unregister(entryID); err != nil { if err := mgr.s.Unregister(entryID); err != nil {
mgr.s.logger.Errorf("Failed to unregister periodic task: %v", err) mgr.s.logger.Errorf("Failed to unregister periodic task: %v", err)
continue continue
} }
delete(mgr.m, hash) delete(mgr.m, entryID)
mgr.s.logger.Infof("Successfully unregistered periodic task: entryID=%s", entryID) mgr.s.logger.Infof("Successfully unregistered periodic task: entryID=%s", entryID)
} }
} }
@ -225,16 +226,16 @@ func (mgr *PeriodicTaskManager) sync() {
// diffRemoved diffs the incoming configs with the registered config and returns // diffRemoved diffs the incoming configs with the registered config and returns
// a map containing hash and entryID of each config that was removed. // a map containing hash and entryID of each config that was removed.
func (mgr *PeriodicTaskManager) diffRemoved(configs []*PeriodicTaskConfig) map[string]string { func (mgr *PeriodicTaskManager) diffRemoved(configs []*PeriodicTaskConfig) map[string]struct{} {
newConfigs := make(map[string]string) newConfigs := make(map[string]struct{})
for _, c := range configs { for _, c := range configs {
newConfigs[c.hash()] = "" // empty value since we don't have entryID yet newConfigs[c.ID] = struct{}{} // empty value since we don't have entryID yet
} }
removed := make(map[string]string) removed := make(map[string]struct{})
for k, v := range mgr.m { for k, _ := range mgr.m {
// test whether existing config is present in the incoming configs // test whether existing config is present in the incoming configs
if _, found := newConfigs[k]; !found { if _, found := newConfigs[k]; !found {
removed[k] = v removed[k] = struct{}{}
} }
} }
return removed return removed
@ -245,7 +246,7 @@ func (mgr *PeriodicTaskManager) diffRemoved(configs []*PeriodicTaskConfig) map[s
func (mgr *PeriodicTaskManager) diffAdded(configs []*PeriodicTaskConfig) []*PeriodicTaskConfig { func (mgr *PeriodicTaskManager) diffAdded(configs []*PeriodicTaskConfig) []*PeriodicTaskConfig {
var added []*PeriodicTaskConfig var added []*PeriodicTaskConfig
for _, c := range configs { for _, c := range configs {
if _, found := mgr.m[c.hash()]; !found { if _, found := mgr.m[c.ID]; !found {
added = append(added, c) added = append(added, c)
} }
} }

View File

@ -165,7 +165,7 @@ type SchedulerOpts struct {
// enqueueJob encapsulates the job of enqueuing a task and recording the event. // enqueueJob encapsulates the job of enqueuing a task and recording the event.
type enqueueJob struct { type enqueueJob struct {
id uuid.UUID id string
cronspec string cronspec string
task *Task task *Task
opts []Option opts []Option
@ -197,7 +197,7 @@ func (j *enqueueJob) Run() {
TaskID: info.ID, TaskID: info.ID,
EnqueuedAt: time.Now().In(j.location), EnqueuedAt: time.Now().In(j.location),
} }
err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event) err = j.rdb.RecordSchedulerEnqueueEvent(j.id, event)
if err != nil { if err != nil {
j.logger.Warnf("scheduler could not record enqueue event of enqueued task %s: %v", info.ID, err) j.logger.Warnf("scheduler could not record enqueue event of enqueued task %s: %v", info.ID, err)
} }
@ -205,9 +205,9 @@ func (j *enqueueJob) Run() {
// Register registers a task to be enqueued on the given schedule specified by the cronspec. // Register registers a task to be enqueued on the given schedule specified by the cronspec.
// It returns an ID of the newly registered entry. // It returns an ID of the newly registered entry.
func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) { func (s *Scheduler) Register(ID, cronspec string, task *Task, opts ...Option) (entryID string, err error) {
job := &enqueueJob{ job := &enqueueJob{
id: uuid.New(), id: ID,
cronspec: cronspec, cronspec: cronspec,
task: task, task: task,
opts: opts, opts: opts,
@ -224,9 +224,9 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry
return "", err return "", err
} }
s.mu.Lock() s.mu.Lock()
s.idmap[job.id.String()] = cronID s.idmap[job.id] = cronID
s.mu.Unlock() s.mu.Unlock()
return job.id.String(), nil return job.id, nil
} }
// Unregister removes a registered entry by entry ID. // Unregister removes a registered entry by entry ID.
@ -331,7 +331,7 @@ func (s *Scheduler) beat() {
for _, entry := range s.cron.Entries() { for _, entry := range s.cron.Entries() {
job := entry.Job.(*enqueueJob) job := entry.Job.(*enqueueJob)
e := &base.SchedulerEntry{ e := &base.SchedulerEntry{
ID: job.id.String(), ID: job.id,
Spec: job.cronspec, Spec: job.cronspec,
Type: job.task.Type(), Type: job.task.Type(),
Payload: job.task.Payload(), Payload: job.task.Payload(),
@ -357,8 +357,8 @@ func stringifyOptions(opts []Option) []string {
func (s *Scheduler) clearHistory() { func (s *Scheduler) clearHistory() {
for _, entry := range s.cron.Entries() { for _, entry := range s.cron.Entries() {
job := entry.Job.(*enqueueJob) job := entry.Job.(*enqueueJob)
if err := s.rdb.ClearSchedulerHistory(job.id.String()); err != nil { if err := s.rdb.ClearSchedulerHistory(job.id); err != nil {
s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err) s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id, err)
} }
} }
} }

View File

@ -62,7 +62,7 @@ func TestSchedulerRegister(t *testing.T) {
// Tests for new redis connection. // Tests for new redis connection.
for _, tc := range tests { for _, tc := range tests {
scheduler := NewScheduler(getRedisConnOpt(t), nil) scheduler := NewScheduler(getRedisConnOpt(t), nil)
if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil { if _, err := scheduler.Register("1", tc.cronspec, tc.task, tc.opts...); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -84,7 +84,7 @@ func TestSchedulerRegister(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient) redisClient := getRedisConnOpt(t).MakeRedisClient().(redis.UniversalClient)
scheduler := NewSchedulerFromRedisClient(redisClient, nil) scheduler := NewSchedulerFromRedisClient(redisClient, nil)
if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil { if _, err := scheduler.Register("1", tc.cronspec, tc.task, tc.opts...); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -120,7 +120,7 @@ func TestSchedulerWhenRedisDown(t *testing.T) {
task := NewTask("test", nil) task := NewTask("test", nil)
if _, err := scheduler.Register("@every 3s", task); err != nil { if _, err := scheduler.Register("1", "@every 3s", task); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -159,7 +159,7 @@ func TestSchedulerUnregister(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
scheduler := NewScheduler(getRedisConnOpt(t), nil) scheduler := NewScheduler(getRedisConnOpt(t), nil)
entryID, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...) entryID, err := scheduler.Register("1", tc.cronspec, tc.task, tc.opts...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -209,7 +209,7 @@ func TestSchedulerPostAndPreEnqueueHandler(t *testing.T) {
task := NewTask("test", nil) task := NewTask("test", nil)
if _, err := scheduler.Register("@every 3s", task); err != nil { if _, err := scheduler.Register("1", "@every 3s", task); err != nil {
t.Fatal(err) t.Fatal(err)
} }