diff --git a/periodic_task_manager.go b/periodic_task_manager.go new file mode 100644 index 0000000..085ca3d --- /dev/null +++ b/periodic_task_manager.go @@ -0,0 +1,243 @@ +// Copyright 2022 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package asynq + +import ( + "crypto/sha256" + "fmt" + "io" + "sort" + "sync" + "time" +) + +// PeriodicTaskManager manages scheduling of periodic tasks. +// It syncs scheduler's entries by calling the config provider periodically. +type PeriodicTaskManager struct { + s *Scheduler + p PeriodicTaskConfigProvider + syncInterval time.Duration + done chan (struct{}) + wg sync.WaitGroup + m map[string]string // map[hash]entryID +} + +type PeriodicTaskManagerOpts struct { + // Required: must be non nil + PeriodicTaskConfigProvider PeriodicTaskConfigProvider + + // Required: must be non nil + RedisConnOpt RedisConnOpt + + // Optional: scheduler options + *SchedulerOpts + + // Optional: default is 3m + SyncInterval time.Duration +} + +const defaultSyncInterval = 3 * time.Minute + +// NewPeriodicTaskManager returns a new PeriodicTaskManager instance. +// The given opts should specify the RedisConnOp and PeriodicTaskConfigProvider at minimum. +func NewPeriodicTaskManager(opts PeriodicTaskManagerOpts) (*PeriodicTaskManager, error) { + if opts.PeriodicTaskConfigProvider == nil { + return nil, fmt.Errorf("PeriodicTaskConfigProvider cannot be nil") + } + if opts.RedisConnOpt == nil { + return nil, fmt.Errorf("RedisConnOpt cannot be nil") + } + scheduler := NewScheduler(opts.RedisConnOpt, opts.SchedulerOpts) + syncInterval := opts.SyncInterval + if syncInterval == 0 { + syncInterval = defaultSyncInterval + } + return &PeriodicTaskManager{ + s: scheduler, + p: opts.PeriodicTaskConfigProvider, + syncInterval: syncInterval, + done: make(chan struct{}), + m: make(map[string]string), + }, nil +} + +// PeriodicTaskConfigProvider provides configs for periodic tasks. +// GetConfigs will be called by a PeriodicTaskManager periodically to +// sync the scheduler's entries with the configs returned by the provider. +type PeriodicTaskConfigProvider interface { + GetConfigs() ([]*PeriodicTaskConfig, error) +} + +// PeriodicTaskConfig specifies the details of a periodic task. +type PeriodicTaskConfig struct { + Cronspec string // required: must be non empty string + Task *Task // required: must be non nil + Opts []Option // optional: can be nil +} + +func (c *PeriodicTaskConfig) hash() string { + h := sha256.New() + io.WriteString(h, c.Cronspec) + io.WriteString(h, c.Task.Type()) + h.Write(c.Task.Payload()) + opts := stringifyOptions(c.Opts) + sort.Strings(opts) + for _, opt := range opts { + io.WriteString(h, opt) + } + return fmt.Sprintf("%x", h.Sum(nil)) +} + +func validatePeriodicTaskConfig(c *PeriodicTaskConfig) error { + if c == nil { + return fmt.Errorf("PeriodicTaskConfig cannot be nil") + } + if c.Task == nil { + return fmt.Errorf("PeriodicTaskConfig.Task cannot be nil") + } + if c.Cronspec == "" { + return fmt.Errorf("PeriodicTaskConfig.Cronspec cannot be empty") + } + return nil +} + +// Start starts a scheduler and background goroutine to sync the scheduler with the configs +// returned by the provider. +// +// Start returns any error encountered at start up time. +func (mgr *PeriodicTaskManager) Start() error { + if mgr.s == nil || mgr.p == nil { + panic("asynq: cannot start uninitialized PeriodicTaskManager; use NewPeriodicTaskManager to initialize") + } + if err := mgr.initialSync(); err != nil { + return fmt.Errorf("asynq: %v", err) + } + if err := mgr.s.Start(); err != nil { + return fmt.Errorf("asynq: %v", err) + } + mgr.wg.Add(1) + go func() { + defer mgr.wg.Done() + ticker := time.NewTicker(mgr.syncInterval) + for { + select { + case <-mgr.done: + mgr.s.logger.Debugf("Stopping syncer goroutine") + ticker.Stop() + return + case <-ticker.C: + mgr.sync() + } + } + }() + return nil +} + +// Shutdown gracefully shuts down the manager. +// It notifies a background syncer goroutine to stop and stops scheduler. +func (mgr *PeriodicTaskManager) Shutdown() { + close(mgr.done) + mgr.wg.Wait() + mgr.s.Shutdown() +} + +// Run starts the manager and blocks until an os signal to exit the program is received. +// Once it receives a signal, it gracefully shuts down the manager. +func (mgr *PeriodicTaskManager) Run() error { + if err := mgr.Start(); err != nil { + return err + } + mgr.s.waitForSignals() + mgr.Shutdown() + mgr.s.logger.Debugf("PeriodicTaskManager exiting") + return nil +} + +func (mgr *PeriodicTaskManager) initialSync() error { + configs, err := mgr.p.GetConfigs() + if err != nil { + return fmt.Errorf("initial call to GetConfigs failed: %v", err) + } + for _, c := range configs { + if err := validatePeriodicTaskConfig(c); err != nil { + return fmt.Errorf("initial call to GetConfigs contained an invalid config: %v", err) + } + } + mgr.add(configs) + return nil +} + +func (mgr *PeriodicTaskManager) add(configs []*PeriodicTaskConfig) { + for _, c := range configs { + entryID, err := mgr.s.Register(c.Cronspec, c.Task, c.Opts...) + if err != nil { + mgr.s.logger.Errorf("Failed to register periodic task: cronspec=%q task=%q", + c.Cronspec, c.Task.Type()) + continue + } + mgr.m[c.hash()] = entryID + mgr.s.logger.Infof("Successfully registered periodic task: cronspec=%q task=%q, entryID=%s", + c.Cronspec, c.Task.Type(), entryID) + } +} + +func (mgr *PeriodicTaskManager) remove(removed map[string]string) { + for hash, entryID := range removed { + if err := mgr.s.Unregister(entryID); err != nil { + mgr.s.logger.Errorf("Failed to unregister periodic task: %v", err) + continue + } + delete(mgr.m, hash) + mgr.s.logger.Infof("Successfully unregistered periodic task: entryID=%s", entryID) + } +} + +func (mgr *PeriodicTaskManager) sync() { + configs, err := mgr.p.GetConfigs() + if err != nil { + mgr.s.logger.Errorf("Failed to get periodic task configs: %v", err) + return + } + for _, c := range configs { + if err := validatePeriodicTaskConfig(c); err != nil { + mgr.s.logger.Errorf("Failed to sync: GetConfigs returned an invalid config: %v", err) + return + } + } + // Diff and only register/unregister the newly added/removed entries. + removed := mgr.diffRemoved(configs) + added := mgr.diffAdded(configs) + mgr.remove(removed) + mgr.add(added) +} + +// diffRemoved diffs the incoming configs with the registered config and returns +// a map containing hash and entryID of each config that was removed. +func (mgr *PeriodicTaskManager) diffRemoved(configs []*PeriodicTaskConfig) map[string]string { + newConfigs := make(map[string]string) + for _, c := range configs { + newConfigs[c.hash()] = "" // empty value since we don't have entryID yet + } + removed := make(map[string]string) + for k, v := range mgr.m { + // test whether existing config is present in the incoming configs + if _, found := newConfigs[k]; !found { + removed[k] = v + } + } + return removed +} + +// diffAdded diffs the incoming configs with the registered configs and returns +// a list of configs that were added. +func (mgr *PeriodicTaskManager) diffAdded(configs []*PeriodicTaskConfig) []*PeriodicTaskConfig { + var added []*PeriodicTaskConfig + for _, c := range configs { + if _, found := mgr.m[c.hash()]; !found { + added = append(added, c) + } + } + return added +} diff --git a/periodic_task_manager_test.go b/periodic_task_manager_test.go new file mode 100644 index 0000000..24d5e5b --- /dev/null +++ b/periodic_task_manager_test.go @@ -0,0 +1,343 @@ +// Copyright 2022 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package asynq + +import ( + "sort" + "sync" + "testing" + "time" + + "github.com/google/go-cmp/cmp" +) + +// Trivial implementation of PeriodicTaskConfigProvider for testing purpose. +type FakeConfigProvider struct { + mu sync.Mutex + cfgs []*PeriodicTaskConfig +} + +func (p *FakeConfigProvider) SetConfigs(cfgs []*PeriodicTaskConfig) { + p.mu.Lock() + defer p.mu.Unlock() + p.cfgs = cfgs +} + +func (p *FakeConfigProvider) GetConfigs() ([]*PeriodicTaskConfig, error) { + p.mu.Lock() + defer p.mu.Unlock() + return p.cfgs, nil +} + +func TestNewPeriodicTaskManager(t *testing.T) { + cfgs := []*PeriodicTaskConfig{ + {Cronspec: "* * * * *", Task: NewTask("foo", nil)}, + {Cronspec: "* * * * *", Task: NewTask("bar", nil)}, + } + tests := []struct { + desc string + opts PeriodicTaskManagerOpts + }{ + { + desc: "with provider and redisConnOpt", + opts: PeriodicTaskManagerOpts{ + RedisConnOpt: RedisClientOpt{Addr: ":6379"}, + PeriodicTaskConfigProvider: &FakeConfigProvider{cfgs: cfgs}, + }, + }, + { + desc: "with sync option", + opts: PeriodicTaskManagerOpts{ + RedisConnOpt: RedisClientOpt{Addr: ":6379"}, + PeriodicTaskConfigProvider: &FakeConfigProvider{cfgs: cfgs}, + SyncInterval: 5 * time.Minute, + }, + }, + { + desc: "with scheduler option", + opts: PeriodicTaskManagerOpts{ + RedisConnOpt: RedisClientOpt{Addr: ":6379"}, + PeriodicTaskConfigProvider: &FakeConfigProvider{cfgs: cfgs}, + SyncInterval: 5 * time.Minute, + SchedulerOpts: &SchedulerOpts{ + LogLevel: DebugLevel, + }, + }, + }, + } + + for _, tc := range tests { + _, err := NewPeriodicTaskManager(tc.opts) + if err != nil { + t.Errorf("%s; NewPeriodicTaskManager returned error: %v", tc.desc, err) + } + } +} + +func TestNewPeriodicTaskManagerError(t *testing.T) { + cfgs := []*PeriodicTaskConfig{ + {Cronspec: "* * * * *", Task: NewTask("foo", nil)}, + {Cronspec: "* * * * *", Task: NewTask("bar", nil)}, + } + tests := []struct { + desc string + opts PeriodicTaskManagerOpts + }{ + { + desc: "without provider", + opts: PeriodicTaskManagerOpts{ + RedisConnOpt: RedisClientOpt{Addr: ":6379"}, + }, + }, + { + desc: "without redisConOpt", + opts: PeriodicTaskManagerOpts{ + PeriodicTaskConfigProvider: &FakeConfigProvider{cfgs: cfgs}, + }, + }, + } + + for _, tc := range tests { + _, err := NewPeriodicTaskManager(tc.opts) + if err == nil { + t.Errorf("%s; NewPeriodicTaskManager did not return error", tc.desc) + } + } +} + +func TestPeriodicTaskConfigHash(t *testing.T) { + tests := []struct { + desc string + a *PeriodicTaskConfig + b *PeriodicTaskConfig + isSame bool + }{ + { + desc: "basic identity test", + a: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", nil), + }, + b: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", nil), + }, + isSame: true, + }, + { + desc: "with a option", + a: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", nil), + Opts: []Option{Queue("myqueue")}, + }, + b: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", nil), + Opts: []Option{Queue("myqueue")}, + }, + isSame: true, + }, + { + desc: "with multiple options (different order)", + a: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", nil), + Opts: []Option{Unique(5 * time.Minute), Queue("myqueue")}, + }, + b: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", nil), + Opts: []Option{Queue("myqueue"), Unique(5 * time.Minute)}, + }, + isSame: true, + }, + { + desc: "with payload", + a: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", []byte("hello world!")), + Opts: []Option{Queue("myqueue")}, + }, + b: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", []byte("hello world!")), + Opts: []Option{Queue("myqueue")}, + }, + isSame: true, + }, + { + desc: "with different cronspecs", + a: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", nil), + }, + b: &PeriodicTaskConfig{ + Cronspec: "5 * * * *", + Task: NewTask("foo", nil), + }, + isSame: false, + }, + { + desc: "with different task type", + a: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", nil), + }, + b: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("bar", nil), + }, + isSame: false, + }, + { + desc: "with different options", + a: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", nil), + Opts: []Option{Queue("myqueue")}, + }, + b: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", nil), + Opts: []Option{Unique(10 * time.Minute)}, + }, + isSame: false, + }, + { + desc: "with different options (one is subset of the other)", + a: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", nil), + Opts: []Option{Queue("myqueue")}, + }, + b: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", nil), + Opts: []Option{Queue("myqueue"), Unique(10 * time.Minute)}, + }, + isSame: false, + }, + { + desc: "with different payload", + a: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", []byte("hello!")), + Opts: []Option{Queue("myqueue")}, + }, + b: &PeriodicTaskConfig{ + Cronspec: "* * * * *", + Task: NewTask("foo", []byte("HELLO!")), + Opts: []Option{Queue("myqueue"), Unique(10 * time.Minute)}, + }, + isSame: false, + }, + } + + for _, tc := range tests { + if tc.isSame && tc.a.hash() != tc.b.hash() { + t.Errorf("%s: a.hash=%s b.hash=%s expected to be equal", + tc.desc, tc.a.hash(), tc.b.hash()) + } + if !tc.isSame && tc.a.hash() == tc.b.hash() { + t.Errorf("%s: a.hash=%s b.hash=%s expected to be not equal", + tc.desc, tc.a.hash(), tc.b.hash()) + } + } +} + +// Things to test. +// - Run the manager +// - Change provider to return new configs +// - Verify that the scheduler synced with the new config +func TestPeriodicTaskManager(t *testing.T) { + // Note: In this test, we'll use task type as an ID for each config. + cfgs := []*PeriodicTaskConfig{ + {Task: NewTask("task1", nil), Cronspec: "* * * * 1"}, + {Task: NewTask("task2", nil), Cronspec: "* * * * 2"}, + } + const syncInterval = 3 * time.Second + provider := &FakeConfigProvider{cfgs: cfgs} + mgr, err := NewPeriodicTaskManager(PeriodicTaskManagerOpts{ + RedisConnOpt: getRedisConnOpt(t), + PeriodicTaskConfigProvider: provider, + SyncInterval: syncInterval, + }) + if err != nil { + t.Fatalf("Failed to initialize PeriodicTaskManager: %v", err) + } + + if err := mgr.Start(); err != nil { + t.Fatalf("Failed to start PeriodicTaskManager: %v", err) + } + defer mgr.Shutdown() + + got := extractCronEntries(mgr.s) + want := []*cronEntry{ + {Cronspec: "* * * * 1", TaskType: "task1"}, + {Cronspec: "* * * * 2", TaskType: "task2"}, + } + if diff := cmp.Diff(want, got, sortCronEntry); diff != "" { + t.Errorf("Diff found in scheduler's registered entries: %s", diff) + } + + // Change the underlying configs + // - task2 removed + // - task3 added + provider.SetConfigs([]*PeriodicTaskConfig{ + {Task: NewTask("task1", nil), Cronspec: "* * * * 1"}, + {Task: NewTask("task3", nil), Cronspec: "* * * * 3"}, + }) + + // Wait for the next sync + time.Sleep(syncInterval * 2) + + // Verify the entries are synced + got = extractCronEntries(mgr.s) + want = []*cronEntry{ + {Cronspec: "* * * * 1", TaskType: "task1"}, + {Cronspec: "* * * * 3", TaskType: "task3"}, + } + if diff := cmp.Diff(want, got, sortCronEntry); diff != "" { + t.Errorf("Diff found in scheduler's registered entries: %s", diff) + } + + // Change the underlying configs + // All configs removed, empty set. + provider.SetConfigs([]*PeriodicTaskConfig{}) + + // Wait for the next sync + time.Sleep(syncInterval * 2) + + // Verify the entries are synced + got = extractCronEntries(mgr.s) + want = []*cronEntry{} + if diff := cmp.Diff(want, got, sortCronEntry); diff != "" { + t.Errorf("Diff found in scheduler's registered entries: %s", diff) + } +} + +func extractCronEntries(s *Scheduler) []*cronEntry { + var out []*cronEntry + for _, e := range s.cron.Entries() { + job := e.Job.(*enqueueJob) + out = append(out, &cronEntry{Cronspec: job.cronspec, TaskType: job.task.Type()}) + } + return out +} + +var sortCronEntry = cmp.Transformer("sortCronEntry", func(in []*cronEntry) []*cronEntry { + out := append([]*cronEntry(nil), in...) + sort.Slice(out, func(i, j int) bool { + return out[i].TaskType < out[j].TaskType + }) + return out +}) + +// A simple struct to allow for simpler comparison in test. +type cronEntry struct { + Cronspec string + TaskType string +}