Fix requeue logic in processor

This commit is contained in:
Ken Hibino
2020-06-13 06:09:54 -07:00
parent ade6e61f51
commit 7f30fa2bb6
5 changed files with 4 additions and 145 deletions

View File

@@ -137,13 +137,9 @@ func (p *processor) terminate() {
p.sema <- struct{}{}
}
p.logger.Info("All workers have finished")
p.restore() // move any unfinished tasks back to the queue.
}
func (p *processor) start(wg *sync.WaitGroup) {
// NOTE: The call to "restore" needs to complete before starting
// the processor goroutine.
p.restore()
wg.Add(1)
go func() {
defer wg.Done()
@@ -206,8 +202,9 @@ func (p *processor) exec() {
select {
case <-p.quit:
// time is up, quit this worker goroutine.
// time is up, push the message back to queue and quit this worker goroutine.
p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
p.requeue(msg)
return
case resErr := <-resCh:
// Note: One of three things should happen.
@@ -231,22 +228,12 @@ func (p *processor) exec() {
}
}
// restore moves all tasks from "in-progress" back to queue
// to restore all unfinished tasks.
func (p *processor) restore() {
n, err := p.broker.RequeueAll()
if err != nil {
p.logger.Errorf("Could not restore unfinished tasks: %v", err)
}
if n > 0 {
p.logger.Infof("Restored %d unfinished tasks back to queue", n)
}
}
func (p *processor) requeue(msg *base.TaskMessage) {
err := p.broker.Requeue(msg)
if err != nil {
p.logger.Errorf("Could not push task id=%s back to queue: %v", msg.ID, err)
} else {
p.logger.Infof("Pushed task id=%s back to queue", msg.ID)
}
}