mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-22 16:50:18 +08:00
Update Scheduler API to match Server API
This commit is contained in:
parent
056d787935
commit
55f7267e17
16
scheduler.go
16
scheduler.go
@ -170,17 +170,18 @@ func (s *Scheduler) Unregister(entryID string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run starts the scheduler until an os signal to exit the program is received.
|
// Run starts the scheduler until an os signal to exit the program is received.
|
||||||
// It returns an error if scheduler is already running or has been stopped.
|
// It returns an error if scheduler is already running or has been shutdown.
|
||||||
func (s *Scheduler) Run() error {
|
func (s *Scheduler) Run() error {
|
||||||
if err := s.Start(); err != nil {
|
if err := s.Start(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.waitForSignals()
|
s.waitForSignals()
|
||||||
return s.Stop()
|
s.Shutdown()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start starts the scheduler.
|
// Start starts the scheduler.
|
||||||
// It returns an error if the scheduler is already running or has been stopped.
|
// It returns an error if the scheduler is already running or has been shutdown.
|
||||||
func (s *Scheduler) Start() error {
|
func (s *Scheduler) Start() error {
|
||||||
switch s.state.Get() {
|
switch s.state.Get() {
|
||||||
case base.StateActive:
|
case base.StateActive:
|
||||||
@ -197,12 +198,8 @@ func (s *Scheduler) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop stops the scheduler.
|
// Shutdown stops and shuts down the scheduler.
|
||||||
// It returns an error if the scheduler is not currently running.
|
func (s *Scheduler) Shutdown() {
|
||||||
func (s *Scheduler) Stop() error {
|
|
||||||
if s.state.Get() != base.StateActive {
|
|
||||||
return fmt.Errorf("asynq: the scheduler is not running")
|
|
||||||
}
|
|
||||||
s.logger.Info("Scheduler shutting down")
|
s.logger.Info("Scheduler shutting down")
|
||||||
close(s.done) // signal heartbeater to stop
|
close(s.done) // signal heartbeater to stop
|
||||||
ctx := s.cron.Stop()
|
ctx := s.cron.Stop()
|
||||||
@ -214,7 +211,6 @@ func (s *Scheduler) Stop() error {
|
|||||||
s.rdb.Close()
|
s.rdb.Close()
|
||||||
s.state.Set(base.StateClosed)
|
s.state.Set(base.StateClosed)
|
||||||
s.logger.Info("Scheduler stopped")
|
s.logger.Info("Scheduler stopped")
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Scheduler) runHeartbeater() {
|
func (s *Scheduler) runHeartbeater() {
|
||||||
|
@ -67,9 +67,7 @@ func TestSchedulerRegister(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
time.Sleep(tc.wait)
|
time.Sleep(tc.wait)
|
||||||
if err := scheduler.Stop(); err != nil {
|
scheduler.Shutdown()
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
got := asynqtest.GetPendingMessages(t, r, tc.queue)
|
got := asynqtest.GetPendingMessages(t, r, tc.queue)
|
||||||
if diff := cmp.Diff(tc.want, got, asynqtest.IgnoreIDOpt); diff != "" {
|
if diff := cmp.Diff(tc.want, got, asynqtest.IgnoreIDOpt); diff != "" {
|
||||||
@ -106,9 +104,7 @@ func TestSchedulerWhenRedisDown(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// Scheduler should attempt to enqueue the task three times (every 3s).
|
// Scheduler should attempt to enqueue the task three times (every 3s).
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
if err := scheduler.Stop(); err != nil {
|
scheduler.Shutdown()
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
if counter != 3 {
|
if counter != 3 {
|
||||||
@ -150,9 +146,7 @@ func TestSchedulerUnregister(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
time.Sleep(tc.wait)
|
time.Sleep(tc.wait)
|
||||||
if err := scheduler.Stop(); err != nil {
|
scheduler.Shutdown()
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
got := asynqtest.GetPendingMessages(t, r, tc.queue)
|
got := asynqtest.GetPendingMessages(t, r, tc.queue)
|
||||||
if len(got) != 0 {
|
if len(got) != 0 {
|
||||||
|
@ -401,13 +401,13 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
|||||||
// ProcessTask should return nil if the processing of a task
|
// ProcessTask should return nil if the processing of a task
|
||||||
// is successful.
|
// is successful.
|
||||||
//
|
//
|
||||||
// If ProcessTask return a non-nil error or panics, the task
|
// If ProcessTask returns a non-nil error or panics, the task
|
||||||
// will be retried after delay if retry-count is remaining,
|
// will be retried after delay if retry-count is remaining,
|
||||||
// otherwise the task will be archived.
|
// otherwise the task will be archived.
|
||||||
//
|
//
|
||||||
// One exception to this rule is when ProcessTask returns SkipRetry error.
|
// One exception to this rule is when ProcessTask returns a SkipRetry error.
|
||||||
// If the returned error is SkipRetry or the error wraps SkipRetry, retry is
|
// If the returned error is SkipRetry or an error wraps SkipRetry, retry is
|
||||||
// skipped and task will be immediately archived instead.
|
// skipped and the task will be immediately archived instead.
|
||||||
type Handler interface {
|
type Handler interface {
|
||||||
ProcessTask(context.Context, *Task) error
|
ProcessTask(context.Context, *Task) error
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user