diff --git a/asynq.go b/asynq.go index aa5ec84..0510ca7 100644 --- a/asynq.go +++ b/asynq.go @@ -4,6 +4,8 @@ import "github.com/go-redis/redis/v7" /* TODOs: +- [P0] Proper OS Signal handling +- [P0] Wait for a certain amount of time for wokers to finish on TERM signal - [P0] asynqmon kill , asynqmon killall - [P0] Assigning int or any number type to Payload will be converted to float64 in handler - [P0] Redis Memory Usage, Connection info in stats diff --git a/internal/rdb/helpers_test.go b/internal/rdb/helpers_test.go index 5f39bc7..55ac3ef 100644 --- a/internal/rdb/helpers_test.go +++ b/internal/rdb/helpers_test.go @@ -45,6 +45,14 @@ var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*TaskMessage) []*TaskMessa return out }) +var sortZSetEntryOpt = cmp.Transformer("SortZSetEntry", func(in []sortedSetEntry) []sortedSetEntry { + out := append([]sortedSetEntry(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].msg.ID.String() < out[j].msg.ID.String() + }) + return out +}) + func newTaskMessage(taskType string, payload map[string]interface{}) *TaskMessage { return &TaskMessage{ ID: xid.New(), diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index d455ad2..fde76b9 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -127,6 +127,30 @@ func (r *RDB) RetryLater(msg *TaskMessage, processAt time.Time) error { return r.schedule(retryQ, processAt, msg) } +// Retry moves the task from in-progress to retry queue, incrementing retry count +// and assigning error message to the task message. +func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error { + bytes, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("could not marshal %+v to json: %v", msg, err) + } + // KEYS[1] -> asynq:in_progress + // KEYS[2] -> asynq:retry + // ARGV[1] -> TaskMessage value + // ARGV[2] -> error message + // ARGV[3] -> retry_at UNIX timestamp + script := redis.NewScript(` + redis.call("LREM", KEYS[1], 0, ARGV[1]) + local msg = cjson.decode(ARGV[1]) + msg["Retried"] = msg["Retried"] + 1 + msg["ErrorMsg"] = ARGV[2] + redis.call("ZADD", KEYS[2], ARGV[3], cjson.encode(msg)) + return redis.status_reply("OK") + `) + _, err = script.Run(r.client, []string{inProgressQ, retryQ}, string(bytes), errMsg, processAt.Unix()).Result() + return err +} + // schedule adds the task to the zset to be processd at the specified time. func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error { bytes, err := json.Marshal(msg) diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 88791c2..3ee37f3 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -345,3 +345,78 @@ func TestRetryLater(t *testing.T) { } } } + +func TestRetry(t *testing.T) { + r := setup(t) + t1 := newTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"}) + t2 := newTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/image.jpg"}) + t3 := newTaskMessage("reindex", nil) + t1.Retried = 10 + errMsg := "SMTP server is not responding" + t1AfterRetry := &TaskMessage{ + ID: t1.ID, + Type: t1.Type, + Payload: t1.Payload, + Queue: t1.Queue, + Retry: t1.Retry, + Retried: t1.Retried + 1, + ErrorMsg: errMsg, + } + now := time.Now() + + tests := []struct { + inProgress []*TaskMessage + retry []sortedSetEntry + msg *TaskMessage + processAt time.Time + errMsg string + wantInProgress []*TaskMessage + wantRetry []sortedSetEntry + }{ + { + inProgress: []*TaskMessage{t1, t2}, + retry: []sortedSetEntry{ + {t3, now.Add(time.Minute).Unix()}, + }, + msg: t1, + processAt: now.Add(5 * time.Minute), + errMsg: errMsg, + wantInProgress: []*TaskMessage{t2}, + wantRetry: []sortedSetEntry{ + {t1AfterRetry, now.Add(5 * time.Minute).Unix()}, + {t3, now.Add(time.Minute).Unix()}, + }, + }, + } + + for _, tc := range tests { + flushDB(t, r) + seedInProgressQueue(t, r, tc.inProgress) + seedRetryQueue(t, r, tc.retry) + + err := r.Retry(tc.msg, tc.processAt, tc.errMsg) + if err != nil { + t.Errorf("(*RDB).Retry = %v, want nil", err) + continue + } + + gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val() + gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw) + if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", inProgressQ, diff) + } + + gotRetryRaw := r.client.ZRangeWithScores(retryQ, 0, -1).Val() + var gotRetry []sortedSetEntry + for _, z := range gotRetryRaw { + gotRetry = append(gotRetry, sortedSetEntry{ + msg: mustUnmarshal(t, z.Member.(string)), + score: int64(z.Score), + }) + } + cmpOpt := cmp.AllowUnexported(sortedSetEntry{}) + if diff := cmp.Diff(tc.wantRetry, gotRetry, cmpOpt, sortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", retryQ, diff) + } + } +}