2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00
asynq/processor.go

212 lines
5.7 KiB
Go
Raw Normal View History

2020-01-03 10:13:16 +08:00
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
2019-11-19 22:48:54 +08:00
package asynq
import (
"fmt"
"log"
"sync"
2019-11-19 22:48:54 +08:00
"time"
2019-12-04 13:01:26 +08:00
2019-12-22 23:15:45 +08:00
"github.com/hibiken/asynq/internal/base"
2019-12-04 13:01:26 +08:00
"github.com/hibiken/asynq/internal/rdb"
2019-11-19 22:48:54 +08:00
)
2019-11-21 12:08:03 +08:00
type processor struct {
2019-12-04 13:01:26 +08:00
rdb *rdb.RDB
2019-11-19 22:48:54 +08:00
handler Handler
2019-11-19 22:48:54 +08:00
retryDelayFunc retryDelayFunc
2019-11-30 04:48:54 +08:00
// timeout for blocking dequeue operation.
// dequeue needs to timeout to avoid blocking forever
// in case of a program shutdown or additon of a new queue.
dequeueTimeout time.Duration
2019-11-19 22:48:54 +08:00
// sema is a counting semaphore to ensure the number of active workers
2019-11-30 04:48:54 +08:00
// does not exceed the limit.
2019-11-19 22:48:54 +08:00
sema chan struct{}
2019-11-21 12:08:03 +08:00
// channel to communicate back to the long running "processor" goroutine.
2019-12-18 12:34:56 +08:00
// once is used to send value to the channel only once.
2019-11-19 22:48:54 +08:00
done chan struct{}
2019-12-18 12:34:56 +08:00
once sync.Once
2019-12-19 10:57:48 +08:00
// abort channel is closed when the shutdown of the "processor" goroutine starts.
abort chan struct{}
// quit channel communicates to the in-flight worker goroutines to stop.
quit chan struct{}
2019-11-19 22:48:54 +08:00
}
// retryDelayFunc computes how long to wait before retrying a failed task.
// The processor calls it with the message's Retried count, the error
// returned by the handler, and the task itself, and schedules the retry at
// time.Now() plus the returned duration.
type retryDelayFunc func(n int, err error, task *Task) time.Duration
func newProcessor(r *rdb.RDB, n int, fn retryDelayFunc) *processor {
2019-11-21 12:08:03 +08:00
return &processor{
2019-12-04 13:01:26 +08:00
rdb: r,
retryDelayFunc: fn,
dequeueTimeout: 2 * time.Second,
sema: make(chan struct{}, n),
2019-11-30 04:48:54 +08:00
done: make(chan struct{}),
2019-12-19 10:57:48 +08:00
abort: make(chan struct{}),
quit: make(chan struct{}),
handler: HandlerFunc(func(t *Task) error { return fmt.Errorf("handler not set") }),
2019-11-19 22:48:54 +08:00
}
}
// Note: stops only the "processor" goroutine, does not stop workers.
// It's safe to call this method multiple times.
func (p *processor) stop() {
2019-12-18 12:34:56 +08:00
p.once.Do(func() {
log.Println("[INFO] Processor shutting down...")
// Unblock if processor is waiting for sema token.
2019-12-19 10:57:48 +08:00
close(p.abort)
2019-12-18 12:34:56 +08:00
// Signal the processor goroutine to stop processing tasks
// from the queue.
p.done <- struct{}{}
})
}
2019-11-30 04:48:54 +08:00
// NOTE: once terminated, processor cannot be re-started.
2019-11-21 12:08:03 +08:00
func (p *processor) terminate() {
p.stop()
2019-12-30 23:10:13 +08:00
// IDEA: Allow user to customize this timeout value.
const timeout = 8 * time.Second
time.AfterFunc(timeout, func() { close(p.quit) })
log.Println("[INFO] Waiting for all workers to finish...")
2019-11-28 11:36:56 +08:00
// block until all workers have released the token
2019-11-21 12:08:03 +08:00
for i := 0; i < cap(p.sema); i++ {
p.sema <- struct{}{}
}
log.Println("[INFO] All workers have finished.")
p.restore() // move any unfinished tasks back to the queue.
2019-11-19 22:48:54 +08:00
}
2019-11-21 12:08:03 +08:00
func (p *processor) start() {
// NOTE: The call to "restore" needs to complete before starting
// the processor goroutine.
p.restore()
2019-11-19 22:48:54 +08:00
go func() {
for {
select {
2019-11-21 12:08:03 +08:00
case <-p.done:
log.Println("[INFO] Processor done.")
return
2019-11-19 22:48:54 +08:00
default:
2019-11-21 12:08:03 +08:00
p.exec()
2019-11-19 22:48:54 +08:00
}
}
}()
}
2019-11-22 13:45:27 +08:00
// exec pulls a task out of the queue and starts a worker goroutine to
// process the task.
2019-11-21 12:08:03 +08:00
func (p *processor) exec() {
2019-12-04 22:25:58 +08:00
msg, err := p.rdb.Dequeue(p.dequeueTimeout)
2019-12-04 13:01:26 +08:00
if err == rdb.ErrDequeueTimeout {
2019-11-28 11:36:56 +08:00
// timed out, this is a normal behavior.
return
}
2019-11-19 22:48:54 +08:00
if err != nil {
2019-11-28 11:36:56 +08:00
log.Printf("[ERROR] unexpected error while pulling a task out of queue: %v\n", err)
return
2019-11-19 22:48:54 +08:00
}
select {
2019-12-19 10:57:48 +08:00
case <-p.abort:
// shutdown is starting, return immediately after requeuing the message.
p.requeue(msg)
return
case p.sema <- struct{}{}: // acquire token
go func() {
defer func() { <-p.sema /* release token */ }()
resCh := make(chan error, 1)
task := &Task{Type: msg.Type, Payload: msg.Payload}
go func() {
resCh <- perform(p.handler, task)
}()
select {
case <-p.quit:
// time is up, quit this worker goroutine.
log.Printf("[WARN] Terminating in-progress task %+v\n", msg)
return
case resErr := <-resCh:
// Note: One of three things should happen.
// 1) Done -> Removes the message from InProgress
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
if resErr != nil {
if msg.Retried >= msg.Retry {
p.kill(msg, resErr)
2019-12-26 12:04:29 +08:00
} else {
p.retry(msg, resErr)
}
return
}
p.markAsDone(msg)
2019-11-22 13:45:27 +08:00
}
}()
}
2019-11-19 22:48:54 +08:00
}
// restore asks the broker to move every "in-progress" task back to the
// queue so that unfinished work gets picked up again. Failures are logged,
// as is the number of restored tasks when it is positive.
func (p *processor) restore() {
	restored, err := p.rdb.RestoreUnfinished()
	if err != nil {
		log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err)
	}
	if restored <= 0 {
		return
	}
	log.Printf("[INFO] Restored %d unfinished tasks back to queue.\n", restored)
}
2019-12-22 23:15:45 +08:00
// requeue hands msg back to the broker via Requeue and logs an error if
// that fails.
func (p *processor) requeue(msg *base.TaskMessage) {
	if err := p.rdb.Requeue(msg); err != nil {
		log.Printf("[ERROR] Could not move task from InProgress back to queue: %v\n", err)
	}
}
2019-12-22 23:15:45 +08:00
// markAsDone reports msg as finished to the broker via Done and logs an
// error if that fails.
func (p *processor) markAsDone(msg *base.TaskMessage) {
	if err := p.rdb.Done(msg); err != nil {
		log.Printf("[ERROR] Could not remove task from InProgress queue: %v\n", err)
	}
}
// retry schedules msg to be processed again after the delay computed by
// p.retryDelayFunc, recording e's message alongside it. A failure to
// schedule the retry is logged.
func (p *processor) retry(msg *base.TaskMessage, e error) {
	task := &Task{Type: msg.Type, Payload: msg.Payload}
	delay := p.retryDelayFunc(msg.Retried, e, task)
	if err := p.rdb.Retry(msg, time.Now().Add(delay), e.Error()); err != nil {
		log.Printf("[ERROR] Could not send task %+v to Retry queue: %v\n", msg, err)
	}
}
// kill logs that msg has run out of retries and passes it, together with
// e's message, to the broker's Kill. A failure to do so is logged.
func (p *processor) kill(msg *base.TaskMessage, e error) {
	log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID)
	if err := p.rdb.Kill(msg, e.Error()); err != nil {
		log.Printf("[ERROR] Could not send task %+v to Dead queue: %v\n", msg, err)
	}
}
2019-11-28 06:03:04 +08:00
// perform calls the handler with the given task.
// If the call returns without panic, it simply returns the value,
// otherwise, it recovers from panic and returns an error.
func perform(h Handler, task *Task) (err error) {
2019-11-28 06:03:04 +08:00
defer func() {
if x := recover(); x != nil {
err = fmt.Errorf("panic: %v", x)
}
}()
return h.ProcessTask(task)
2019-11-28 06:03:04 +08:00
}