mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-27 08:12:19 +08:00
Merge pull request #16 from hibiken/feature/signals
Add timeout to shutdown when TERM signal is received
This commit is contained in:
commit
12fc336889
3
asynq.go
3
asynq.go
@ -4,8 +4,9 @@ import "github.com/go-redis/redis/v7"
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
TODOs:
|
TODOs:
|
||||||
|
- [P0] Proper OS Signal handling - TTIN to stop the processor
|
||||||
- [P0] asynqmon kill <taskID>, asynqmon killall <qname>
|
- [P0] asynqmon kill <taskID>, asynqmon killall <qname>
|
||||||
- [P0] Assigning int or any number type to Payload will be converted to float64 in handler
|
- [P0] Better Payload API - Assigning int or any number type to Payload will be converted to float64 in handler
|
||||||
- [P0] Redis Memory Usage, Connection info in stats
|
- [P0] Redis Memory Usage, Connection info in stats
|
||||||
- [P0] Processed, Failed count for today
|
- [P0] Processed, Failed count for today
|
||||||
- [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment
|
- [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment
|
||||||
|
@ -45,6 +45,14 @@ var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*TaskMessage) []*TaskMessa
|
|||||||
return out
|
return out
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var sortZSetEntryOpt = cmp.Transformer("SortZSetEntry", func(in []sortedSetEntry) []sortedSetEntry {
|
||||||
|
out := append([]sortedSetEntry(nil), in...) // Copy input to avoid mutating it
|
||||||
|
sort.Slice(out, func(i, j int) bool {
|
||||||
|
return out[i].msg.ID.String() < out[j].msg.ID.String()
|
||||||
|
})
|
||||||
|
return out
|
||||||
|
})
|
||||||
|
|
||||||
func newTaskMessage(taskType string, payload map[string]interface{}) *TaskMessage {
|
func newTaskMessage(taskType string, payload map[string]interface{}) *TaskMessage {
|
||||||
return &TaskMessage{
|
return &TaskMessage{
|
||||||
ID: xid.New(),
|
ID: xid.New(),
|
||||||
|
@ -5,7 +5,6 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-redis/redis/v7"
|
"github.com/go-redis/redis/v7"
|
||||||
@ -119,44 +118,81 @@ func (r *RDB) Done(msg *TaskMessage) error {
|
|||||||
|
|
||||||
// Schedule adds the task to the backlog queue to be processed in the future.
|
// Schedule adds the task to the backlog queue to be processed in the future.
|
||||||
func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error {
|
func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error {
|
||||||
return r.schedule(scheduledQ, processAt, msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// RetryLater adds the task to the backlog queue to be retried in the future.
|
|
||||||
func (r *RDB) RetryLater(msg *TaskMessage, processAt time.Time) error {
|
|
||||||
return r.schedule(retryQ, processAt, msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// schedule adds the task to the zset to be processd at the specified time.
|
|
||||||
func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error {
|
|
||||||
bytes, err := json.Marshal(msg)
|
bytes, err := json.Marshal(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||||
}
|
}
|
||||||
score := float64(processAt.Unix())
|
score := float64(processAt.Unix())
|
||||||
err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: score}).Err()
|
err = r.client.ZAdd(scheduledQ, &redis.Z{Member: string(bytes), Score: score}).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", zset, score, string(bytes), err)
|
return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", scheduledQ, score, string(bytes), err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Kill sends the task to "dead" set.
|
// Retry moves the task from in-progress to retry queue, incrementing retry count
|
||||||
// It also trims the set by timestamp and set size.
|
// and assigning error message to the task message.
|
||||||
func (r *RDB) Kill(msg *TaskMessage) error {
|
func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error {
|
||||||
const maxDeadTask = 10
|
bytesToRemove, err := json.Marshal(msg)
|
||||||
const deadExpirationInDays = 90
|
|
||||||
bytes, err := json.Marshal(msg)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||||
}
|
}
|
||||||
|
modified := *msg
|
||||||
|
modified.Retried++
|
||||||
|
modified.ErrorMsg = errMsg
|
||||||
|
bytesToAdd, err := json.Marshal(&modified)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not marshal %+v to json: %v", modified, err)
|
||||||
|
}
|
||||||
|
// KEYS[1] -> asynq:in_progress
|
||||||
|
// KEYS[2] -> asynq:retry
|
||||||
|
// ARGV[1] -> TaskMessage value to remove from InProgress queue
|
||||||
|
// ARGV[2] -> TaskMessage value to add to Retry queue
|
||||||
|
// ARGV[3] -> retry_at UNIX timestamp
|
||||||
|
script := redis.NewScript(`
|
||||||
|
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||||
|
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
|
||||||
|
return redis.status_reply("OK")
|
||||||
|
`)
|
||||||
|
_, err = script.Run(r.client, []string{inProgressQ, retryQ},
|
||||||
|
string(bytesToRemove), string(bytesToAdd), processAt.Unix()).Result()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Kill sends the task to "dead" queue from in-progress queue, assigning
|
||||||
|
// the error message to the task.
|
||||||
|
// It also trims the set by timestamp and set size.
|
||||||
|
func (r *RDB) Kill(msg *TaskMessage, errMsg string) error {
|
||||||
|
const maxDeadTask = 10
|
||||||
|
const deadExpirationInDays = 90
|
||||||
|
bytesToRemove, err := json.Marshal(msg)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||||
|
}
|
||||||
|
modified := *msg
|
||||||
|
modified.ErrorMsg = errMsg
|
||||||
|
bytesToAdd, err := json.Marshal(&modified)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not marshal %+v to json: %v", modified, err)
|
||||||
|
}
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
pipe := r.client.Pipeline()
|
|
||||||
pipe.ZAdd(deadQ, &redis.Z{Member: string(bytes), Score: float64(now.Unix())})
|
|
||||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||||
pipe.ZRemRangeByScore(deadQ, "-inf", strconv.Itoa(int(limit)))
|
// KEYS[1] -> asynq:in_progress
|
||||||
pipe.ZRemRangeByRank(deadQ, 0, -maxDeadTask) // trim the set to 100
|
// KEYS[2] -> asynq:dead
|
||||||
_, err = pipe.Exec()
|
// ARGV[1] -> TaskMessage value to remove from InProgress queue
|
||||||
|
// ARGV[2] -> TaskMessage value to add to Dead queue
|
||||||
|
// ARGV[3] -> died_at UNIX timestamp
|
||||||
|
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
|
||||||
|
// ARGV[5] -> max number of tasks in dead queue (e.g., 100)
|
||||||
|
script := redis.NewScript(`
|
||||||
|
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||||
|
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
|
||||||
|
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
|
||||||
|
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
|
||||||
|
return redis.status_reply("OK")
|
||||||
|
`)
|
||||||
|
_, err = script.Run(r.client, []string{inProgressQ, deadQ},
|
||||||
|
string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTask).Result()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,35 +115,80 @@ func TestDone(t *testing.T) {
|
|||||||
func TestKill(t *testing.T) {
|
func TestKill(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
t1 := newTaskMessage("send_email", nil)
|
t1 := newTaskMessage("send_email", nil)
|
||||||
|
t2 := newTaskMessage("reindex", nil)
|
||||||
|
t3 := newTaskMessage("generate_csv", nil)
|
||||||
|
errMsg := "SMTP server not responding"
|
||||||
|
t1AfterKill := &TaskMessage{
|
||||||
|
ID: t1.ID,
|
||||||
|
Type: t1.Type,
|
||||||
|
Payload: t1.Payload,
|
||||||
|
Queue: t1.Queue,
|
||||||
|
Retry: t1.Retry,
|
||||||
|
Retried: t1.Retried,
|
||||||
|
ErrorMsg: errMsg,
|
||||||
|
}
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
// TODO(hibiken): add test cases for trimming
|
// TODO(hibiken): add test cases for trimming
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
dead []sortedSetEntry // inital state of dead queue
|
inProgress []*TaskMessage
|
||||||
|
dead []sortedSetEntry
|
||||||
target *TaskMessage // task to kill
|
target *TaskMessage // task to kill
|
||||||
wantDead []*TaskMessage // final state of dead queue
|
wantInProgress []*TaskMessage
|
||||||
|
wantDead []sortedSetEntry
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
|
inProgress: []*TaskMessage{t1, t2},
|
||||||
|
dead: []sortedSetEntry{
|
||||||
|
{t3, now.Add(-time.Hour).Unix()},
|
||||||
|
},
|
||||||
|
target: t1,
|
||||||
|
wantInProgress: []*TaskMessage{t2},
|
||||||
|
wantDead: []sortedSetEntry{
|
||||||
|
{t1AfterKill, now.Unix()},
|
||||||
|
{t3, now.Add(-time.Hour).Unix()},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
inProgress: []*TaskMessage{t1, t2, t3},
|
||||||
dead: []sortedSetEntry{},
|
dead: []sortedSetEntry{},
|
||||||
target: t1,
|
target: t1,
|
||||||
wantDead: []*TaskMessage{t1},
|
wantInProgress: []*TaskMessage{t2, t3},
|
||||||
|
wantDead: []sortedSetEntry{
|
||||||
|
{t1AfterKill, now.Unix()},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
flushDB(t, r) // clean up db before each test case
|
flushDB(t, r) // clean up db before each test case
|
||||||
|
seedInProgressQueue(t, r, tc.inProgress)
|
||||||
seedDeadQueue(t, r, tc.dead)
|
seedDeadQueue(t, r, tc.dead)
|
||||||
|
|
||||||
err := r.Kill(tc.target)
|
err := r.Kill(tc.target, errMsg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Errorf("(*RDB).Kill(%v, %v) = %v, want nil", tc.target, errMsg, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
data := r.client.ZRange(deadQ, 0, -1).Val()
|
gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val()
|
||||||
gotDead := mustUnmarshalSlice(t, data)
|
gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw)
|
||||||
if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" {
|
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" {
|
||||||
|
t.Errorf("mismatch found in %q; (-want, +got)\n%s", inProgressQ, diff)
|
||||||
|
}
|
||||||
|
|
||||||
|
var gotDead []sortedSetEntry
|
||||||
|
data := r.client.ZRangeWithScores(deadQ, 0, -1).Val()
|
||||||
|
for _, z := range data {
|
||||||
|
gotDead = append(gotDead, sortedSetEntry{
|
||||||
|
msg: mustUnmarshal(t, z.Member.(string)),
|
||||||
|
score: int64(z.Score),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
cmpOpt := cmp.AllowUnexported(sortedSetEntry{})
|
||||||
|
if diff := cmp.Diff(tc.wantDead, gotDead, cmpOpt, sortZSetEntryOpt); diff != "" {
|
||||||
t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", deadQ, diff)
|
t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", deadQ, diff)
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -312,36 +357,77 @@ func TestSchedule(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRetryLater(t *testing.T) {
|
func TestRetry(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
|
t1 := newTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"})
|
||||||
|
t2 := newTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/image.jpg"})
|
||||||
|
t3 := newTaskMessage("reindex", nil)
|
||||||
|
t1.Retried = 10
|
||||||
|
errMsg := "SMTP server is not responding"
|
||||||
|
t1AfterRetry := &TaskMessage{
|
||||||
|
ID: t1.ID,
|
||||||
|
Type: t1.Type,
|
||||||
|
Payload: t1.Payload,
|
||||||
|
Queue: t1.Queue,
|
||||||
|
Retry: t1.Retry,
|
||||||
|
Retried: t1.Retried + 1,
|
||||||
|
ErrorMsg: errMsg,
|
||||||
|
}
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
inProgress []*TaskMessage
|
||||||
|
retry []sortedSetEntry
|
||||||
msg *TaskMessage
|
msg *TaskMessage
|
||||||
processAt time.Time
|
processAt time.Time
|
||||||
|
errMsg string
|
||||||
|
wantInProgress []*TaskMessage
|
||||||
|
wantRetry []sortedSetEntry
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
newTaskMessage("send_email", map[string]interface{}{"subject": "hello"}),
|
inProgress: []*TaskMessage{t1, t2},
|
||||||
time.Now().Add(15 * time.Minute),
|
retry: []sortedSetEntry{
|
||||||
|
{t3, now.Add(time.Minute).Unix()},
|
||||||
|
},
|
||||||
|
msg: t1,
|
||||||
|
processAt: now.Add(5 * time.Minute),
|
||||||
|
errMsg: errMsg,
|
||||||
|
wantInProgress: []*TaskMessage{t2},
|
||||||
|
wantRetry: []sortedSetEntry{
|
||||||
|
{t1AfterRetry, now.Add(5 * time.Minute).Unix()},
|
||||||
|
{t3, now.Add(time.Minute).Unix()},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
flushDB(t, r) // clean up db before each test case
|
flushDB(t, r)
|
||||||
|
seedInProgressQueue(t, r, tc.inProgress)
|
||||||
|
seedRetryQueue(t, r, tc.retry)
|
||||||
|
|
||||||
desc := fmt.Sprintf("(*RDB).RetryLater(%v, %v)", tc.msg, tc.processAt)
|
err := r.Retry(tc.msg, tc.processAt, tc.errMsg)
|
||||||
err := r.RetryLater(tc.msg, tc.processAt)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("%s = %v, want nil", desc, err)
|
t.Errorf("(*RDB).Retry = %v, want nil", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
res := r.client.ZRangeWithScores(retryQ, 0, -1).Val()
|
gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val()
|
||||||
if len(res) != 1 {
|
gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw)
|
||||||
t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), retryQ)
|
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" {
|
||||||
continue
|
t.Errorf("mismatch found in %q; (-want, +got)\n%s", inProgressQ, diff)
|
||||||
}
|
}
|
||||||
if res[0].Score != float64(tc.processAt.Unix()) {
|
|
||||||
t.Errorf("%s inserted an item with score %f, want %f", desc, res[0].Score, float64(tc.processAt.Unix()))
|
gotRetryRaw := r.client.ZRangeWithScores(retryQ, 0, -1).Val()
|
||||||
continue
|
var gotRetry []sortedSetEntry
|
||||||
|
for _, z := range gotRetryRaw {
|
||||||
|
gotRetry = append(gotRetry, sortedSetEntry{
|
||||||
|
msg: mustUnmarshal(t, z.Member.(string)),
|
||||||
|
score: int64(z.Score),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
cmpOpt := cmp.AllowUnexported(sortedSetEntry{})
|
||||||
|
if diff := cmp.Diff(tc.wantRetry, gotRetry, cmpOpt, sortZSetEntryOpt); diff != "" {
|
||||||
|
t.Errorf("mismatch found in %q; (-want, +got)\n%s", retryQ, diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis/v7"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
)
|
)
|
||||||
@ -74,7 +75,10 @@ func TestPoller(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// initialize retry queue
|
// initialize retry queue
|
||||||
for _, st := range tc.initRetry {
|
for _, st := range tc.initRetry {
|
||||||
err := rdbClient.RetryLater(st.msg, st.processAt)
|
err := r.ZAdd(retryQ, &redis.Z{
|
||||||
|
Member: mustMarshal(t, st.msg),
|
||||||
|
Score: float64(st.processAt.Unix()),
|
||||||
|
}).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
84
processor.go
84
processor.go
@ -3,6 +3,8 @@ package asynq
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"math"
|
||||||
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
@ -24,6 +26,9 @@ type processor struct {
|
|||||||
|
|
||||||
// channel to communicate back to the long running "processor" goroutine.
|
// channel to communicate back to the long running "processor" goroutine.
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
|
|
||||||
|
// quit channel communicates to the in-flight worker goroutines to stop.
|
||||||
|
quit chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor {
|
func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor {
|
||||||
@ -33,6 +38,7 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor {
|
|||||||
dequeueTimeout: 5 * time.Second,
|
dequeueTimeout: 5 * time.Second,
|
||||||
sema: make(chan struct{}, numWorkers),
|
sema: make(chan struct{}, numWorkers),
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
|
quit: make(chan struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -42,12 +48,16 @@ func (p *processor) terminate() {
|
|||||||
// Signal the processor goroutine to stop processing tasks from the queue.
|
// Signal the processor goroutine to stop processing tasks from the queue.
|
||||||
p.done <- struct{}{}
|
p.done <- struct{}{}
|
||||||
|
|
||||||
|
// TODO(hibiken): Allow user to customize this timeout value.
|
||||||
|
const timeout = 8 * time.Second
|
||||||
|
time.AfterFunc(timeout, func() { close(p.quit) })
|
||||||
log.Println("[INFO] Waiting for all workers to finish...")
|
log.Println("[INFO] Waiting for all workers to finish...")
|
||||||
// block until all workers have released the token
|
// block until all workers have released the token
|
||||||
for i := 0; i < cap(p.sema); i++ {
|
for i := 0; i < cap(p.sema); i++ {
|
||||||
p.sema <- struct{}{}
|
p.sema <- struct{}{}
|
||||||
}
|
}
|
||||||
log.Println("[INFO] All workers have finished.")
|
log.Println("[INFO] All workers have finished.")
|
||||||
|
p.restore() // move any unfinished tasks back to the queue.
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *processor) start() {
|
func (p *processor) start() {
|
||||||
@ -80,22 +90,37 @@ func (p *processor) exec() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
task := &Task{Type: msg.Type, Payload: msg.Payload}
|
|
||||||
p.sema <- struct{}{} // acquire token
|
p.sema <- struct{}{} // acquire token
|
||||||
go func(task *Task) {
|
go func() {
|
||||||
// NOTE: This deferred anonymous function needs to take taskMessage as a value because
|
defer func() { <-p.sema /* release token */ }()
|
||||||
// the message can be mutated by the time this function is called.
|
|
||||||
defer func(msg rdb.TaskMessage) {
|
resCh := make(chan error, 1)
|
||||||
if err := p.rdb.Done(&msg); err != nil {
|
task := &Task{Type: msg.Type, Payload: msg.Payload}
|
||||||
log.Printf("[ERROR] could not mark task %+v as done: %v\n", msg, err)
|
go func() {
|
||||||
|
resCh <- perform(p.handler, task)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-p.quit:
|
||||||
|
// time is up, quit this worker goroutine.
|
||||||
|
return
|
||||||
|
case resErr := <-resCh:
|
||||||
|
// Note: One of three things should happen.
|
||||||
|
// 1) Done -> Removes the message from InProgress
|
||||||
|
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
|
||||||
|
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
|
||||||
|
if resErr != nil {
|
||||||
|
if msg.Retried >= msg.Retry {
|
||||||
|
p.kill(msg, resErr.Error())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
<-p.sema // release token
|
p.retry(msg, resErr.Error())
|
||||||
}(*msg)
|
return
|
||||||
err := perform(p.handler, task)
|
|
||||||
if err != nil {
|
|
||||||
retryTask(p.rdb, msg, err)
|
|
||||||
}
|
}
|
||||||
}(task)
|
p.markAsDone(msg)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// restore moves all tasks from "in-progress" back to queue
|
// restore moves all tasks from "in-progress" back to queue
|
||||||
@ -103,7 +128,30 @@ func (p *processor) exec() {
|
|||||||
func (p *processor) restore() {
|
func (p *processor) restore() {
|
||||||
err := p.rdb.RestoreUnfinished()
|
err := p.rdb.RestoreUnfinished()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("[ERROR] could not restore unfinished tasks: %v\n", err)
|
log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *processor) markAsDone(msg *rdb.TaskMessage) {
|
||||||
|
err := p.rdb.Done(msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[ERROR] Could not remove task from InProgress queue: %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *processor) retry(msg *rdb.TaskMessage, errMsg string) {
|
||||||
|
retryAt := time.Now().Add(delaySeconds(msg.Retried))
|
||||||
|
err := p.rdb.Retry(msg, retryAt, errMsg)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[ERROR] Could not send task %+v to Retry queue: %v\n", msg, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *processor) kill(msg *rdb.TaskMessage, errMsg string) {
|
||||||
|
log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID)
|
||||||
|
err := p.rdb.Kill(msg, errMsg)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[ERROR] Could not send task %+v to Dead queue: %v\n", msg, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -118,3 +166,11 @@ func perform(h Handler, task *Task) (err error) {
|
|||||||
}()
|
}()
|
||||||
return h.ProcessTask(task)
|
return h.ProcessTask(task)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// delaySeconds returns a number seconds to delay before retrying.
|
||||||
|
// Formula taken from https://github.com/mperham/sidekiq.
|
||||||
|
func delaySeconds(count int) time.Duration {
|
||||||
|
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1))
|
||||||
|
return time.Duration(s) * time.Second
|
||||||
|
}
|
||||||
|
36
retry.go
36
retry.go
@ -1,36 +0,0 @@
|
|||||||
package asynq
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
"math"
|
|
||||||
"math/rand"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
|
||||||
)
|
|
||||||
|
|
||||||
func retryTask(r *rdb.RDB, msg *rdb.TaskMessage, err error) {
|
|
||||||
msg.ErrorMsg = err.Error()
|
|
||||||
if msg.Retried >= msg.Retry {
|
|
||||||
log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID)
|
|
||||||
if err := r.Kill(msg); err != nil {
|
|
||||||
log.Printf("[ERROR] Could not add task %+v to 'dead'\n", err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
retryAt := time.Now().Add(delaySeconds((msg.Retried)))
|
|
||||||
log.Printf("[INFO] Retrying task(Type: %q, ID: %v) in %v\n", msg.Type, msg.ID, retryAt.Sub(time.Now()))
|
|
||||||
msg.Retried++
|
|
||||||
if err := r.RetryLater(msg, retryAt); err != nil {
|
|
||||||
log.Printf("[ERROR] Could not add msg %+v to 'retry': %v\n", msg, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// delaySeconds returns a number seconds to delay before retrying.
|
|
||||||
// Formula taken from https://github.com/mperham/sidekiq.
|
|
||||||
func delaySeconds(count int) time.Duration {
|
|
||||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
||||||
s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1))
|
|
||||||
return time.Duration(s) * time.Second
|
|
||||||
}
|
|
@ -1,71 +0,0 @@
|
|||||||
package asynq
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
|
||||||
"github.com/rs/xid"
|
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestRetry(t *testing.T) {
|
|
||||||
r := setup(t)
|
|
||||||
rdbClient := rdb.NewRDB(r)
|
|
||||||
errMsg := "email server not responding"
|
|
||||||
// t1 is a task with max-retry count reached.
|
|
||||||
t1 := &rdb.TaskMessage{Type: "send_email", Retry: 10, Retried: 10, Queue: "default", ID: xid.New()}
|
|
||||||
// t2 is t1 with updated error message.
|
|
||||||
t2 := *t1
|
|
||||||
t2.ErrorMsg = errMsg
|
|
||||||
// t3 is a task which hasn't reached max-retry count.
|
|
||||||
t3 := &rdb.TaskMessage{Type: "send_email", Retry: 10, Retried: 5, Queue: "default", ID: xid.New()}
|
|
||||||
// t4 is t3 after retry.
|
|
||||||
t4 := *t3
|
|
||||||
t4.Retried++
|
|
||||||
t4.ErrorMsg = errMsg
|
|
||||||
|
|
||||||
tests := []struct {
|
|
||||||
desc string // test case description
|
|
||||||
msg *rdb.TaskMessage // task to retry
|
|
||||||
err error // error that caused retry
|
|
||||||
wantDead []*rdb.TaskMessage // state "dead" queue should be in
|
|
||||||
wantRetry []*rdb.TaskMessage // state "retry" queue should be in
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
desc: "With retry exhausted task",
|
|
||||||
msg: t1,
|
|
||||||
err: fmt.Errorf(errMsg),
|
|
||||||
wantDead: []*rdb.TaskMessage{&t2},
|
|
||||||
wantRetry: []*rdb.TaskMessage{},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
desc: "With retry-able task",
|
|
||||||
msg: t3,
|
|
||||||
err: fmt.Errorf(errMsg),
|
|
||||||
wantDead: []*rdb.TaskMessage{},
|
|
||||||
wantRetry: []*rdb.TaskMessage{&t4},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range tests {
|
|
||||||
// clean up db before each test case.
|
|
||||||
if err := r.FlushDB().Err(); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
retryTask(rdbClient, tc.msg, tc.err)
|
|
||||||
|
|
||||||
deadQueue := r.ZRange(deadQ, 0, -1).Val()
|
|
||||||
gotDead := mustUnmarshalSlice(t, deadQueue)
|
|
||||||
if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" {
|
|
||||||
t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, deadQ, diff)
|
|
||||||
}
|
|
||||||
|
|
||||||
retryQueue := r.ZRange(retryQ, 0, -1).Val()
|
|
||||||
gotRetry := mustUnmarshalSlice(t, retryQueue)
|
|
||||||
if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" {
|
|
||||||
t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, deadQ, diff)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue
Block a user