diff --git a/processor.go b/processor.go index 2ec4745..e7d41e0 100644 --- a/processor.go +++ b/processor.go @@ -82,12 +82,14 @@ func (p *processor) exec() { task := &Task{Type: msg.Type, Payload: msg.Payload} p.sema <- struct{}{} // acquire token go func(task *Task) { - defer func() { - if err := p.rdb.lrem(inProgress, msg); err != nil { + // NOTE: This deferred anonymous function needs to take taskMessage as a value because + // the message can be mutated by the time this function is called. + defer func(msg taskMessage) { + if err := p.rdb.lrem(inProgress, &msg); err != nil { log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, inProgress, err) } <-p.sema // release token - }() + }(*msg) err := perform(p.handler, task) if err != nil { retryTask(p.rdb, msg, err)