diff --git a/asynq.go b/asynq.go
index aa5ec84..9f45ddd 100644
--- a/asynq.go
+++ b/asynq.go
@@ -4,8 +4,9 @@ import "github.com/go-redis/redis/v7"
 
 /*
 TODOs:
+- [P0] Proper OS Signal handling - TTIN to stop the processor
 - [P0] asynqmon kill , asynqmon killall
-- [P0] Assigning int or any number type to Payload will be converted to float64 in handler
+- [P0] Better Payload API - Assigning int or any number type to Payload will be converted to float64 in handler
 - [P0] Redis Memory Usage, Connection info in stats
 - [P0] Processed, Failed count for today
 - [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment
diff --git a/internal/rdb/helpers_test.go b/internal/rdb/helpers_test.go
index 5f39bc7..55ac3ef 100644
--- a/internal/rdb/helpers_test.go
+++ b/internal/rdb/helpers_test.go
@@ -45,6 +45,14 @@ var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*TaskMessage) []*TaskMessa
 	return out
 })
 
+var sortZSetEntryOpt = cmp.Transformer("SortZSetEntry", func(in []sortedSetEntry) []sortedSetEntry {
+	out := append([]sortedSetEntry(nil), in...) // Copy input to avoid mutating it
+	sort.Slice(out, func(i, j int) bool {
+		return out[i].msg.ID.String() < out[j].msg.ID.String()
+	})
+	return out
+})
+
 func newTaskMessage(taskType string, payload map[string]interface{}) *TaskMessage {
 	return &TaskMessage{
 		ID:      xid.New(),
diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go
index d455ad2..859e0b7 100644
--- a/internal/rdb/rdb.go
+++ b/internal/rdb/rdb.go
@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"strconv"
 	"time"
 
 	"github.com/go-redis/redis/v7"
@@ -119,44 +118,81 @@ func (r *RDB) Done(msg *TaskMessage) error {
 
 // Schedule adds the task to the backlog queue to be processed in the future.
 func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error {
-	return r.schedule(scheduledQ, processAt, msg)
-}
-
-// RetryLater adds the task to the backlog queue to be retried in the future.
-func (r *RDB) RetryLater(msg *TaskMessage, processAt time.Time) error {
-	return r.schedule(retryQ, processAt, msg)
-}
-
-// schedule adds the task to the zset to be processd at the specified time.
-func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error {
 	bytes, err := json.Marshal(msg)
 	if err != nil {
 		return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
 	}
 	score := float64(processAt.Unix())
-	err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: score}).Err()
+	err = r.client.ZAdd(scheduledQ, &redis.Z{Member: string(bytes), Score: score}).Err()
 	if err != nil {
-		return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", zset, score, string(bytes), err)
+		return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", scheduledQ, score, string(bytes), err)
 	}
 	return nil
 }
 
-// Kill sends the task to "dead" set.
-// It also trims the set by timestamp and set size.
-func (r *RDB) Kill(msg *TaskMessage) error {
-	const maxDeadTask = 10
-	const deadExpirationInDays = 90
-	bytes, err := json.Marshal(msg)
+// Retry moves the task from the in-progress queue to the retry queue,
+// incrementing the retry count and assigning the error message to the task message.
+func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error {
+	bytesToRemove, err := json.Marshal(msg)
 	if err != nil {
 		return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
 	}
+	modified := *msg
+	modified.Retried++
+	modified.ErrorMsg = errMsg
+	bytesToAdd, err := json.Marshal(&modified)
+	if err != nil {
+		return fmt.Errorf("could not marshal %+v to json: %v", modified, err)
+	}
+	// KEYS[1] -> asynq:in_progress
+	// KEYS[2] -> asynq:retry
+	// ARGV[1] -> TaskMessage value to remove from InProgress queue
+	// ARGV[2] -> TaskMessage value to add to Retry queue
+	// ARGV[3] -> retry_at UNIX timestamp
+	script := redis.NewScript(`
+	redis.call("LREM", KEYS[1], 0, ARGV[1])
+	redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
+	return redis.status_reply("OK")
+	`)
+	_, err = script.Run(r.client, []string{inProgressQ, retryQ},
+		string(bytesToRemove), string(bytesToAdd), processAt.Unix()).Result()
+	return err
+}
+
+// Kill moves the task from the in-progress queue to the "dead" queue, assigning
+// the error message to the task.
+// It also trims the set by timestamp and set size.
+func (r *RDB) Kill(msg *TaskMessage, errMsg string) error {
+	const maxDeadTask = 10
+	const deadExpirationInDays = 90
+	bytesToRemove, err := json.Marshal(msg)
+	if err != nil {
+		return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
+	}
+	modified := *msg
+	modified.ErrorMsg = errMsg
+	bytesToAdd, err := json.Marshal(&modified)
+	if err != nil {
+		return fmt.Errorf("could not marshal %+v to json: %v", modified, err)
+	}
 	now := time.Now()
-	pipe := r.client.Pipeline()
-	pipe.ZAdd(deadQ, &redis.Z{Member: string(bytes), Score: float64(now.Unix())})
 	limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
-	pipe.ZRemRangeByScore(deadQ, "-inf", strconv.Itoa(int(limit)))
-	pipe.ZRemRangeByRank(deadQ, 0, -maxDeadTask) // trim the set to 100
-	_, err = pipe.Exec()
+	// KEYS[1] -> asynq:in_progress
+	// KEYS[2] -> asynq:dead
+	// ARGV[1] -> TaskMessage value to remove from InProgress queue
+	// ARGV[2] -> TaskMessage value to add to Dead queue
+	// ARGV[3] -> died_at UNIX timestamp
+	// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
+	// ARGV[5] -> max number of tasks in dead queue (e.g., 10)
+	script := redis.NewScript(`
+	redis.call("LREM", KEYS[1], 0, ARGV[1])
+	redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
+	redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
+	redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
+	return redis.status_reply("OK")
+	`)
+	_, err = script.Run(r.client, []string{inProgressQ, deadQ},
+		string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTask).Result()
 	return err
 }
diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go
index 88791c2..b6f7089 100644
--- a/internal/rdb/rdb_test.go
+++ b/internal/rdb/rdb_test.go
@@ -115,35 +115,80 @@ func TestDone(t *testing.T) {
 func TestKill(t *testing.T) {
 	r := setup(t)
 	t1 := newTaskMessage("send_email", nil)
+	t2 := newTaskMessage("reindex", nil)
+	t3 := newTaskMessage("generate_csv", nil)
+	errMsg := "SMTP server not responding"
+	t1AfterKill := &TaskMessage{
+		ID:       t1.ID,
+		Type:     t1.Type,
+		Payload:  t1.Payload,
+		Queue:    t1.Queue,
+		Retry:    t1.Retry,
+		Retried:  t1.Retried,
+		ErrorMsg: errMsg,
+	}
+	now := time.Now()
 
 	// TODO(hibiken): add test cases for trimming
 	tests := []struct {
-		dead     []sortedSetEntry // inital state of dead queue
-		target   *TaskMessage     // task to kill
-		wantDead []*TaskMessage   // final state of dead queue
+		inProgress     []*TaskMessage
+		dead           []sortedSetEntry
+		target         *TaskMessage // task to kill
+		wantInProgress []*TaskMessage
+		wantDead       []sortedSetEntry
 	}{
 		{
-			dead:     []sortedSetEntry{},
-			target:   t1,
-			wantDead: []*TaskMessage{t1},
+			inProgress: []*TaskMessage{t1, t2},
+			dead: []sortedSetEntry{
+				{t3, now.Add(-time.Hour).Unix()},
+			},
+			target:         t1,
+			wantInProgress: []*TaskMessage{t2},
+			wantDead: []sortedSetEntry{
+				{t1AfterKill, now.Unix()},
+				{t3, now.Add(-time.Hour).Unix()},
+			},
+		},
+		{
+			inProgress:     []*TaskMessage{t1, t2, t3},
+			dead:           []sortedSetEntry{},
+			target:         t1,
+			wantInProgress: []*TaskMessage{t2, t3},
+			wantDead: []sortedSetEntry{
+				{t1AfterKill, now.Unix()},
+			},
+		},
 	}
 
 	for _, tc := range tests {
 		flushDB(t, r) // clean up db before each test case
+		seedInProgressQueue(t, r, tc.inProgress)
 		seedDeadQueue(t, r, tc.dead)
 
-		err := r.Kill(tc.target)
+		err := r.Kill(tc.target, errMsg)
 		if err != nil {
-			t.Error(err)
+			t.Errorf("(*RDB).Kill(%v, %v) = %v, want nil", tc.target, errMsg, err)
 			continue
 		}
 
-		data := r.client.ZRange(deadQ, 0, -1).Val()
-		gotDead := mustUnmarshalSlice(t, data)
-		if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" {
+		gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val()
+		gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw)
+		if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" {
+			t.Errorf("mismatch found in %q; (-want, +got)\n%s", inProgressQ, diff)
+		}
+
+		var gotDead []sortedSetEntry
+		data := r.client.ZRangeWithScores(deadQ, 0, -1).Val()
+		for _, z := range data {
+			gotDead = append(gotDead, sortedSetEntry{
+				msg:   mustUnmarshal(t, z.Member.(string)),
+				score: int64(z.Score),
+			})
+		}
+
+		cmpOpt := cmp.AllowUnexported(sortedSetEntry{})
+		if diff := cmp.Diff(tc.wantDead, gotDead, cmpOpt, sortZSetEntryOpt); diff != "" {
 			t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", deadQ, diff)
-			continue
 		}
 	}
 }
@@ -312,36 +357,77 @@ func TestSchedule(t *testing.T) {
 	}
 }
 
-func TestRetryLater(t *testing.T) {
+func TestRetry(t *testing.T) {
 	r := setup(t)
+	t1 := newTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"})
+	t2 := newTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/image.jpg"})
+	t3 := newTaskMessage("reindex", nil)
+	t1.Retried = 10
+	errMsg := "SMTP server is not responding"
+	t1AfterRetry := &TaskMessage{
+		ID:       t1.ID,
+		Type:     t1.Type,
+		Payload:  t1.Payload,
+		Queue:    t1.Queue,
+		Retry:    t1.Retry,
+		Retried:  t1.Retried + 1,
+		ErrorMsg: errMsg,
+	}
+	now := time.Now()
+
 	tests := []struct {
-		msg       *TaskMessage
-		processAt time.Time
+		inProgress     []*TaskMessage
+		retry          []sortedSetEntry
+		msg            *TaskMessage
+		processAt      time.Time
+		errMsg         string
+		wantInProgress []*TaskMessage
+		wantRetry      []sortedSetEntry
 	}{
 		{
-			newTaskMessage("send_email", map[string]interface{}{"subject": "hello"}),
-			time.Now().Add(15 * time.Minute),
+			inProgress: []*TaskMessage{t1, t2},
+			retry: []sortedSetEntry{
+				{t3, now.Add(time.Minute).Unix()},
+			},
+			msg:            t1,
+			processAt:      now.Add(5 * time.Minute),
+			errMsg:         errMsg,
+			wantInProgress: []*TaskMessage{t2},
+			wantRetry: []sortedSetEntry{
+				{t1AfterRetry, now.Add(5 * time.Minute).Unix()},
+				{t3, now.Add(time.Minute).Unix()},
+			},
 		},
 	}
 
 	for _, tc := range tests {
-		flushDB(t, r) // clean up db before each test case
+		flushDB(t, r)
+		seedInProgressQueue(t, r, tc.inProgress)
+		seedRetryQueue(t, r, tc.retry)
 
-		desc := fmt.Sprintf("(*RDB).RetryLater(%v, %v)", tc.msg, tc.processAt)
-		err := r.RetryLater(tc.msg, tc.processAt)
+		err := r.Retry(tc.msg, tc.processAt, tc.errMsg)
 		if err != nil {
t.Errorf("%s = %v, want nil", desc, err) + t.Errorf("(*RDB).Retry = %v, want nil", err) continue } - res := r.client.ZRangeWithScores(retryQ, 0, -1).Val() - if len(res) != 1 { - t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), retryQ) - continue + gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val() + gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw) + if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", inProgressQ, diff) } - if res[0].Score != float64(tc.processAt.Unix()) { - t.Errorf("%s inserted an item with score %f, want %f", desc, res[0].Score, float64(tc.processAt.Unix())) - continue + + gotRetryRaw := r.client.ZRangeWithScores(retryQ, 0, -1).Val() + var gotRetry []sortedSetEntry + for _, z := range gotRetryRaw { + gotRetry = append(gotRetry, sortedSetEntry{ + msg: mustUnmarshal(t, z.Member.(string)), + score: int64(z.Score), + }) + } + cmpOpt := cmp.AllowUnexported(sortedSetEntry{}) + if diff := cmp.Diff(tc.wantRetry, gotRetry, cmpOpt, sortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", retryQ, diff) } } } diff --git a/poller_test.go b/poller_test.go index 56687d7..f05e693 100644 --- a/poller_test.go +++ b/poller_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" "github.com/hibiken/asynq/internal/rdb" ) @@ -74,7 +75,10 @@ func TestPoller(t *testing.T) { } // initialize retry queue for _, st := range tc.initRetry { - err := rdbClient.RetryLater(st.msg, st.processAt) + err := r.ZAdd(retryQ, &redis.Z{ + Member: mustMarshal(t, st.msg), + Score: float64(st.processAt.Unix()), + }).Err() if err != nil { t.Fatal(err) } diff --git a/processor.go b/processor.go index 2530163..8653401 100644 --- a/processor.go +++ b/processor.go @@ -3,6 +3,8 @@ package asynq import ( "fmt" "log" + "math" + "math/rand" "time" "github.com/hibiken/asynq/internal/rdb" @@ -24,6 +26,9 @@ type processor struct { // channel to communicate back to the long running "processor" goroutine. done chan struct{} + + // quit channel communicates to the in-flight worker goroutines to stop. + quit chan struct{} } func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { @@ -33,6 +38,7 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { dequeueTimeout: 5 * time.Second, sema: make(chan struct{}, numWorkers), done: make(chan struct{}), + quit: make(chan struct{}), } } @@ -42,12 +48,16 @@ func (p *processor) terminate() { // Signal the processor goroutine to stop processing tasks from the queue. p.done <- struct{}{} + // TODO(hibiken): Allow user to customize this timeout value. + const timeout = 8 * time.Second + time.AfterFunc(timeout, func() { close(p.quit) }) log.Println("[INFO] Waiting for all workers to finish...") // block until all workers have released the token for i := 0; i < cap(p.sema); i++ { p.sema <- struct{}{} } log.Println("[INFO] All workers have finished.") + p.restore() // move any unfinished tasks back to the queue. } func (p *processor) start() { @@ -80,22 +90,37 @@ func (p *processor) exec() { return } - task := &Task{Type: msg.Type, Payload: msg.Payload} p.sema <- struct{}{} // acquire token - go func(task *Task) { - // NOTE: This deferred anonymous function needs to take taskMessage as a value because - // the message can be mutated by the time this function is called. 
-		defer func(msg rdb.TaskMessage) {
-			if err := p.rdb.Done(&msg); err != nil {
-				log.Printf("[ERROR] could not mark task %+v as done: %v\n", msg, err)
+	go func() {
+		defer func() { <-p.sema /* release token */ }()
+
+		resCh := make(chan error, 1)
+		task := &Task{Type: msg.Type, Payload: msg.Payload}
+		go func() {
+			resCh <- perform(p.handler, task)
+		}()
+
+		select {
+		case <-p.quit:
+			// time is up, quit this worker goroutine.
+			return
+		case resErr := <-resCh:
+			// Note: One of three things should happen.
+			// 1) Done  -> Removes the message from InProgress
+			// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
+			// 3) Kill  -> Removes the message from InProgress & Adds the message to Dead
+			if resErr != nil {
+				if msg.Retried >= msg.Retry {
+					p.kill(msg, resErr.Error())
+					return
+				}
+				p.retry(msg, resErr.Error())
+				return
 			}
-			<-p.sema // release token
-		}(*msg)
-		err := perform(p.handler, task)
-		if err != nil {
-			retryTask(p.rdb, msg, err)
+			p.markAsDone(msg)
+			return
 		}
-	}(task)
+	}()
 }
 
 // restore moves all tasks from "in-progress" back to queue
@@ -103,7 +128,30 @@ func (p *processor) exec() {
 func (p *processor) restore() {
 	err := p.rdb.RestoreUnfinished()
 	if err != nil {
-		log.Printf("[ERROR] could not restore unfinished tasks: %v\n", err)
+		log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err)
+	}
+}
+
+func (p *processor) markAsDone(msg *rdb.TaskMessage) {
+	err := p.rdb.Done(msg)
+	if err != nil {
+		log.Printf("[ERROR] Could not remove task from InProgress queue: %v\n", err)
+	}
+}
+
+func (p *processor) retry(msg *rdb.TaskMessage, errMsg string) {
+	retryAt := time.Now().Add(delaySeconds(msg.Retried))
+	err := p.rdb.Retry(msg, retryAt, errMsg)
+	if err != nil {
+		log.Printf("[ERROR] Could not send task %+v to Retry queue: %v\n", msg, err)
+	}
+}
+
+func (p *processor) kill(msg *rdb.TaskMessage, errMsg string) {
+	log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID)
+	err := p.rdb.Kill(msg, errMsg)
+	if err != nil {
+		log.Printf("[ERROR] Could not send task %+v to Dead queue: %v\n", msg, err)
 	}
 }
 
@@ -118,3 +166,11 @@ func perform(h Handler, task *Task) (err error) {
 	}()
 	return h.ProcessTask(task)
 }
+
+// delaySeconds returns the number of seconds to delay before retrying.
+// Formula taken from https://github.com/mperham/sidekiq.
+func delaySeconds(count int) time.Duration {
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1))
+	return time.Duration(s) * time.Second
+}
diff --git a/retry.go b/retry.go
deleted file mode 100644
index 838d07a..0000000
--- a/retry.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package asynq
-
-import (
-	"log"
-	"math"
-	"math/rand"
-	"time"
-
-	"github.com/hibiken/asynq/internal/rdb"
-)
-
-func retryTask(r *rdb.RDB, msg *rdb.TaskMessage, err error) {
-	msg.ErrorMsg = err.Error()
-	if msg.Retried >= msg.Retry {
-		log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID)
-		if err := r.Kill(msg); err != nil {
-			log.Printf("[ERROR] Could not add task %+v to 'dead'\n", err)
-		}
-		return
-	}
-	retryAt := time.Now().Add(delaySeconds((msg.Retried)))
-	log.Printf("[INFO] Retrying task(Type: %q, ID: %v) in %v\n", msg.Type, msg.ID, retryAt.Sub(time.Now()))
-	msg.Retried++
-	if err := r.RetryLater(msg, retryAt); err != nil {
-		log.Printf("[ERROR] Could not add msg %+v to 'retry': %v\n", msg, err)
-		return
-	}
-}
-
-// delaySeconds returns a number seconds to delay before retrying.
-// Formula taken from https://github.com/mperham/sidekiq.
-func delaySeconds(count int) time.Duration {
-	r := rand.New(rand.NewSource(time.Now().UnixNano()))
-	s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1))
-	return time.Duration(s) * time.Second
-}
diff --git a/retry_test.go b/retry_test.go
deleted file mode 100644
index 7a25eb3..0000000
--- a/retry_test.go
+++ /dev/null
@@ -1,71 +0,0 @@
-package asynq
-
-import (
-	"fmt"
-	"testing"
-
-	"github.com/google/go-cmp/cmp"
-	"github.com/rs/xid"
-	"github.com/hibiken/asynq/internal/rdb"
-)
-
-func TestRetry(t *testing.T) {
-	r := setup(t)
-	rdbClient := rdb.NewRDB(r)
-	errMsg := "email server not responding"
-	// t1 is a task with max-retry count reached.
-	t1 := &rdb.TaskMessage{Type: "send_email", Retry: 10, Retried: 10, Queue: "default", ID: xid.New()}
-	// t2 is t1 with updated error message.
-	t2 := *t1
-	t2.ErrorMsg = errMsg
-	// t3 is a task which hasn't reached max-retry count.
-	t3 := &rdb.TaskMessage{Type: "send_email", Retry: 10, Retried: 5, Queue: "default", ID: xid.New()}
-	// t4 is t3 after retry.
-	t4 := *t3
-	t4.Retried++
-	t4.ErrorMsg = errMsg
-
-	tests := []struct {
-		desc      string             // test case description
-		msg       *rdb.TaskMessage   // task to retry
-		err       error              // error that caused retry
-		wantDead  []*rdb.TaskMessage // state "dead" queue should be in
-		wantRetry []*rdb.TaskMessage // state "retry" queue should be in
-	}{
-		{
-			desc:      "With retry exhausted task",
-			msg:       t1,
-			err:       fmt.Errorf(errMsg),
-			wantDead:  []*rdb.TaskMessage{&t2},
-			wantRetry: []*rdb.TaskMessage{},
-		},
-		{
-			desc:      "With retry-able task",
-			msg:       t3,
-			err:       fmt.Errorf(errMsg),
-			wantDead:  []*rdb.TaskMessage{},
-			wantRetry: []*rdb.TaskMessage{&t4},
-		},
-	}
-
-	for _, tc := range tests {
-		// clean up db before each test case.
-		if err := r.FlushDB().Err(); err != nil {
-			t.Fatal(err)
-		}
-
-		retryTask(rdbClient, tc.msg, tc.err)
-
-		deadQueue := r.ZRange(deadQ, 0, -1).Val()
-		gotDead := mustUnmarshalSlice(t, deadQueue)
-		if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" {
-			t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, deadQ, diff)
-		}
-
-		retryQueue := r.ZRange(retryQ, 0, -1).Val()
-		gotRetry := mustUnmarshalSlice(t, retryQueue)
-		if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" {
-			t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, deadQ, diff)
-		}
-	}
-}
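
Note on the atomicity change: `(*RDB).Retry` and `(*RDB).Kill` now replace the client-side pipeline with a single Lua script, so removing the message from `asynq:in_progress` and adding the modified copy to the target sorted set happen as one atomic Redis operation; a crash between the two steps can no longer strand a task in both places or in neither. A minimal standalone sketch of the pattern with go-redis v7 (the key names and payload here are hypothetical, not part of this diff):

```go
package main

import (
	"fmt"

	"github.com/go-redis/redis/v7"
)

// moveToZSet atomically removes val from the list KEYS[1] and adds it to the
// sorted set KEYS[2] with the given score, mirroring the LREM+ZADD pattern
// used by (*RDB).Retry and (*RDB).Kill. Redis runs the script serially, so
// no other command can observe the intermediate state.
var moveToZSet = redis.NewScript(`
redis.call("LREM", KEYS[1], 0, ARGV[1])
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1])
return redis.status_reply("OK")
`)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	// Example keys; the real code passes inProgressQ and retryQ/deadQ.
	err := moveToZSet.Run(client, []string{"example:in_progress", "example:retry"},
		"task-payload", 1577000000).Err()
	if err != nil {
		fmt.Println("script failed:", err)
	}
}
```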
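Note on shutdown: `exec` now runs the handler in its own goroutine and races its result against the new `quit` channel, which `terminate` closes after a fixed 8-second grace period; abandoned tasks are requeued by `restore()`. A toy sketch of that select pattern under shortened, arbitrary timings (the `worker` helper is hypothetical, for illustration only):

```go
package main

import (
	"fmt"
	"time"
)

// worker mirrors the new exec() flow: run the handler in a goroutine and
// wait for whichever comes first, its result or the quit signal.
func worker(quit <-chan struct{}, handle func() error) {
	resCh := make(chan error, 1) // buffered so the handler goroutine never blocks
	go func() { resCh <- handle() }()

	select {
	case <-quit:
		fmt.Println("quit signaled; abandoning task (restore() requeues it)")
	case err := <-resCh:
		fmt.Println("task finished, err =", err)
	}
}

func main() {
	quit := make(chan struct{})
	// Simulate terminate(): close quit after a timeout (8s in the diff; 100ms here).
	time.AfterFunc(100*time.Millisecond, func() { close(quit) })
	worker(quit, func() error { time.Sleep(time.Second); return nil })
}
```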
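Note on the backoff: `delaySeconds` moved from the deleted retry.go into processor.go unchanged, and mirrors Sidekiq's default schedule, `count^4 + 15 + rand(30) * (count + 1)` seconds, so early retries happen quickly while later ones back off polynomially with jitter. A quick sketch that prints sample delays (standalone, not part of this diff; the loop bound is arbitrary):

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
	"time"
)

// delaySeconds is copied from the formula added in processor.go.
func delaySeconds(count int) time.Duration {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1))
	return time.Duration(s) * time.Second
}

func main() {
	// With jitter, count=0 lands in [15s, 44s] and count=5 in [10m40s, 13m34s].
	for count := 0; count <= 5; count++ {
		fmt.Printf("retry #%d -> %v\n", count, delaySeconds(count))
	}
}
```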