diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 4094b84..ec6f510 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -139,6 +139,12 @@ func TaskMessageWithError(t base.TaskMessage, errMsg string, failedAt time.Time) return &t } +// TaskMessageWithCompletedAt returns an updated copy of t after completion. +func TaskMessageWithCompletedAt(t base.TaskMessage, completedAt time.Time) *base.TaskMessage { + t.CompletedAt = completedAt.Unix() + return &t +} + // MustMarshal marshals given task message and returns a json string. // Calling test will fail if marshaling errors out. func MustMarshal(tb testing.TB, msg *base.TaskMessage) string { @@ -224,6 +230,13 @@ func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qna seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries, base.TaskStateActive) } +// SeedCompletedQueue initializes the completed set witht the given entries. +func SeedCompletedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { + tb.Helper() + r.SAdd(context.Background(), base.AllQueues, qname) + seedRedisZSet(tb, r, base.CompletedKey(qname), entries, base.TaskStateCompleted) +} + // SeedAllPendingQueues initializes all of the specified queues with the given messages. // // pending maps a queue name to a list of messages. @@ -274,6 +287,14 @@ func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[stri } } +// SeedAllCompletedQueues initializes all of the completed queues with the given entries. +func SeedAllCompletedQueues(tb testing.TB, r redis.UniversalClient, completed map[string][]base.Z) { + tb.Helper() + for q, entries := range completed { + SeedCompletedQueue(tb, r, entries, q) + } +} + func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*base.TaskMessage, state base.TaskState) { tb.Helper() @@ -367,6 +388,13 @@ func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) [ return getMessagesFromZSet(tb, r, qname, base.ArchivedKey, base.TaskStateArchived) } +// GetCompletedMessages returns all completed task messages in the given queue. +// It also asserts the state field of the task. +func GetCompletedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { + tb.Helper() + return getMessagesFromZSet(tb, r, qname, base.CompletedKey, base.TaskStateCompleted) +} + // GetScheduledEntries returns all scheduled messages and its score in the given queue. // It also asserts the state field of the task. func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { @@ -395,6 +423,13 @@ func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) [ return getMessagesFromZSetWithScores(tb, r, qname, base.DeadlinesKey, base.TaskStateActive) } +// GetCompletedEntries returns all completed messages and its score in the given queue. +// It also asserts the state field of the task. +func GetCompletedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { + tb.Helper() + return getMessagesFromZSetWithScores(tb, r, qname, base.CompletedKey, base.TaskStateCompleted) +} + // Retrieves all messages stored under `keyFn(qname)` key in redis list. func getMessagesFromList(tb testing.TB, r redis.UniversalClient, qname string, keyFn func(qname string) string, state base.TaskState) []*base.TaskMessage { diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index f546253..96ab044 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -330,7 +330,7 @@ end return redis.status_reply("OK") `) -// Done removes the task from active queue to mark the task as done. +// Done removes the task from active queue and deletes the task. // It removes a uniqueness lock acquired by the task, if any. func (r *RDB) Done(msg *base.TaskMessage) error { var op errors.Op = "rdb.Done" @@ -346,6 +346,7 @@ func (r *RDB) Done(msg *base.TaskMessage) error { msg.ID, expireAt.Unix(), } + // Note: We cannot pass empty unique key when running this script in redis-cluster. if len(msg.UniqueKey) > 0 { keys = append(keys, msg.UniqueKey) return r.runScript(op, doneUniqueCmd, keys, argv...) @@ -353,6 +354,96 @@ func (r *RDB) Done(msg *base.TaskMessage) error { return r.runScript(op, doneCmd, keys, argv...) } +// KEYS[1] -> asynq:{}:active +// KEYS[2] -> asynq:{}:deadlines +// KEYS[3] -> asynq:{}:completed +// KEYS[4] -> asynq:{}:t: +// KEYS[5] -> asynq:{}:processed: +// ARGV[1] -> task ID +// ARGV[2] -> stats expiration timestamp +// ARGV[3] -> task exipration time in unix time +// ARGV[4] -> task message data +var markAsCompleteCmd = redis.NewScript(` +if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then + return redis.error_reply("NOT FOUND") +end +if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then + return redis.error_reply("NOT FOUND") +end +if redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) ~= 1 then + redis.redis.error_reply("INTERNAL") +end +redis.call("HSET", KEYS[4], "msg", ARGV[4], "state", "completed") +local n = redis.call("INCR", KEYS[5]) +if tonumber(n) == 1 then + redis.call("EXPIREAT", KEYS[5], ARGV[2]) +end +return redis.status_reply("OK") +`) + +// KEYS[1] -> asynq:{}:active +// KEYS[2] -> asynq:{}:deadlines +// KEYS[3] -> asynq:{}:completed +// KEYS[4] -> asynq:{}:t: +// KEYS[5] -> asynq:{}:processed: +// KEYS[6] -> asynq:{}:unique:{} +// ARGV[1] -> task ID +// ARGV[2] -> stats expiration timestamp +// ARGV[3] -> task exipration time in unix time +// ARGV[4] -> task message data +var markAsCompleteUniqueCmd = redis.NewScript(` +if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then + return redis.error_reply("NOT FOUND") +end +if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then + return redis.error_reply("NOT FOUND") +end +if redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) ~= 1 then + redis.redis.error_reply("INTERNAL") +end +redis.call("HSET", KEYS[4], "msg", ARGV[4], "state", "completed") +local n = redis.call("INCR", KEYS[5]) +if tonumber(n) == 1 then + redis.call("EXPIREAT", KEYS[5], ARGV[2]) +end +if redis.call("GET", KEYS[6]) == ARGV[1] then + redis.call("DEL", KEYS[6]) +end +return redis.status_reply("OK") +`) + +// MarkAsComplete removes the task from active queue to mark the task as completed. +// It removes a uniqueness lock acquired by the task, if any. +func (r *RDB) MarkAsComplete(msg *base.TaskMessage) error { + var op errors.Op = "rdb.MarkAsComplete" + now := time.Now() + statsExpireAt := now.Add(statsTTL) + msg.CompletedAt = now.Unix() + encoded, err := base.EncodeMessage(msg) + if err != nil { + return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err)) + } + keys := []string{ + base.ActiveKey(msg.Queue), + base.DeadlinesKey(msg.Queue), + base.CompletedKey(msg.Queue), + base.TaskKey(msg.Queue, msg.ID), + base.ProcessedKey(msg.Queue, now), + } + argv := []interface{}{ + msg.ID, + statsExpireAt.Unix(), + now.Unix() + msg.ResultTTL, + encoded, + } + // Note: We cannot pass empty unique key when running this script in redis-cluster. + if len(msg.UniqueKey) > 0 { + keys = append(keys, msg.UniqueKey) + return r.runScript(op, markAsCompleteUniqueCmd, keys, argv...) + } + return r.runScript(op, markAsCompleteCmd, keys, argv...) +} + // KEYS[1] -> asynq:{}:active // KEYS[2] -> asynq:{}:deadlines // KEYS[3] -> asynq:{}:pending diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 233000e..49d7d14 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -677,17 +677,17 @@ func TestDone(t *testing.T) { Payload: nil, Timeout: 1800, Deadline: 0, - UniqueKey: "asynq:{default}:unique:reindex:nil", + UniqueKey: "asynq:{default}:unique:b0804ec967f48520697662a204f5fe72", Queue: "default", } t1Deadline := now.Unix() + t1.Timeout t2Deadline := t2.Deadline - t3Deadline := now.Unix() + t3.Deadline + t3Deadline := now.Unix() + t3.Timeout tests := []struct { desc string active map[string][]*base.TaskMessage // initial state of the active list - deadlines map[string][]base.Z // initial state of deadlines set + deadlines map[string][]base.Z // initial state of the deadlines set target *base.TaskMessage // task to remove wantActive map[string][]*base.TaskMessage // final state of the active list wantDeadlines map[string][]base.Z // final state of the deadline set @@ -804,6 +804,201 @@ func TestDone(t *testing.T) { } } +func TestMarkAsComplete(t *testing.T) { + r := setup(t) + defer r.Close() + now := time.Now() + t1 := &base.TaskMessage{ + ID: uuid.NewString(), + Type: "send_email", + Payload: nil, + Timeout: 1800, + Deadline: 0, + Queue: "default", + ResultTTL: 3600, + } + t2 := &base.TaskMessage{ + ID: uuid.NewString(), + Type: "export_csv", + Payload: nil, + Timeout: 0, + Deadline: now.Add(2 * time.Hour).Unix(), + Queue: "custom", + ResultTTL: 7200, + } + t3 := &base.TaskMessage{ + ID: uuid.NewString(), + Type: "reindex", + Payload: nil, + Timeout: 1800, + Deadline: 0, + UniqueKey: "asynq:{default}:unique:b0804ec967f48520697662a204f5fe72", + Queue: "default", + ResultTTL: 1800, + } + t1Deadline := now.Unix() + t1.Timeout + t2Deadline := t2.Deadline + t3Deadline := now.Unix() + t3.Timeout + + tests := []struct { + desc string + active map[string][]*base.TaskMessage // initial state of the active list + deadlines map[string][]base.Z // initial state of the deadlines set + completed map[string][]base.Z // initial state of the completed set + target *base.TaskMessage // task to mark as completed + wantActive map[string][]*base.TaskMessage // final state of the active list + wantDeadlines map[string][]base.Z // final state of the deadline set + wantCompleted func(completedAt time.Time) map[string][]base.Z // final state of the completed set + }{ + { + desc: "select a message from the correct queue", + active: map[string][]*base.TaskMessage{ + "default": {t1}, + "custom": {t2}, + }, + deadlines: map[string][]base.Z{ + "default": {{Message: t1, Score: t1Deadline}}, + "custom": {{Message: t2, Score: t2Deadline}}, + }, + completed: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + target: t1, + wantActive: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {t2}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {}, + "custom": {{Message: t2, Score: t2Deadline}}, + }, + wantCompleted: func(completedAt time.Time) map[string][]base.Z { + return map[string][]base.Z{ + "default": {{Message: h.TaskMessageWithCompletedAt(*t1, completedAt), Score: completedAt.Unix() + t1.ResultTTL}}, + "custom": {}, + } + }, + }, + { + desc: "with one queue", + active: map[string][]*base.TaskMessage{ + "default": {t1}, + }, + deadlines: map[string][]base.Z{ + "default": {{Message: t1, Score: t1Deadline}}, + }, + completed: map[string][]base.Z{ + "default": {}, + }, + target: t1, + wantActive: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {}, + }, + wantCompleted: func(completedAt time.Time) map[string][]base.Z { + return map[string][]base.Z{ + "default": {{Message: h.TaskMessageWithCompletedAt(*t1, completedAt), Score: completedAt.Unix() + t1.ResultTTL}}, + } + }, + }, + { + desc: "with multiple messages in a queue", + active: map[string][]*base.TaskMessage{ + "default": {t1, t3}, + "custom": {t2}, + }, + deadlines: map[string][]base.Z{ + "default": {{Message: t1, Score: t1Deadline}, {Message: t3, Score: t3Deadline}}, + "custom": {{Message: t2, Score: t2Deadline}}, + }, + completed: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + target: t3, + wantActive: map[string][]*base.TaskMessage{ + "default": {t1}, + "custom": {t2}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {{Message: t1, Score: t1Deadline}}, + "custom": {{Message: t2, Score: t2Deadline}}, + }, + wantCompleted: func(completedAt time.Time) map[string][]base.Z { + return map[string][]base.Z{ + "default": {{Message: h.TaskMessageWithCompletedAt(*t3, completedAt), Score: completedAt.Unix() + t3.ResultTTL}}, + "custom": {}, + } + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) // clean up db before each test case + h.SeedAllDeadlines(t, r.client, tc.deadlines) + h.SeedAllActiveQueues(t, r.client, tc.active) + h.SeedAllCompletedQueues(t, r.client, tc.completed) + for _, msgs := range tc.active { + for _, msg := range msgs { + // Set uniqueness lock if unique key is present. + if len(msg.UniqueKey) > 0 { + err := r.client.SetNX(context.Background(), msg.UniqueKey, msg.ID, time.Minute).Err() + if err != nil { + t.Fatal(err) + } + } + } + } + + completedAt := time.Now() + err := r.MarkAsComplete(tc.target) + if err != nil { + t.Errorf("%s; (*RDB).MarkAsCompleted(task) = %v, want nil", tc.desc, err) + continue + } + + for queue, want := range tc.wantActive { + gotActive := h.GetActiveMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" { + t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.ActiveKey(queue), diff) + continue + } + } + for queue, want := range tc.wantDeadlines { + gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue) + if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { + t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.DeadlinesKey(queue), diff) + continue + } + } + for queue, want := range tc.wantCompleted(completedAt) { + gotCompleted := h.GetCompletedEntries(t, r.client, queue) + if diff := cmp.Diff(want, gotCompleted, h.SortZSetEntryOpt); diff != "" { + t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.CompletedKey(queue), diff) + continue + } + } + + processedKey := base.ProcessedKey(tc.target.Queue, time.Now()) + gotProcessed := r.client.Get(context.Background(), processedKey).Val() + if gotProcessed != "1" { + t.Errorf("%s; GET %q = %q, want 1", tc.desc, processedKey, gotProcessed) + } + + gotTTL := r.client.TTL(context.Background(), processedKey).Val() + if gotTTL > statsTTL { + t.Errorf("%s; TTL %q = %v, want less than or equal to %v", tc.desc, processedKey, gotTTL, statsTTL) + } + + if len(tc.target.UniqueKey) > 0 && r.client.Exists(context.Background(), tc.target.UniqueKey).Val() != 0 { + t.Errorf("%s; Uniqueness lock %q still exists", tc.desc, tc.target.UniqueKey) + } + } +} + func TestRequeue(t *testing.T) { r := setup(t) defer r.Close()