diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b4f33b..6035bed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Scheduler methods are now thread-safe. It's now safe to call `Register` and `Unregister` concurrently. + ## [0.18.3] - 2020-08-09 ### Changed diff --git a/scheduler.go b/scheduler.go index a85899d..765928f 100644 --- a/scheduler.go +++ b/scheduler.go @@ -19,6 +19,8 @@ import ( ) // A Scheduler kicks off tasks at regular intervals based on the user defined schedule. +// +// Schedulers are safe for concurrent use by multiple goroutines. type Scheduler struct { id string state *base.ServerState @@ -30,6 +32,9 @@ type Scheduler struct { done chan struct{} wg sync.WaitGroup errHandler func(task *Task, opts []Option, err error) + + // guards idmap + mu sync.Mutex // idmap maps Scheduler's entry ID to cron.EntryID // to avoid using cron.EntryID as the public API of // the Scheduler. @@ -154,17 +159,22 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry if err != nil { return "", err } + s.mu.Lock() s.idmap[job.id.String()] = cronID + s.mu.Unlock() return job.id.String(), nil } // Unregister removes a registered entry by entry ID. // Unregister returns a non-nil error if no entries were found for the given entryID. func (s *Scheduler) Unregister(entryID string) error { + s.mu.Lock() + defer s.mu.Unlock() cronID, ok := s.idmap[entryID] if !ok { return fmt.Errorf("asynq: no scheduler entry found") } + delete(s.idmap, entryID) s.cron.Remove(cronID) return nil }