mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-19 05:17:30 +08:00
Track in-progress tasks with redis SET
This commit is contained in:
15
processor.go
15
processor.go
@@ -56,12 +56,14 @@ func (p *processor) start() {
|
||||
}()
|
||||
}
|
||||
|
||||
// exec pulls a task out of the queue and starts a worker goroutine to
|
||||
// process the task.
|
||||
func (p *processor) exec() {
|
||||
// NOTE: BLPOP needs to timeout to avoid blocking forever
|
||||
// in case of a program shutdown or additon of a new queue.
|
||||
const timeout = 5 * time.Second
|
||||
// pull a task out of the queue and process it
|
||||
// TODO(hibiken): sort the list of queues in order of priority
|
||||
// NOTE: BLPOP needs to timeout in case a new queue is added.
|
||||
msg, err := p.rdb.bpop(timeout, p.rdb.listQueues()...)
|
||||
msg, err := p.rdb.dequeue(timeout, p.rdb.listQueues()...)
|
||||
if err != nil {
|
||||
switch err {
|
||||
case errQueuePopTimeout:
|
||||
@@ -79,7 +81,12 @@ func (p *processor) exec() {
|
||||
t := &Task{Type: msg.Type, Payload: msg.Payload}
|
||||
p.sema <- struct{}{} // acquire token
|
||||
go func(task *Task) {
|
||||
defer func() { <-p.sema }() // release token
|
||||
defer func() {
|
||||
if err := p.rdb.srem(inProgress, msg); err != nil {
|
||||
log.Printf("[SERVER ERROR] SREM failed: %v\n", err)
|
||||
}
|
||||
<-p.sema // release token
|
||||
}()
|
||||
err := p.handler(task)
|
||||
if err != nil {
|
||||
retryTask(p.rdb, msg, err)
|
||||
|
Reference in New Issue
Block a user