diff --git a/background.go b/background.go index 4571af1..6c87d0d 100644 --- a/background.go +++ b/background.go @@ -28,7 +28,7 @@ type Background struct { running bool rdb *rdb.RDB - poller *poller + scheduler *scheduler processor *processor } @@ -36,11 +36,11 @@ type Background struct { // given a redis configuration . func NewBackground(numWorkers int, cfg *RedisConfig) *Background { r := rdb.NewRDB(newRedisClient(cfg)) - poller := newPoller(r, 5*time.Second) + scheduler := newScheduler(r, 5*time.Second) processor := newProcessor(r, numWorkers, nil) return &Background{ rdb: r, - poller: poller, + scheduler: scheduler, processor: processor, } } @@ -101,7 +101,7 @@ func (bg *Background) start(handler Handler) { bg.running = true bg.processor.handler = handler - bg.poller.start() + bg.scheduler.start() bg.processor.start() } @@ -113,7 +113,7 @@ func (bg *Background) stop() { return } - bg.poller.terminate() + bg.scheduler.terminate() bg.processor.terminate() bg.rdb.Close() diff --git a/poller.go b/poller.go deleted file mode 100644 index 9fdd8b6..0000000 --- a/poller.go +++ /dev/null @@ -1,53 +0,0 @@ -package asynq - -import ( - "log" - "time" - - "github.com/hibiken/asynq/internal/rdb" -) - -type poller struct { - rdb *rdb.RDB - - // channel to communicate back to the long running "poller" goroutine. - done chan struct{} - - // poll interval on average - avgInterval time.Duration -} - -func newPoller(r *rdb.RDB, avgInterval time.Duration) *poller { - return &poller{ - rdb: r, - done: make(chan struct{}), - avgInterval: avgInterval, - } -} - -func (p *poller) terminate() { - log.Println("[INFO] Poller shutting down...") - // Signal the poller goroutine to stop polling. - p.done <- struct{}{} -} - -// start starts the "poller" goroutine. -func (p *poller) start() { - go func() { - for { - select { - case <-p.done: - log.Println("[INFO] Poller done.") - return - case <-time.After(p.avgInterval): - p.exec() - } - } - }() -} - -func (p *poller) exec() { - if err := p.rdb.CheckAndEnqueue(); err != nil { - log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err) - } -} diff --git a/scheduler.go b/scheduler.go new file mode 100644 index 0000000..c88d862 --- /dev/null +++ b/scheduler.go @@ -0,0 +1,53 @@ +package asynq + +import ( + "log" + "time" + + "github.com/hibiken/asynq/internal/rdb" +) + +type scheduler struct { + rdb *rdb.RDB + + // channel to communicate back to the long running "scheduler" goroutine. + done chan struct{} + + // poll interval on average + avgInterval time.Duration +} + +func newScheduler(r *rdb.RDB, avgInterval time.Duration) *scheduler { + return &scheduler{ + rdb: r, + done: make(chan struct{}), + avgInterval: avgInterval, + } +} + +func (s *scheduler) terminate() { + log.Println("[INFO] Scheduler shutting down...") + // Signal the scheduler goroutine to stop polling. + s.done <- struct{}{} +} + +// start starts the "scheduler" goroutine. +func (s *scheduler) start() { + go func() { + for { + select { + case <-s.done: + log.Println("[INFO] Scheduler done.") + return + case <-time.After(s.avgInterval): + s.exec() + } + } + }() +} + +func (s *scheduler) exec() { + if err := s.rdb.CheckAndEnqueue(); err != nil { + log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err) + } +} diff --git a/poller_test.go b/scheduler_test.go similarity index 88% rename from poller_test.go rename to scheduler_test.go index 6da3c33..a8de200 100644 --- a/poller_test.go +++ b/scheduler_test.go @@ -10,7 +10,7 @@ import ( "github.com/hibiken/asynq/internal/rdb" ) -func TestPoller(t *testing.T) { +func TestScheduler(t *testing.T) { type scheduledTask struct { msg *base.TaskMessage processAt time.Time @@ -18,7 +18,7 @@ func TestPoller(t *testing.T) { r := setup(t) rdbClient := rdb.NewRDB(r) const pollInterval = time.Second - p := newPoller(rdbClient, pollInterval) + s := newScheduler(rdbClient, pollInterval) t1 := randomTask("gen_thumbnail", "default", nil) t2 := randomTask("send_email", "default", nil) t3 := randomTask("reindex", "default", nil) @@ -92,26 +92,26 @@ func TestPoller(t *testing.T) { } } - p.start() + s.start() time.Sleep(tc.wait) - p.terminate() + s.terminate() gotScheduledRaw := r.ZRange(base.ScheduledQueue, 0, -1).Val() gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw) if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", base.ScheduledQueue, diff) + t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.ScheduledQueue, diff) } gotRetryRaw := r.ZRange(base.RetryQueue, 0, -1).Val() gotRetry := mustUnmarshalSlice(t, gotRetryRaw) if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", base.RetryQueue, diff) + t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.RetryQueue, diff) } gotQueueRaw := r.LRange(base.DefaultQueue, 0, -1).Val() gotQueue := mustUnmarshalSlice(t, gotQueueRaw) if diff := cmp.Diff(tc.wantQueue, gotQueue, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", base.DefaultQueue, diff) + t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.DefaultQueue, diff) } } }