2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

Make scheduler methods thread-safe

This commit is contained in:
Ken Hibino 2021-08-16 20:22:43 -07:00
parent c197902dc0
commit cfd1a1dfe8
2 changed files with 14 additions and 0 deletions

View File

@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Fixed
- Scheduler methods are now thread-safe. It's now safe to call `Register` and `Unregister` concurrently.
## [0.18.3] - 2020-08-09 ## [0.18.3] - 2020-08-09
### Changed ### Changed

View File

@ -19,6 +19,8 @@ import (
) )
// A Scheduler kicks off tasks at regular intervals based on the user defined schedule. // A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
//
// Schedulers are safe for concurrent use by multiple goroutines.
type Scheduler struct { type Scheduler struct {
id string id string
state *base.ServerState state *base.ServerState
@ -30,6 +32,9 @@ type Scheduler struct {
done chan struct{} done chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
errHandler func(task *Task, opts []Option, err error) errHandler func(task *Task, opts []Option, err error)
// guards idmap
mu sync.Mutex
// idmap maps Scheduler's entry ID to cron.EntryID // idmap maps Scheduler's entry ID to cron.EntryID
// to avoid using cron.EntryID as the public API of // to avoid using cron.EntryID as the public API of
// the Scheduler. // the Scheduler.
@ -154,17 +159,22 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry
if err != nil { if err != nil {
return "", err return "", err
} }
s.mu.Lock()
s.idmap[job.id.String()] = cronID s.idmap[job.id.String()] = cronID
s.mu.Unlock()
return job.id.String(), nil return job.id.String(), nil
} }
// Unregister removes a registered entry by entry ID. // Unregister removes a registered entry by entry ID.
// Unregister returns a non-nil error if no entries were found for the given entryID. // Unregister returns a non-nil error if no entries were found for the given entryID.
func (s *Scheduler) Unregister(entryID string) error { func (s *Scheduler) Unregister(entryID string) error {
s.mu.Lock()
defer s.mu.Unlock()
cronID, ok := s.idmap[entryID] cronID, ok := s.idmap[entryID]
if !ok { if !ok {
return fmt.Errorf("asynq: no scheduler entry found") return fmt.Errorf("asynq: no scheduler entry found")
} }
delete(s.idmap, entryID)
s.cron.Remove(cronID) s.cron.Remove(cronID)
return nil return nil
} }