diff --git a/launcher.go b/launcher.go index 8d23dbf..00f6f5d 100644 --- a/launcher.go +++ b/launcher.go @@ -7,15 +7,14 @@ import ( "github.com/go-redis/redis/v7" ) -// Launcher starts the manager and poller. +// Launcher starts the processor and poller. type Launcher struct { - // running indicates whether manager and poller are both running. + // running indicates whether processor and poller are both running. running bool mu sync.Mutex poller *poller - - manager *manager + processor *processor } // NewLauncher creates and returns a new Launcher. @@ -23,17 +22,17 @@ func NewLauncher(poolSize int, opt *RedisOpt) *Launcher { client := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password}) rdb := newRDB(client) poller := newPoller(rdb, 5*time.Second, []string{scheduled, retry}) - manager := newManager(rdb, poolSize, nil) + processor := newProcessor(rdb, poolSize, nil) return &Launcher{ poller: poller, - manager: manager, + processor: processor, } } // TaskHandler handles a given task and report any error. type TaskHandler func(*Task) error -// Start starts the manager and poller. +// Start starts the processor and poller. func (l *Launcher) Start(handler TaskHandler) { l.mu.Lock() defer l.mu.Unlock() @@ -41,13 +40,13 @@ func (l *Launcher) Start(handler TaskHandler) { return } l.running = true - l.manager.handler = handler + l.processor.handler = handler l.poller.start() - l.manager.start() + l.processor.start() } -// Stop stops both manager and poller. +// Stop stops both processor and poller. func (l *Launcher) Stop() { l.mu.Lock() defer l.mu.Unlock() @@ -57,5 +56,5 @@ func (l *Launcher) Stop() { l.running = false l.poller.terminate() - l.manager.terminate() + l.processor.terminate() } diff --git a/manager.go b/processor.go similarity index 72% rename from manager.go rename to processor.go index 1071320..cabf37d 100644 --- a/manager.go +++ b/processor.go @@ -8,7 +8,7 @@ import ( "time" ) -type manager struct { +type processor struct { rdb *rdb handler TaskHandler @@ -17,12 +17,12 @@ type manager struct { // does not exceed the limit sema chan struct{} - // channel to communicate back to the long running "manager" goroutine. + // channel to communicate back to the long running "processor" goroutine. done chan struct{} } -func newManager(rdb *rdb, numWorkers int, handler TaskHandler) *manager { - return &manager{ +func newProcessor(rdb *rdb, numWorkers int, handler TaskHandler) *processor { + return &processor{ rdb: rdb, handler: handler, sema: make(chan struct{}, numWorkers), @@ -30,40 +30,40 @@ func newManager(rdb *rdb, numWorkers int, handler TaskHandler) *manager { } } -func (m *manager) terminate() { - // send a signal to the manager goroutine to stop +func (p *processor) terminate() { + // send a signal to the processor goroutine to stop // processing tasks from the queue. - m.done <- struct{}{} + p.done <- struct{}{} fmt.Println("--- Waiting for all workers to finish ---") - for i := 0; i < cap(m.sema); i++ { + for i := 0; i < cap(p.sema); i++ { // block until all workers have released the token - m.sema <- struct{}{} + p.sema <- struct{}{} } fmt.Println("--- All workers have finished! ----") } -func (m *manager) start() { +func (p *processor) start() { go func() { for { select { - case <-m.done: - fmt.Println("-------------[Manager]---------------") - fmt.Println("Manager shutting down...") + case <-p.done: + fmt.Println("-------------[Processor]---------------") + fmt.Println("Processor shutting down...") fmt.Println("-------------------------------------") return default: - m.processTasks() + p.exec() } } }() } -func (m *manager) processTasks() { +func (p *processor) exec() { // pull message out of the queue and process it // TODO(hibiken): sort the list of queues in order of priority // NOTE: BLPOP needs to timeout in case a new queue is added. - msg, err := m.rdb.bpop(5*time.Second, m.rdb.listQueues()...) + msg, err := p.rdb.bpop(5*time.Second, p.rdb.listQueues()...) if err != nil { switch err { case errQueuePopTimeout: @@ -79,14 +79,14 @@ func (m *manager) processTasks() { } t := &Task{Type: msg.Type, Payload: msg.Payload} - m.sema <- struct{}{} // acquire token + p.sema <- struct{}{} // acquire token go func(task *Task) { - defer func() { <-m.sema }() // release token - err := m.handler(task) + defer func() { <-p.sema }() // release token + err := p.handler(task) if err != nil { if msg.Retried >= msg.Retry { fmt.Println("Retry exhausted!!!") - if err := m.rdb.kill(msg); err != nil { + if err := p.rdb.kill(msg); err != nil { log.Printf("[SERVER ERROR] could not add task %+v to 'dead' set\n", err) } return @@ -96,7 +96,7 @@ func (m *manager) processTasks() { fmt.Printf("[DEBUG] retying the task in %v\n", retryAt.Sub(time.Now())) msg.Retried++ msg.ErrorMsg = err.Error() - if err := m.rdb.zadd(retry, float64(retryAt.Unix()), msg); err != nil { + if err := p.rdb.zadd(retry, float64(retryAt.Unix()), msg); err != nil { // TODO(hibiken): Not sure how to handle this error log.Printf("[SEVERE ERROR] could not add msg %+v to 'retry' set: %v\n", msg, err) return