2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

fix:Remove task from in_progress when worker goroutine finishes

There was a bug related to the logic of removing task from
"asynq:in_progress" list when worker is finished. The task was not
properly removed from the list when the taskMessage gets modified before
the deferred call. More specifically when task was scheduled for retry
the message was modified and therefore LREM could not properly removed
the message from "asynq:in_progress" list.
This commit is contained in:
Ken Hibino 2019-11-27 14:26:04 -08:00
parent 871162cd67
commit 0db4b8a34f

View File

@ -82,12 +82,14 @@ func (p *processor) exec() {
task := &Task{Type: msg.Type, Payload: msg.Payload} task := &Task{Type: msg.Type, Payload: msg.Payload}
p.sema <- struct{}{} // acquire token p.sema <- struct{}{} // acquire token
go func(task *Task) { go func(task *Task) {
defer func() { // NOTE: This deferred anonymous function needs to take taskMessage as a value because
if err := p.rdb.lrem(inProgress, msg); err != nil { // the message can be mutated by the time this function is called.
defer func(msg taskMessage) {
if err := p.rdb.lrem(inProgress, &msg); err != nil {
log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, inProgress, err) log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, inProgress, err)
} }
<-p.sema // release token <-p.sema // release token
}() }(*msg)
err := perform(p.handler, task) err := perform(p.handler, task)
if err != nil { if err != nil {
retryTask(p.rdb, msg, err) retryTask(p.rdb, msg, err)