From 1b1662bb12f98e3dcdf17d555ce14a98d07b8e79 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 15 Dec 2019 16:15:07 -0800 Subject: [PATCH 1/7] Add Retry method to *RDB (*RDB).Retry method takes a TaskMessage and will atomically moves the message from in_progress queue to retry queue. Additionally it increments the Retried counter and assigns the error message to the message. --- asynq.go | 2 + internal/rdb/helpers_test.go | 8 ++++ internal/rdb/rdb.go | 24 ++++++++++++ internal/rdb/rdb_test.go | 75 ++++++++++++++++++++++++++++++++++++ 4 files changed, 109 insertions(+) diff --git a/asynq.go b/asynq.go index aa5ec84..0510ca7 100644 --- a/asynq.go +++ b/asynq.go @@ -4,6 +4,8 @@ import "github.com/go-redis/redis/v7" /* TODOs: +- [P0] Proper OS Signal handling +- [P0] Wait for a certain amount of time for wokers to finish on TERM signal - [P0] asynqmon kill , asynqmon killall - [P0] Assigning int or any number type to Payload will be converted to float64 in handler - [P0] Redis Memory Usage, Connection info in stats diff --git a/internal/rdb/helpers_test.go b/internal/rdb/helpers_test.go index 5f39bc7..55ac3ef 100644 --- a/internal/rdb/helpers_test.go +++ b/internal/rdb/helpers_test.go @@ -45,6 +45,14 @@ var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*TaskMessage) []*TaskMessa return out }) +var sortZSetEntryOpt = cmp.Transformer("SortZSetEntry", func(in []sortedSetEntry) []sortedSetEntry { + out := append([]sortedSetEntry(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].msg.ID.String() < out[j].msg.ID.String() + }) + return out +}) + func newTaskMessage(taskType string, payload map[string]interface{}) *TaskMessage { return &TaskMessage{ ID: xid.New(), diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index d455ad2..fde76b9 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -127,6 +127,30 @@ func (r *RDB) RetryLater(msg *TaskMessage, processAt time.Time) error { return r.schedule(retryQ, processAt, msg) } +// Retry moves the task from in-progress to retry queue, incrementing retry count +// and assigning error message to the task message. +func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error { + bytes, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("could not marshal %+v to json: %v", msg, err) + } + // KEYS[1] -> asynq:in_progress + // KEYS[2] -> asynq:retry + // ARGV[1] -> TaskMessage value + // ARGV[2] -> error message + // ARGV[3] -> retry_at UNIX timestamp + script := redis.NewScript(` + redis.call("LREM", KEYS[1], 0, ARGV[1]) + local msg = cjson.decode(ARGV[1]) + msg["Retried"] = msg["Retried"] + 1 + msg["ErrorMsg"] = ARGV[2] + redis.call("ZADD", KEYS[2], ARGV[3], cjson.encode(msg)) + return redis.status_reply("OK") + `) + _, err = script.Run(r.client, []string{inProgressQ, retryQ}, string(bytes), errMsg, processAt.Unix()).Result() + return err +} + // schedule adds the task to the zset to be processd at the specified time. 
func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error { bytes, err := json.Marshal(msg) diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 88791c2..3ee37f3 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -345,3 +345,78 @@ func TestRetryLater(t *testing.T) { } } } + +func TestRetry(t *testing.T) { + r := setup(t) + t1 := newTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"}) + t2 := newTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/image.jpg"}) + t3 := newTaskMessage("reindex", nil) + t1.Retried = 10 + errMsg := "SMTP server is not responding" + t1AfterRetry := &TaskMessage{ + ID: t1.ID, + Type: t1.Type, + Payload: t1.Payload, + Queue: t1.Queue, + Retry: t1.Retry, + Retried: t1.Retried + 1, + ErrorMsg: errMsg, + } + now := time.Now() + + tests := []struct { + inProgress []*TaskMessage + retry []sortedSetEntry + msg *TaskMessage + processAt time.Time + errMsg string + wantInProgress []*TaskMessage + wantRetry []sortedSetEntry + }{ + { + inProgress: []*TaskMessage{t1, t2}, + retry: []sortedSetEntry{ + {t3, now.Add(time.Minute).Unix()}, + }, + msg: t1, + processAt: now.Add(5 * time.Minute), + errMsg: errMsg, + wantInProgress: []*TaskMessage{t2}, + wantRetry: []sortedSetEntry{ + {t1AfterRetry, now.Add(5 * time.Minute).Unix()}, + {t3, now.Add(time.Minute).Unix()}, + }, + }, + } + + for _, tc := range tests { + flushDB(t, r) + seedInProgressQueue(t, r, tc.inProgress) + seedRetryQueue(t, r, tc.retry) + + err := r.Retry(tc.msg, tc.processAt, tc.errMsg) + if err != nil { + t.Errorf("(*RDB).Retry = %v, want nil", err) + continue + } + + gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val() + gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw) + if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", inProgressQ, diff) + } + + gotRetryRaw := r.client.ZRangeWithScores(retryQ, 0, -1).Val() + var gotRetry []sortedSetEntry + for _, z := range gotRetryRaw { + gotRetry = append(gotRetry, sortedSetEntry{ + msg: mustUnmarshal(t, z.Member.(string)), + score: int64(z.Score), + }) + } + cmpOpt := cmp.AllowUnexported(sortedSetEntry{}) + if diff := cmp.Diff(tc.wantRetry, gotRetry, cmpOpt, sortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", retryQ, diff) + } + } +} From d84e8c0ff26f5629b7f958296fc6e497d43366e9 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 15 Dec 2019 17:16:13 -0800 Subject: [PATCH 2/7] Modify (*RDB).Kill method to atomically move task from in_progress to dead queue --- internal/rdb/rdb.go | 29 ++++++++++++----- internal/rdb/rdb_test.go | 69 +++++++++++++++++++++++++++++++++------- 2 files changed, 78 insertions(+), 20 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index fde76b9..9ee829d 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "strconv" "time" "github.com/go-redis/redis/v7" @@ -165,9 +164,10 @@ func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error return nil } -// Kill sends the task to "dead" set. +// Kill sends the task to "dead" queue from in-progress queue, assigning +// the error message to the task. // It also trims the set by timestamp and set size. 
-func (r *RDB) Kill(msg *TaskMessage) error { +func (r *RDB) Kill(msg *TaskMessage, errMsg string) error { const maxDeadTask = 10 const deadExpirationInDays = 90 bytes, err := json.Marshal(msg) @@ -175,12 +175,25 @@ func (r *RDB) Kill(msg *TaskMessage) error { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } now := time.Now() - pipe := r.client.Pipeline() - pipe.ZAdd(deadQ, &redis.Z{Member: string(bytes), Score: float64(now.Unix())}) limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago - pipe.ZRemRangeByScore(deadQ, "-inf", strconv.Itoa(int(limit))) - pipe.ZRemRangeByRank(deadQ, 0, -maxDeadTask) // trim the set to 100 - _, err = pipe.Exec() + // KEYS[1] -> asynq:in_progress + // KEYS[2] -> asynq:dead + // ARGV[1] -> TaskMessage value + // ARGV[2] -> error message + // ARGV[3] -> died_at UNIX timestamp + // ARGV[4] -> cutoff timestamp (e.g., 90 days ago) + // ARGV[5] -> max number of tasks in dead queue (e.g., 100) + script := redis.NewScript(` + redis.call("LREM", KEYS[1], 0, ARGV[1]) + local msg = cjson.decode(ARGV[1]) + msg["ErrorMsg"] = ARGV[2] + redis.call("ZADD", KEYS[2], ARGV[3], cjson.encode(msg)) + redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4]) + redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5]) + return redis.status_reply("OK") + `) + _, err = script.Run(r.client, []string{inProgressQ, deadQ}, + string(bytes), errMsg, now.Unix(), limit, maxDeadTask).Result() return err } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 3ee37f3..7717b7a 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -115,35 +115,80 @@ func TestDone(t *testing.T) { func TestKill(t *testing.T) { r := setup(t) t1 := newTaskMessage("send_email", nil) + t2 := newTaskMessage("reindex", nil) + t3 := newTaskMessage("generate_csv", nil) + errMsg := "SMTP server not responding" + t1AfterKill := &TaskMessage{ + ID: t1.ID, + Type: t1.Type, + Payload: t1.Payload, + Queue: t1.Queue, + Retry: t1.Retry, + Retried: t1.Retried, + ErrorMsg: errMsg, + } + now := time.Now() // TODO(hibiken): add test cases for trimming tests := []struct { - dead []sortedSetEntry // inital state of dead queue - target *TaskMessage // task to kill - wantDead []*TaskMessage // final state of dead queue + inProgress []*TaskMessage + dead []sortedSetEntry + target *TaskMessage // task to kill + wantInProgress []*TaskMessage + wantDead []sortedSetEntry }{ { - dead: []sortedSetEntry{}, - target: t1, - wantDead: []*TaskMessage{t1}, + inProgress: []*TaskMessage{t1, t2}, + dead: []sortedSetEntry{ + {t3, now.Add(-time.Hour).Unix()}, + }, + target: t1, + wantInProgress: []*TaskMessage{t2}, + wantDead: []sortedSetEntry{ + {t1AfterKill, now.Unix()}, + {t3, now.Add(-time.Hour).Unix()}, + }, + }, + { + inProgress: []*TaskMessage{t1, t2, t3}, + dead: []sortedSetEntry{}, + target: t1, + wantInProgress: []*TaskMessage{t2, t3}, + wantDead: []sortedSetEntry{ + {t1AfterKill, now.Unix()}, + }, }, } for _, tc := range tests { flushDB(t, r) // clean up db before each test case + seedInProgressQueue(t, r, tc.inProgress) seedDeadQueue(t, r, tc.dead) - err := r.Kill(tc.target) + err := r.Kill(tc.target, errMsg) if err != nil { - t.Error(err) + t.Errorf("(*RDB).Kill(%v, %v) = %v, want nil", tc.target, errMsg, err) continue } - data := r.client.ZRange(deadQ, 0, -1).Val() - gotDead := mustUnmarshalSlice(t, data) - if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { + gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val() + gotInProgress := mustUnmarshalSlice(t, 
gotInProgressRaw) + if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", inProgressQ, diff) + } + + var gotDead []sortedSetEntry + data := r.client.ZRangeWithScores(deadQ, 0, -1).Val() + for _, z := range data { + gotDead = append(gotDead, sortedSetEntry{ + msg: mustUnmarshal(t, z.Member.(string)), + score: int64(z.Score), + }) + } + + cmpOpt := cmp.AllowUnexported(sortedSetEntry{}) + if diff := cmp.Diff(tc.wantDead, gotDead, cmpOpt, sortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", deadQ, diff) - continue } } } From 442b33a6d2c6db10726cd85681453bfdbd09d731 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 15 Dec 2019 17:18:43 -0800 Subject: [PATCH 3/7] Remove (*RDB).RetryLater in favor of Retry method --- internal/rdb/rdb.go | 30 ++++++++++-------------------- internal/rdb/rdb_test.go | 34 ---------------------------------- 2 files changed, 10 insertions(+), 54 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 9ee829d..fc3fbd7 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -118,12 +118,16 @@ func (r *RDB) Done(msg *TaskMessage) error { // Schedule adds the task to the backlog queue to be processed in the future. func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error { - return r.schedule(scheduledQ, processAt, msg) -} - -// RetryLater adds the task to the backlog queue to be retried in the future. -func (r *RDB) RetryLater(msg *TaskMessage, processAt time.Time) error { - return r.schedule(retryQ, processAt, msg) + bytes, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("could not marshal %+v to json: %v", msg, err) + } + score := float64(processAt.Unix()) + err = r.client.ZAdd(scheduledQ, &redis.Z{Member: string(bytes), Score: score}).Err() + if err != nil { + return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", scheduledQ, score, string(bytes), err) + } + return nil } // Retry moves the task from in-progress to retry queue, incrementing retry count @@ -150,20 +154,6 @@ func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error return err } -// schedule adds the task to the zset to be processd at the specified time. -func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error { - bytes, err := json.Marshal(msg) - if err != nil { - return fmt.Errorf("could not marshal %+v to json: %v", msg, err) - } - score := float64(processAt.Unix()) - err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: score}).Err() - if err != nil { - return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", zset, score, string(bytes), err) - } - return nil -} - // Kill sends the task to "dead" queue from in-progress queue, assigning // the error message to the task. // It also trims the set by timestamp and set size. 
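For illustration, here is a minimal sketch of the caller-side change this patch implies, written as it would sit inside the asynq package. The helper name handleFailure and the 30-second backoff are placeholders, not code from the patches; the Retry and RetryLater signatures come from the diffs above. The old flow mutated the TaskMessage in Go and then called RetryLater, while the new flow hands the unmodified message and the error string to Retry, which performs the increment, the error assignment, and the in_progress-to-retry move in one Lua script.

package asynq

import (
	"log"
	"time"

	"github.com/hibiken/asynq/internal/rdb"
)

// handleFailure is an illustrative helper, not part of the patches.
func handleFailure(r *rdb.RDB, msg *rdb.TaskMessage, taskErr error) {
	retryAt := time.Now().Add(30 * time.Second) // placeholder backoff

	// Old pattern (RetryLater): two non-atomic steps -- mutate, then schedule.
	//   msg.Retried++
	//   msg.ErrorMsg = taskErr.Error()
	//   _ = r.RetryLater(msg, retryAt)

	// New pattern (Retry): pass the original message plus the error string;
	// the retry-count increment, error assignment, and queue move happen
	// atomically inside the Lua script introduced in this patch series.
	if err := r.Retry(msg, retryAt, taskErr.Error()); err != nil {
		log.Printf("[ERROR] could not move task %v to retry queue: %v", msg.ID, err)
	}
}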
diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 7717b7a..b6f7089 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -357,40 +357,6 @@ func TestSchedule(t *testing.T) { } } -func TestRetryLater(t *testing.T) { - r := setup(t) - tests := []struct { - msg *TaskMessage - processAt time.Time - }{ - { - newTaskMessage("send_email", map[string]interface{}{"subject": "hello"}), - time.Now().Add(15 * time.Minute), - }, - } - - for _, tc := range tests { - flushDB(t, r) // clean up db before each test case - - desc := fmt.Sprintf("(*RDB).RetryLater(%v, %v)", tc.msg, tc.processAt) - err := r.RetryLater(tc.msg, tc.processAt) - if err != nil { - t.Errorf("%s = %v, want nil", desc, err) - continue - } - - res := r.client.ZRangeWithScores(retryQ, 0, -1).Val() - if len(res) != 1 { - t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), retryQ) - continue - } - if res[0].Score != float64(tc.processAt.Unix()) { - t.Errorf("%s inserted an item with score %f, want %f", desc, res[0].Score, float64(tc.processAt.Unix())) - continue - } - } -} - func TestRetry(t *testing.T) { r := setup(t) t1 := newTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"}) From e5686894d3da77e92a8a293a9ae51b7b9ed0977a Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 15 Dec 2019 20:05:56 -0800 Subject: [PATCH 4/7] Fix: Do not use lua cjson library to encode task to json Go and Lua json libraries encodes json differently (e.g. order of key/value) and caused a bug when removing tasks that was previously encoded by Lua json library and redis was receiving a string generated by Go json library. --- internal/rdb/rdb.go | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index fc3fbd7..859e0b7 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -133,24 +133,29 @@ func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error { // Retry moves the task from in-progress to retry queue, incrementing retry count // and assigning error message to the task message. 
func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error { - bytes, err := json.Marshal(msg) + bytesToRemove, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } + modified := *msg + modified.Retried++ + modified.ErrorMsg = errMsg + bytesToAdd, err := json.Marshal(&modified) + if err != nil { + return fmt.Errorf("could not marshal %+v to json: %v", modified, err) + } // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:retry - // ARGV[1] -> TaskMessage value - // ARGV[2] -> error message + // ARGV[1] -> TaskMessage value to remove from InProgress queue + // ARGV[2] -> TaskMessage value to add to Retry queue // ARGV[3] -> retry_at UNIX timestamp script := redis.NewScript(` redis.call("LREM", KEYS[1], 0, ARGV[1]) - local msg = cjson.decode(ARGV[1]) - msg["Retried"] = msg["Retried"] + 1 - msg["ErrorMsg"] = ARGV[2] - redis.call("ZADD", KEYS[2], ARGV[3], cjson.encode(msg)) + redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) return redis.status_reply("OK") `) - _, err = script.Run(r.client, []string{inProgressQ, retryQ}, string(bytes), errMsg, processAt.Unix()).Result() + _, err = script.Run(r.client, []string{inProgressQ, retryQ}, + string(bytesToRemove), string(bytesToAdd), processAt.Unix()).Result() return err } @@ -160,30 +165,34 @@ func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error func (r *RDB) Kill(msg *TaskMessage, errMsg string) error { const maxDeadTask = 10 const deadExpirationInDays = 90 - bytes, err := json.Marshal(msg) + bytesToRemove, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } + modified := *msg + modified.ErrorMsg = errMsg + bytesToAdd, err := json.Marshal(&modified) + if err != nil { + return fmt.Errorf("could not marshal %+v to json: %v", modified, err) + } now := time.Now() limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:dead - // ARGV[1] -> TaskMessage value - // ARGV[2] -> error message + // ARGV[1] -> TaskMessage value to remove from InProgress queue + // ARGV[2] -> TaskMessage value to add to Dead queue // ARGV[3] -> died_at UNIX timestamp // ARGV[4] -> cutoff timestamp (e.g., 90 days ago) // ARGV[5] -> max number of tasks in dead queue (e.g., 100) script := redis.NewScript(` redis.call("LREM", KEYS[1], 0, ARGV[1]) - local msg = cjson.decode(ARGV[1]) - msg["ErrorMsg"] = ARGV[2] - redis.call("ZADD", KEYS[2], ARGV[3], cjson.encode(msg)) + redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4]) redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5]) return redis.status_reply("OK") `) _, err = script.Run(r.client, []string{inProgressQ, deadQ}, - string(bytes), errMsg, now.Unix(), limit, maxDeadTask).Result() + string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTask).Result() return err } From 989b2b6d559df30b1f526b6f6eb73cc78e45fb63 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 15 Dec 2019 21:00:09 -0800 Subject: [PATCH 5/7] Add timeout to worker goroutines when TERM signal is received Wait for a certain amount of time to allow for worker goroutines to finish. If the goroutines don't finish with the timeout duration, processor will quit the goroutines and restore any unfinished tasks from the in_progress queue back to the default queue. 
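The diff below implements that shutdown flow in processor.go. As a rough standalone sketch of the pattern, a worker races the task result against a quit channel that is closed once the grace period expires; the quit, sema, and resCh names mirror the diff, while slowTask, the done channel, and the two-second grace period are placeholders invented for this example.

package main

import (
	"log"
	"time"
)

func slowTask() error {
	time.Sleep(5 * time.Second) // deliberately longer than the grace period
	return nil
}

func main() {
	quit := make(chan struct{})    // closed when the grace period expires
	sema := make(chan struct{}, 1) // one worker slot for this sketch
	done := make(chan struct{})    // lets main wait for the worker to exit

	sema <- struct{}{} // acquire token
	go func() {
		defer func() { <-sema; close(done) }() // release token

		resCh := make(chan error, 1) // buffered so the inner send never blocks
		go func() { resCh <- slowTask() }()

		select {
		case <-quit:
			log.Println("time is up; leaving the task for restore()")
		case err := <-resCh:
			log.Println("task finished:", err)
		}
	}()

	const timeout = 2 * time.Second // the diff uses 8s; shortened here
	time.AfterFunc(timeout, func() { close(quit) })

	<-done // in the real terminate(), this is re-acquiring every sema token
	log.Println("worker released its token; unfinished tasks go back to the default queue")
}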
--- poller_test.go | 6 +++- processor.go | 84 +++++++++++++++++++++++++++++++++++++++++--------- retry.go | 36 ---------------------- retry_test.go | 71 ------------------------------------------ 4 files changed, 75 insertions(+), 122 deletions(-) delete mode 100644 retry.go delete mode 100644 retry_test.go diff --git a/poller_test.go b/poller_test.go index 56687d7..f05e693 100644 --- a/poller_test.go +++ b/poller_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" "github.com/hibiken/asynq/internal/rdb" ) @@ -74,7 +75,10 @@ func TestPoller(t *testing.T) { } // initialize retry queue for _, st := range tc.initRetry { - err := rdbClient.RetryLater(st.msg, st.processAt) + err := r.ZAdd(retryQ, &redis.Z{ + Member: mustMarshal(t, st.msg), + Score: float64(st.processAt.Unix()), + }).Err() if err != nil { t.Fatal(err) } diff --git a/processor.go b/processor.go index 2530163..6a9436e 100644 --- a/processor.go +++ b/processor.go @@ -3,6 +3,8 @@ package asynq import ( "fmt" "log" + "math" + "math/rand" "time" "github.com/hibiken/asynq/internal/rdb" @@ -24,6 +26,9 @@ type processor struct { // channel to communicate back to the long running "processor" goroutine. done chan struct{} + + // quit channel communicates to the in-flight worker goroutines to stop. + quit chan struct{} } func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { @@ -33,6 +38,7 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { dequeueTimeout: 5 * time.Second, sema: make(chan struct{}, numWorkers), done: make(chan struct{}), + quit: make(chan struct{}), } } @@ -42,12 +48,16 @@ func (p *processor) terminate() { // Signal the processor goroutine to stop processing tasks from the queue. p.done <- struct{}{} + // TODO(hibiken): Allow user to customize this timeout value. + const timeout = 8 * time.Second + time.AfterFunc(timeout, func() { close(p.quit) }) log.Println("[INFO] Waiting for all workers to finish...") // block until all workers have released the token for i := 0; i < cap(p.sema); i++ { p.sema <- struct{}{} } log.Println("[INFO] All workers have finished.") + p.restore() // move any unfinished tasks back to the queue. } func (p *processor) start() { @@ -80,22 +90,37 @@ func (p *processor) exec() { return } - task := &Task{Type: msg.Type, Payload: msg.Payload} p.sema <- struct{}{} // acquire token - go func(task *Task) { - // NOTE: This deferred anonymous function needs to take taskMessage as a value because - // the message can be mutated by the time this function is called. - defer func(msg rdb.TaskMessage) { - if err := p.rdb.Done(&msg); err != nil { - log.Printf("[ERROR] could not mark task %+v as done: %v\n", msg, err) + go func() { + defer func() { <-p.sema /* release token */ }() + + resCh := make(chan error) + task := &Task{Type: msg.Type, Payload: msg.Payload} + go func() { + resCh <- perform(p.handler, task) + }() + + select { + case <-p.quit: + // time is up, quit this worker goroutine. + return + case resErr := <-resCh: + // Note: One of three things should happen. 
+ // 1) Done -> Removes the message from InProgress + // 2) Retry -> Removes the message from InProgress & Adds the message to Retry + // 3) Kill -> Removes the message from InProgress & Adds the message to Dead + if resErr != nil { + if msg.Retried >= msg.Retry { + p.kill(msg, resErr.Error()) + return + } + p.retry(msg, resErr.Error()) + return } - <-p.sema // release token - }(*msg) - err := perform(p.handler, task) - if err != nil { - retryTask(p.rdb, msg, err) + p.markAsDone(msg) + return } - }(task) + }() } // restore moves all tasks from "in-progress" back to queue @@ -103,7 +128,30 @@ func (p *processor) exec() { func (p *processor) restore() { err := p.rdb.RestoreUnfinished() if err != nil { - log.Printf("[ERROR] could not restore unfinished tasks: %v\n", err) + log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err) + } +} + +func (p *processor) markAsDone(msg *rdb.TaskMessage) { + err := p.rdb.Done(msg) + if err != nil { + log.Printf("[ERROR] Could not remove task from InProgress queue: %v\n", err) + } +} + +func (p *processor) retry(msg *rdb.TaskMessage, errMsg string) { + retryAt := time.Now().Add(delaySeconds(msg.Retried)) + err := p.rdb.Retry(msg, retryAt, errMsg) + if err != nil { + log.Printf("[ERROR] Could not send task %+v to Retry queue: %v\n", msg, err) + } +} + +func (p *processor) kill(msg *rdb.TaskMessage, errMsg string) { + log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID) + err := p.rdb.Kill(msg, errMsg) + if err != nil { + log.Printf("[ERROR] Could not send task %+v to Dead queue: %v\n", msg, err) } } @@ -118,3 +166,11 @@ func perform(h Handler, task *Task) (err error) { }() return h.ProcessTask(task) } + +// delaySeconds returns a number seconds to delay before retrying. +// Formula taken from https://github.com/mperham/sidekiq. +func delaySeconds(count int) time.Duration { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1)) + return time.Duration(s) * time.Second +} diff --git a/retry.go b/retry.go deleted file mode 100644 index 838d07a..0000000 --- a/retry.go +++ /dev/null @@ -1,36 +0,0 @@ -package asynq - -import ( - "log" - "math" - "math/rand" - "time" - - "github.com/hibiken/asynq/internal/rdb" -) - -func retryTask(r *rdb.RDB, msg *rdb.TaskMessage, err error) { - msg.ErrorMsg = err.Error() - if msg.Retried >= msg.Retry { - log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID) - if err := r.Kill(msg); err != nil { - log.Printf("[ERROR] Could not add task %+v to 'dead'\n", err) - } - return - } - retryAt := time.Now().Add(delaySeconds((msg.Retried))) - log.Printf("[INFO] Retrying task(Type: %q, ID: %v) in %v\n", msg.Type, msg.ID, retryAt.Sub(time.Now())) - msg.Retried++ - if err := r.RetryLater(msg, retryAt); err != nil { - log.Printf("[ERROR] Could not add msg %+v to 'retry': %v\n", msg, err) - return - } -} - -// delaySeconds returns a number seconds to delay before retrying. -// Formula taken from https://github.com/mperham/sidekiq. 
-func delaySeconds(count int) time.Duration { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1)) - return time.Duration(s) * time.Second -} diff --git a/retry_test.go b/retry_test.go deleted file mode 100644 index 7a25eb3..0000000 --- a/retry_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package asynq - -import ( - "fmt" - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/rs/xid" - "github.com/hibiken/asynq/internal/rdb" -) - -func TestRetry(t *testing.T) { - r := setup(t) - rdbClient := rdb.NewRDB(r) - errMsg := "email server not responding" - // t1 is a task with max-retry count reached. - t1 := &rdb.TaskMessage{Type: "send_email", Retry: 10, Retried: 10, Queue: "default", ID: xid.New()} - // t2 is t1 with updated error message. - t2 := *t1 - t2.ErrorMsg = errMsg - // t3 is a task which hasn't reached max-retry count. - t3 := &rdb.TaskMessage{Type: "send_email", Retry: 10, Retried: 5, Queue: "default", ID: xid.New()} - // t4 is t3 after retry. - t4 := *t3 - t4.Retried++ - t4.ErrorMsg = errMsg - - tests := []struct { - desc string // test case description - msg *rdb.TaskMessage // task to retry - err error // error that caused retry - wantDead []*rdb.TaskMessage // state "dead" queue should be in - wantRetry []*rdb.TaskMessage // state "retry" queue should be in - }{ - { - desc: "With retry exhausted task", - msg: t1, - err: fmt.Errorf(errMsg), - wantDead: []*rdb.TaskMessage{&t2}, - wantRetry: []*rdb.TaskMessage{}, - }, - { - desc: "With retry-able task", - msg: t3, - err: fmt.Errorf(errMsg), - wantDead: []*rdb.TaskMessage{}, - wantRetry: []*rdb.TaskMessage{&t4}, - }, - } - - for _, tc := range tests { - // clean up db before each test case. - if err := r.FlushDB().Err(); err != nil { - t.Fatal(err) - } - - retryTask(rdbClient, tc.msg, tc.err) - - deadQueue := r.ZRange(deadQ, 0, -1).Val() - gotDead := mustUnmarshalSlice(t, deadQueue) - if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { - t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, deadQ, diff) - } - - retryQueue := r.ZRange(retryQ, 0, -1).Val() - gotRetry := mustUnmarshalSlice(t, retryQueue) - if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, deadQ, diff) - } - } -} From eb3216d35440076d16b2d7ff9973fc7c5dcd802b Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 16 Dec 2019 06:40:52 -0800 Subject: [PATCH 6/7] Fix: Use buffered channel of size 1 to avoid goroutine leaks --- processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processor.go b/processor.go index 6a9436e..8653401 100644 --- a/processor.go +++ b/processor.go @@ -94,7 +94,7 @@ func (p *processor) exec() { go func() { defer func() { <-p.sema /* release token */ }() - resCh := make(chan error) + resCh := make(chan error, 1) task := &Task{Type: msg.Type, Payload: msg.Payload} go func() { resCh <- perform(p.handler, task) From bcaccf11745a250724f03dfbefed4a243cd12ac7 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 16 Dec 2019 06:41:15 -0800 Subject: [PATCH 7/7] [ci skip] Update todos --- asynq.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/asynq.go b/asynq.go index 0510ca7..9f45ddd 100644 --- a/asynq.go +++ b/asynq.go @@ -4,10 +4,9 @@ import "github.com/go-redis/redis/v7" /* TODOs: -- [P0] Proper OS Signal handling -- [P0] Wait for a certain amount of time for wokers to finish on TERM 
signal +- [P0] Proper OS Signal handling - TTIN to stop the processor - [P0] asynqmon kill , asynqmon killall -- [P0] Assigning int or any number type to Payload will be converted to float64 in handler +- [P0] Better Payload API - Assigning int or any number type to Payload will be converted to float64 in handler - [P0] Redis Memory Usage, Connection info in stats - [P0] Processed, Failed count for today - [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment
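As a closing illustration of why the one-element buffer in PATCH 6/7 matters: when a worker takes the <-p.quit branch and never reads resCh, an unbuffered channel leaves the inner goroutine blocked on its send forever, whereas a buffer of one lets the send complete and the goroutine exit. The standalone program below is a sketch, not code from the patches; spawn and the loop counts are invented for the demo.

package main

import (
	"fmt"
	"runtime"
	"time"
)

// spawn mimics one worker whose receiver has already gone away (the worker
// returned on <-p.quit), so nothing ever reads from resCh.
func spawn(buffered bool) {
	var resCh chan error
	if buffered {
		resCh = make(chan error, 1) // patch 6: send succeeds, goroutine exits
	} else {
		resCh = make(chan error) // pre-patch: send blocks forever (leak)
	}
	go func() { resCh <- nil }() // stand-in for resCh <- perform(handler, task)
}

func main() {
	base := runtime.NumGoroutine()

	for i := 0; i < 100; i++ {
		spawn(true)
	}
	time.Sleep(100 * time.Millisecond)
	fmt.Printf("buffered:   %d goroutines leaked\n", runtime.NumGoroutine()-base)

	for i := 0; i < 100; i++ {
		spawn(false)
	}
	time.Sleep(100 * time.Millisecond)
	fmt.Printf("unbuffered: %d goroutines leaked\n", runtime.NumGoroutine()-base)
}

On a typical run the buffered batch reports roughly zero leaked goroutines, while the unbuffered batch reports close to one hundred.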