From e19c45cff31fc518e54bb6bbcf263c4bb460b369 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 23 Nov 2019 15:22:43 -0800 Subject: [PATCH] Rename Launcher to Background --- background.go | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++ launcher.go | 61 --------------------------------------------------- 2 files changed, 61 insertions(+), 61 deletions(-) create mode 100644 background.go delete mode 100644 launcher.go diff --git a/background.go b/background.go new file mode 100644 index 0000000..856039c --- /dev/null +++ b/background.go @@ -0,0 +1,61 @@ +package asynq + +import ( + "sync" + "time" + + "github.com/go-redis/redis/v7" +) + +// Background is a top-level entity for the background-task processing. +type Background struct { + // running indicates whether processor and poller are both running. + running bool + mu sync.Mutex + + poller *poller + processor *processor +} + +// NewBackground returns a new Background instance. +func NewBackground(numWorkers int, opt *RedisOpt) *Background { + client := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password}) + rdb := newRDB(client) + poller := newPoller(rdb, 5*time.Second, []string{scheduled, retry}) + processor := newProcessor(rdb, numWorkers, nil) + return &Background{ + poller: poller, + processor: processor, + } +} + +// TaskHandler handles a given task and reports any error. +type TaskHandler func(*Task) error + +// Start starts the background-task processing. +func (bg *Background) Start(handler TaskHandler) { + bg.mu.Lock() + defer bg.mu.Unlock() + if bg.running { + return + } + bg.running = true + bg.processor.handler = handler + + bg.poller.start() + bg.processor.start() +} + +// Stop stops the background-task processing. +func (bg *Background) Stop() { + bg.mu.Lock() + defer bg.mu.Unlock() + if !bg.running { + return + } + bg.running = false + bg.processor.handler = nil + + bg.poller.terminate() + bg.processor.terminate() +} diff --git a/launcher.go b/launcher.go deleted file mode 100644 index 02e3b7e..0000000 --- a/launcher.go +++ /dev/null @@ -1,61 +0,0 @@ -package asynq - -import ( - "sync" - "time" - - "github.com/go-redis/redis/v7" -) - -// Launcher starts the processor and poller. -type Launcher struct { - // running indicates whether processor and poller are both running. - running bool - mu sync.Mutex - - poller *poller - processor *processor -} - -// NewLauncher creates and returns a new Launcher. -func NewLauncher(poolSize int, opt *RedisOpt) *Launcher { - client := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password}) - rdb := newRDB(client) - poller := newPoller(rdb, 5*time.Second, []string{scheduled, retry}) - processor := newProcessor(rdb, poolSize, nil) - return &Launcher{ - poller: poller, - processor: processor, - } -} - -// TaskHandler handles a given task and report any error. -type TaskHandler func(*Task) error - -// Start starts the processor and poller. -func (l *Launcher) Start(handler TaskHandler) { - l.mu.Lock() - defer l.mu.Unlock() - if l.running { - return - } - l.running = true - l.processor.handler = handler - - l.poller.start() - l.processor.start() -} - -// Stop stops both processor and poller. -func (l *Launcher) Stop() { - l.mu.Lock() - defer l.mu.Unlock() - if !l.running { - return - } - l.running = false - l.processor.handler = nil - - l.poller.terminate() - l.processor.terminate() -}