diff --git a/launcher.go b/launcher.go index 146982d..02e3b7e 100644 --- a/launcher.go +++ b/launcher.go @@ -54,6 +54,7 @@ func (l *Launcher) Stop() { return } l.running = false + l.processor.handler = nil l.poller.terminate() l.processor.terminate() diff --git a/poller.go b/poller.go index 7569278..3d09eb0 100644 --- a/poller.go +++ b/poller.go @@ -36,6 +36,7 @@ func (p *poller) terminate() { p.done <- struct{}{} } +// start starts the "poller" goroutine. func (p *poller) start() { go func() { for { @@ -65,6 +66,7 @@ func (p *poller) exec() { fmt.Printf("[DEBUG] got %d tasks from %q\n", len(msgs), zset) for _, m := range msgs { + // TODO(hibiken): Make this move operation atomic. if err := p.rdb.move(zset, m); err != nil { log.Printf("could not move task %+v to queue %q: %v", m, m.Queue, err) continue diff --git a/processor.go b/processor.go index aba4968..e2330a0 100644 --- a/processor.go +++ b/processor.go @@ -29,7 +29,7 @@ func newProcessor(rdb *rdb, numWorkers int, handler TaskHandler) *processor { } func (p *processor) terminate() { - // Signal he processor goroutine to stop processing tasks from the queue. + // Signal the processor goroutine to stop processing tasks from the queue. p.done <- struct{}{} fmt.Println("--- Waiting for all workers to finish ---") @@ -57,10 +57,11 @@ func (p *processor) start() { } func (p *processor) exec() { - // pull message out of the queue and process it + const timeout = 5 * time.Second + // pull a task out of the queue and process it // TODO(hibiken): sort the list of queues in order of priority // NOTE: BLPOP needs to timeout in case a new queue is added. - msg, err := p.rdb.bpop(5*time.Second, p.rdb.listQueues()...) + msg, err := p.rdb.bpop(timeout, p.rdb.listQueues()...) if err != nil { switch err { case errQueuePopTimeout: