2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Add Unregister method to Scheduler

This commit is contained in:
Ken Hibino 2020-11-07 10:41:27 -08:00
parent f618f5b1f5
commit de28c1ea19
3 changed files with 69 additions and 3 deletions

View File

@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Added
- `Unregister` method is added to `Scheduler` to remove a registered entry.
## [0.15.0] - 2021-01-31 ## [0.15.0] - 2021-01-31
**IMPORTATNT**: All `Inspector` related code are moved to subpackage "github.com/hibiken/asynq/inspeq" **IMPORTATNT**: All `Inspector` related code are moved to subpackage "github.com/hibiken/asynq/inspeq"
@ -15,7 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Inspector` related code are moved to subpackage "github.com/hibken/asynq/inspeq". - `Inspector` related code are moved to subpackage "github.com/hibken/asynq/inspeq".
- `RedisConnOpt` interface has changed slightly. If you have been passing `RedisClientOpt`, `RedisFailoverClientOpt`, or `RedisClusterClientOpt` as a pointer, - `RedisConnOpt` interface has changed slightly. If you have been passing `RedisClientOpt`, `RedisFailoverClientOpt`, or `RedisClusterClientOpt` as a pointer,
update your code to pass as a value. update your code to pass as a value.
- `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`. - `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`.
### Added ### Added

View File

@ -30,6 +30,10 @@ type Scheduler struct {
done chan struct{} done chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
errHandler func(task *Task, opts []Option, err error) errHandler func(task *Task, opts []Option, err error)
// idmap maps Scheduler's entry ID to cron.EntryID
// to avoid using cron.EntryID as the public API of
// the Scheduler.
idmap map[string]cron.EntryID
} }
// NewScheduler returns a new Scheduler instance given the redis connection option. // NewScheduler returns a new Scheduler instance given the redis connection option.
@ -65,6 +69,7 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
location: loc, location: loc,
done: make(chan struct{}), done: make(chan struct{}),
errHandler: opts.EnqueueErrorHandler, errHandler: opts.EnqueueErrorHandler,
idmap: make(map[string]cron.EntryID),
} }
} }
@ -145,12 +150,25 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry
logger: s.logger, logger: s.logger,
errHandler: s.errHandler, errHandler: s.errHandler,
} }
if _, err = s.cron.AddJob(cronspec, job); err != nil { cronID, err := s.cron.AddJob(cronspec, job)
if err != nil {
return "", err return "", err
} }
s.idmap[job.id.String()] = cronID
return job.id.String(), nil return job.id.String(), nil
} }
// Unregister removes a registered entry by entry ID.
// Unregister returns a non-nil error if no entries were found for the given entryID.
func (s *Scheduler) Unregister(entryID string) error {
cronID, ok := s.idmap[entryID]
if !ok {
return fmt.Errorf("asynq: no scheduler entry found")
}
s.cron.Remove(cronID)
return nil
}
// Run starts the scheduler until an os signal to exit the program is received. // Run starts the scheduler until an os signal to exit the program is received.
// It returns an error if scheduler is already running or has been stopped. // It returns an error if scheduler is already running or has been stopped.
func (s *Scheduler) Run() error { func (s *Scheduler) Run() error {

View File

@ -14,7 +14,7 @@ import (
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
) )
func TestScheduler(t *testing.T) { func TestSchedulerRegister(t *testing.T) {
tests := []struct { tests := []struct {
cronspec string cronspec string
task *Task task *Task
@ -116,3 +116,47 @@ func TestSchedulerWhenRedisDown(t *testing.T) {
} }
mu.Unlock() mu.Unlock()
} }
func TestSchedulerUnregister(t *testing.T) {
tests := []struct {
cronspec string
task *Task
opts []Option
wait time.Duration
queue string
}{
{
cronspec: "@every 3s",
task: NewTask("task1", nil),
opts: []Option{MaxRetry(10)},
wait: 10 * time.Second,
queue: "default",
},
}
r := setup(t)
for _, tc := range tests {
scheduler := NewScheduler(getRedisConnOpt(t), nil)
entryID, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...)
if err != nil {
t.Fatal(err)
}
if err := scheduler.Unregister(entryID); err != nil {
t.Fatal(err)
}
if err := scheduler.Start(); err != nil {
t.Fatal(err)
}
time.Sleep(tc.wait)
if err := scheduler.Stop(); err != nil {
t.Fatal(err)
}
got := asynqtest.GetPendingMessages(t, r, tc.queue)
if len(got) != 0 {
t.Errorf("%d tasks were enqueued, want zero", len(got))
}
}
}