2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 15:52:18 +08:00

Minor fixes

This commit is contained in:
Ken Hibino 2019-11-21 20:22:55 -08:00
parent 66930970f9
commit c84287d7ab
3 changed files with 7 additions and 3 deletions

View File

@ -54,6 +54,7 @@ func (l *Launcher) Stop() {
return return
} }
l.running = false l.running = false
l.processor.handler = nil
l.poller.terminate() l.poller.terminate()
l.processor.terminate() l.processor.terminate()

View File

@ -36,6 +36,7 @@ func (p *poller) terminate() {
p.done <- struct{}{} p.done <- struct{}{}
} }
// start starts the "poller" goroutine.
func (p *poller) start() { func (p *poller) start() {
go func() { go func() {
for { for {
@ -65,6 +66,7 @@ func (p *poller) exec() {
fmt.Printf("[DEBUG] got %d tasks from %q\n", len(msgs), zset) fmt.Printf("[DEBUG] got %d tasks from %q\n", len(msgs), zset)
for _, m := range msgs { for _, m := range msgs {
// TODO(hibiken): Make this move operation atomic.
if err := p.rdb.move(zset, m); err != nil { if err := p.rdb.move(zset, m); err != nil {
log.Printf("could not move task %+v to queue %q: %v", m, m.Queue, err) log.Printf("could not move task %+v to queue %q: %v", m, m.Queue, err)
continue continue

View File

@ -29,7 +29,7 @@ func newProcessor(rdb *rdb, numWorkers int, handler TaskHandler) *processor {
} }
func (p *processor) terminate() { func (p *processor) terminate() {
// Signal he processor goroutine to stop processing tasks from the queue. // Signal the processor goroutine to stop processing tasks from the queue.
p.done <- struct{}{} p.done <- struct{}{}
fmt.Println("--- Waiting for all workers to finish ---") fmt.Println("--- Waiting for all workers to finish ---")
@ -57,10 +57,11 @@ func (p *processor) start() {
} }
func (p *processor) exec() { func (p *processor) exec() {
// pull message out of the queue and process it const timeout = 5 * time.Second
// pull a task out of the queue and process it
// TODO(hibiken): sort the list of queues in order of priority // TODO(hibiken): sort the list of queues in order of priority
// NOTE: BLPOP needs to timeout in case a new queue is added. // NOTE: BLPOP needs to timeout in case a new queue is added.
msg, err := p.rdb.bpop(5*time.Second, p.rdb.listQueues()...) msg, err := p.rdb.bpop(timeout, p.rdb.listQueues()...)
if err != nil { if err != nil {
switch err { switch err {
case errQueuePopTimeout: case errQueuePopTimeout: