mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-22 08:40:22 +08:00
Add Unregister method to Scheduler
This commit is contained in:
parent
942345ee80
commit
e58f2d27b8
@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
- `Unregister` method is added to `Scheduler` to remove a registered entry.
|
||||||
|
|
||||||
## [0.13.0] - 2020-10-13
|
## [0.13.0] - 2020-10-13
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
20
scheduler.go
20
scheduler.go
@ -29,6 +29,10 @@ type Scheduler struct {
|
|||||||
done chan struct{}
|
done chan struct{}
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
errHandler func(task *Task, opts []Option, err error)
|
errHandler func(task *Task, opts []Option, err error)
|
||||||
|
// idMap maps Scheduler's entry ID to cron.EntryID
|
||||||
|
// to avoid using cron.EntryID as the public API of
|
||||||
|
// the Scheduler.
|
||||||
|
idMap map[string]cron.EntryID
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewScheduler returns a new Scheduler instance given the redis connection option.
|
// NewScheduler returns a new Scheduler instance given the redis connection option.
|
||||||
@ -60,6 +64,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
|||||||
location: loc,
|
location: loc,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
errHandler: opts.EnqueueErrorHandler,
|
errHandler: opts.EnqueueErrorHandler,
|
||||||
|
idMap: make(map[string]cron.EntryID),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -140,12 +145,25 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry
|
|||||||
logger: s.logger,
|
logger: s.logger,
|
||||||
errHandler: s.errHandler,
|
errHandler: s.errHandler,
|
||||||
}
|
}
|
||||||
if _, err = s.cron.AddJob(cronspec, job); err != nil {
|
cronID, err := s.cron.AddJob(cronspec, job)
|
||||||
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
s.idMap[job.id.String()] = cronID
|
||||||
return job.id.String(), nil
|
return job.id.String(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unregister removes a registered entry by entry ID.
|
||||||
|
// Unregister returns a non-nil error if no entries were found for the given entryID.
|
||||||
|
func (s *Scheduler) Unregister(entryID string) error {
|
||||||
|
cronID, ok := s.idMap[entryID]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("asynq: no entry found for entryID: %q", entryID)
|
||||||
|
}
|
||||||
|
s.cron.Remove(cronID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Run starts the scheduler until an os signal to exit the program is received.
|
// Run starts the scheduler until an os signal to exit the program is received.
|
||||||
// It returns an error if scheduler is already running or has been stopped.
|
// It returns an error if scheduler is already running or has been stopped.
|
||||||
func (s *Scheduler) Run() error {
|
func (s *Scheduler) Run() error {
|
||||||
|
@ -14,7 +14,7 @@ import (
|
|||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestScheduler(t *testing.T) {
|
func TestSchedulerRegister(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
cronspec string
|
cronspec string
|
||||||
task *Task
|
task *Task
|
||||||
@ -116,3 +116,47 @@ func TestSchedulerWhenRedisDown(t *testing.T) {
|
|||||||
}
|
}
|
||||||
mu.Unlock()
|
mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSchedulerUnregister(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
cronspec string
|
||||||
|
task *Task
|
||||||
|
opts []Option
|
||||||
|
wait time.Duration
|
||||||
|
queue string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
cronspec: "@every 3s",
|
||||||
|
task: NewTask("task1", nil),
|
||||||
|
opts: []Option{MaxRetry(10)},
|
||||||
|
wait: 10 * time.Second,
|
||||||
|
queue: "default",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
r := setup(t)
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
scheduler := NewScheduler(getRedisConnOpt(t), nil)
|
||||||
|
entryID, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := scheduler.Unregister(entryID); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := scheduler.Start(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
time.Sleep(tc.wait)
|
||||||
|
if err := scheduler.Stop(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got := asynqtest.GetPendingMessages(t, r, tc.queue)
|
||||||
|
if len(got) != 0 {
|
||||||
|
t.Errorf("%d tasks were enqueued, want zero", len(got))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user