From 55f7267e1793608181551464c162c8535f97c00a Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 22 Mar 2021 06:46:41 -0700 Subject: [PATCH] Update Scheduler API to match Server API --- scheduler.go | 16 ++++++---------- scheduler_test.go | 12 +++--------- server.go | 8 ++++---- 3 files changed, 13 insertions(+), 23 deletions(-) diff --git a/scheduler.go b/scheduler.go index 41a77e6..b9bc3d4 100644 --- a/scheduler.go +++ b/scheduler.go @@ -170,17 +170,18 @@ func (s *Scheduler) Unregister(entryID string) error { } // Run starts the scheduler until an os signal to exit the program is received. -// It returns an error if scheduler is already running or has been stopped. +// It returns an error if scheduler is already running or has been shutdown. func (s *Scheduler) Run() error { if err := s.Start(); err != nil { return err } s.waitForSignals() - return s.Stop() + s.Shutdown() + return nil } // Start starts the scheduler. -// It returns an error if the scheduler is already running or has been stopped. +// It returns an error if the scheduler is already running or has been shutdown. func (s *Scheduler) Start() error { switch s.state.Get() { case base.StateActive: @@ -197,12 +198,8 @@ func (s *Scheduler) Start() error { return nil } -// Stop stops the scheduler. -// It returns an error if the scheduler is not currently running. -func (s *Scheduler) Stop() error { - if s.state.Get() != base.StateActive { - return fmt.Errorf("asynq: the scheduler is not running") - } +// Shutdown stops and shuts down the scheduler. +func (s *Scheduler) Shutdown() { s.logger.Info("Scheduler shutting down") close(s.done) // signal heartbeater to stop ctx := s.cron.Stop() @@ -214,7 +211,6 @@ func (s *Scheduler) Stop() error { s.rdb.Close() s.state.Set(base.StateClosed) s.logger.Info("Scheduler stopped") - return nil } func (s *Scheduler) runHeartbeater() { diff --git a/scheduler_test.go b/scheduler_test.go index 20339ca..9ee809f 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -67,9 +67,7 @@ func TestSchedulerRegister(t *testing.T) { t.Fatal(err) } time.Sleep(tc.wait) - if err := scheduler.Stop(); err != nil { - t.Fatal(err) - } + scheduler.Shutdown() got := asynqtest.GetPendingMessages(t, r, tc.queue) if diff := cmp.Diff(tc.want, got, asynqtest.IgnoreIDOpt); diff != "" { @@ -106,9 +104,7 @@ func TestSchedulerWhenRedisDown(t *testing.T) { } // Scheduler should attempt to enqueue the task three times (every 3s). time.Sleep(10 * time.Second) - if err := scheduler.Stop(); err != nil { - t.Fatal(err) - } + scheduler.Shutdown() mu.Lock() if counter != 3 { @@ -150,9 +146,7 @@ func TestSchedulerUnregister(t *testing.T) { t.Fatal(err) } time.Sleep(tc.wait) - if err := scheduler.Stop(); err != nil { - t.Fatal(err) - } + scheduler.Shutdown() got := asynqtest.GetPendingMessages(t, r, tc.queue) if len(got) != 0 { diff --git a/server.go b/server.go index 71d5f46..41cda4b 100644 --- a/server.go +++ b/server.go @@ -401,13 +401,13 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { // ProcessTask should return nil if the processing of a task // is successful. // -// If ProcessTask return a non-nil error or panics, the task +// If ProcessTask returns a non-nil error or panics, the task // will be retried after delay if retry-count is remaining, // otherwise the task will be archived. // -// One exception to this rule is when ProcessTask returns SkipRetry error. -// If the returned error is SkipRetry or the error wraps SkipRetry, retry is -// skipped and task will be immediately archived instead. +// One exception to this rule is when ProcessTask returns a SkipRetry error. +// If the returned error is SkipRetry or an error wraps SkipRetry, retry is +// skipped and the task will be immediately archived instead. type Handler interface { ProcessTask(context.Context, *Task) error }