From 989b2b6d559df30b1f526b6f6eb73cc78e45fb63 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 15 Dec 2019 21:00:09 -0800 Subject: [PATCH] Add timeout to worker goroutines when TERM signal is received Wait for a certain amount of time to allow for worker goroutines to finish. If the goroutines don't finish with the timeout duration, processor will quit the goroutines and restore any unfinished tasks from the in_progress queue back to the default queue. --- poller_test.go | 6 +++- processor.go | 84 +++++++++++++++++++++++++++++++++++++++++--------- retry.go | 36 ---------------------- retry_test.go | 71 ------------------------------------------ 4 files changed, 75 insertions(+), 122 deletions(-) delete mode 100644 retry.go delete mode 100644 retry_test.go diff --git a/poller_test.go b/poller_test.go index 56687d7..f05e693 100644 --- a/poller_test.go +++ b/poller_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" "github.com/hibiken/asynq/internal/rdb" ) @@ -74,7 +75,10 @@ func TestPoller(t *testing.T) { } // initialize retry queue for _, st := range tc.initRetry { - err := rdbClient.RetryLater(st.msg, st.processAt) + err := r.ZAdd(retryQ, &redis.Z{ + Member: mustMarshal(t, st.msg), + Score: float64(st.processAt.Unix()), + }).Err() if err != nil { t.Fatal(err) } diff --git a/processor.go b/processor.go index 2530163..6a9436e 100644 --- a/processor.go +++ b/processor.go @@ -3,6 +3,8 @@ package asynq import ( "fmt" "log" + "math" + "math/rand" "time" "github.com/hibiken/asynq/internal/rdb" @@ -24,6 +26,9 @@ type processor struct { // channel to communicate back to the long running "processor" goroutine. done chan struct{} + + // quit channel communicates to the in-flight worker goroutines to stop. + quit chan struct{} } func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { @@ -33,6 +38,7 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { dequeueTimeout: 5 * time.Second, sema: make(chan struct{}, numWorkers), done: make(chan struct{}), + quit: make(chan struct{}), } } @@ -42,12 +48,16 @@ func (p *processor) terminate() { // Signal the processor goroutine to stop processing tasks from the queue. p.done <- struct{}{} + // TODO(hibiken): Allow user to customize this timeout value. + const timeout = 8 * time.Second + time.AfterFunc(timeout, func() { close(p.quit) }) log.Println("[INFO] Waiting for all workers to finish...") // block until all workers have released the token for i := 0; i < cap(p.sema); i++ { p.sema <- struct{}{} } log.Println("[INFO] All workers have finished.") + p.restore() // move any unfinished tasks back to the queue. } func (p *processor) start() { @@ -80,22 +90,37 @@ func (p *processor) exec() { return } - task := &Task{Type: msg.Type, Payload: msg.Payload} p.sema <- struct{}{} // acquire token - go func(task *Task) { - // NOTE: This deferred anonymous function needs to take taskMessage as a value because - // the message can be mutated by the time this function is called. - defer func(msg rdb.TaskMessage) { - if err := p.rdb.Done(&msg); err != nil { - log.Printf("[ERROR] could not mark task %+v as done: %v\n", msg, err) + go func() { + defer func() { <-p.sema /* release token */ }() + + resCh := make(chan error) + task := &Task{Type: msg.Type, Payload: msg.Payload} + go func() { + resCh <- perform(p.handler, task) + }() + + select { + case <-p.quit: + // time is up, quit this worker goroutine. + return + case resErr := <-resCh: + // Note: One of three things should happen. + // 1) Done -> Removes the message from InProgress + // 2) Retry -> Removes the message from InProgress & Adds the message to Retry + // 3) Kill -> Removes the message from InProgress & Adds the message to Dead + if resErr != nil { + if msg.Retried >= msg.Retry { + p.kill(msg, resErr.Error()) + return + } + p.retry(msg, resErr.Error()) + return } - <-p.sema // release token - }(*msg) - err := perform(p.handler, task) - if err != nil { - retryTask(p.rdb, msg, err) + p.markAsDone(msg) + return } - }(task) + }() } // restore moves all tasks from "in-progress" back to queue @@ -103,7 +128,30 @@ func (p *processor) exec() { func (p *processor) restore() { err := p.rdb.RestoreUnfinished() if err != nil { - log.Printf("[ERROR] could not restore unfinished tasks: %v\n", err) + log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err) + } +} + +func (p *processor) markAsDone(msg *rdb.TaskMessage) { + err := p.rdb.Done(msg) + if err != nil { + log.Printf("[ERROR] Could not remove task from InProgress queue: %v\n", err) + } +} + +func (p *processor) retry(msg *rdb.TaskMessage, errMsg string) { + retryAt := time.Now().Add(delaySeconds(msg.Retried)) + err := p.rdb.Retry(msg, retryAt, errMsg) + if err != nil { + log.Printf("[ERROR] Could not send task %+v to Retry queue: %v\n", msg, err) + } +} + +func (p *processor) kill(msg *rdb.TaskMessage, errMsg string) { + log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID) + err := p.rdb.Kill(msg, errMsg) + if err != nil { + log.Printf("[ERROR] Could not send task %+v to Dead queue: %v\n", msg, err) } } @@ -118,3 +166,11 @@ func perform(h Handler, task *Task) (err error) { }() return h.ProcessTask(task) } + +// delaySeconds returns a number seconds to delay before retrying. +// Formula taken from https://github.com/mperham/sidekiq. +func delaySeconds(count int) time.Duration { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1)) + return time.Duration(s) * time.Second +} diff --git a/retry.go b/retry.go deleted file mode 100644 index 838d07a..0000000 --- a/retry.go +++ /dev/null @@ -1,36 +0,0 @@ -package asynq - -import ( - "log" - "math" - "math/rand" - "time" - - "github.com/hibiken/asynq/internal/rdb" -) - -func retryTask(r *rdb.RDB, msg *rdb.TaskMessage, err error) { - msg.ErrorMsg = err.Error() - if msg.Retried >= msg.Retry { - log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID) - if err := r.Kill(msg); err != nil { - log.Printf("[ERROR] Could not add task %+v to 'dead'\n", err) - } - return - } - retryAt := time.Now().Add(delaySeconds((msg.Retried))) - log.Printf("[INFO] Retrying task(Type: %q, ID: %v) in %v\n", msg.Type, msg.ID, retryAt.Sub(time.Now())) - msg.Retried++ - if err := r.RetryLater(msg, retryAt); err != nil { - log.Printf("[ERROR] Could not add msg %+v to 'retry': %v\n", msg, err) - return - } -} - -// delaySeconds returns a number seconds to delay before retrying. -// Formula taken from https://github.com/mperham/sidekiq. -func delaySeconds(count int) time.Duration { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1)) - return time.Duration(s) * time.Second -} diff --git a/retry_test.go b/retry_test.go deleted file mode 100644 index 7a25eb3..0000000 --- a/retry_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package asynq - -import ( - "fmt" - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/rs/xid" - "github.com/hibiken/asynq/internal/rdb" -) - -func TestRetry(t *testing.T) { - r := setup(t) - rdbClient := rdb.NewRDB(r) - errMsg := "email server not responding" - // t1 is a task with max-retry count reached. - t1 := &rdb.TaskMessage{Type: "send_email", Retry: 10, Retried: 10, Queue: "default", ID: xid.New()} - // t2 is t1 with updated error message. - t2 := *t1 - t2.ErrorMsg = errMsg - // t3 is a task which hasn't reached max-retry count. - t3 := &rdb.TaskMessage{Type: "send_email", Retry: 10, Retried: 5, Queue: "default", ID: xid.New()} - // t4 is t3 after retry. - t4 := *t3 - t4.Retried++ - t4.ErrorMsg = errMsg - - tests := []struct { - desc string // test case description - msg *rdb.TaskMessage // task to retry - err error // error that caused retry - wantDead []*rdb.TaskMessage // state "dead" queue should be in - wantRetry []*rdb.TaskMessage // state "retry" queue should be in - }{ - { - desc: "With retry exhausted task", - msg: t1, - err: fmt.Errorf(errMsg), - wantDead: []*rdb.TaskMessage{&t2}, - wantRetry: []*rdb.TaskMessage{}, - }, - { - desc: "With retry-able task", - msg: t3, - err: fmt.Errorf(errMsg), - wantDead: []*rdb.TaskMessage{}, - wantRetry: []*rdb.TaskMessage{&t4}, - }, - } - - for _, tc := range tests { - // clean up db before each test case. - if err := r.FlushDB().Err(); err != nil { - t.Fatal(err) - } - - retryTask(rdbClient, tc.msg, tc.err) - - deadQueue := r.ZRange(deadQ, 0, -1).Val() - gotDead := mustUnmarshalSlice(t, deadQueue) - if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { - t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, deadQ, diff) - } - - retryQueue := r.ZRange(retryQ, 0, -1).Val() - gotRetry := mustUnmarshalSlice(t, retryQueue) - if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, deadQ, diff) - } - } -}