From 0db4b8a34f97ac8fefba3c91dbb75ed6244aba18 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 27 Nov 2019 14:26:04 -0800 Subject: [PATCH] fix:Remove task from in_progress when worker goroutine finishes There was a bug related to the logic of removing task from "asynq:in_progress" list when worker is finished. The task was not properly removed from the list when the taskMessage gets modified before the deferred call. More specifically when task was scheduled for retry the message was modified and therefore LREM could not properly removed the message from "asynq:in_progress" list. --- processor.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/processor.go b/processor.go index 2ec4745..e7d41e0 100644 --- a/processor.go +++ b/processor.go @@ -82,12 +82,14 @@ func (p *processor) exec() { task := &Task{Type: msg.Type, Payload: msg.Payload} p.sema <- struct{}{} // acquire token go func(task *Task) { - defer func() { - if err := p.rdb.lrem(inProgress, msg); err != nil { + // NOTE: This deferred anonymous function needs to take taskMessage as a value because + // the message can be mutated by the time this function is called. + defer func(msg taskMessage) { + if err := p.rdb.lrem(inProgress, &msg); err != nil { log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, inProgress, err) } <-p.sema // release token - }() + }(*msg) err := perform(p.handler, task) if err != nil { retryTask(p.rdb, msg, err)