diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 85cee77..fbd4666 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -477,3 +477,25 @@ func getMessagesFromZSetWithScores(tb testing.TB, r redis.UniversalClient, qname } return res } + +// GetRetryEntries returns all retry messages and its score in the given queue. +func GetRetryTasks(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskInfo { + tb.Helper() + zs := r.ZRangeWithScores(base.RetryKey(qname), 0, -1).Val() + var tasks []*base.TaskInfo + for _, z := range zs { + vals := r.HMGet(base.TaskKey(qname, z.Member.(string)), "msg", "state", "process_at", "last_failed_at").Val() + if len(vals) != 4 { + tb.Fatalf("unexpected number of values returned from HMGET command, got %d elements, want 4", len(vals)) + } + if vals[0] == redis.Nil { + tb.Fatalf("msg field contained nil for task ID %v", z.Member) + } + if vals[1] == redis.Nil { + tb.Fatalf("state field contained nil for task ID %v", z.Member) + } + // TODO: continue from here + + } + return res +} diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index cff9f7e..13addcd 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -664,6 +664,7 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { // ARGV[3] -> current timestamp in unix time // ARGV[4] -> cutoff timestamp (e.g., 90 days ago) // ARGV[5] -> max number of tasks in archived state (e.g., 100) +// FIXME: Need to update state field of the task under task-key var archiveTaskCmd = redis.NewScript(` if redis.call("EXISTS", KEYS[1]) == 0 then return 0 diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 0541d08..1adfa05 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -1806,8 +1806,8 @@ func TestArchiveRetryTask(t *testing.T) { qname string id uuid.UUID want error - wantRetry map[string][]base.Z - wantArchived map[string][]base.Z + wantRetry map[string][]*base.TaskInfo + wantArchived map[string][]*base.TaskInfo }{ { retry: map[string][]base.Z{ @@ -1822,11 +1822,15 @@ func TestArchiveRetryTask(t *testing.T) { qname: "default", id: m1.ID, want: nil, - wantRetry: map[string][]base.Z{ - "default": {{Message: m2, Score: t2.Unix()}}, + wantRetry: map[string][]*base.TaskInfo{ + "default": { + {TaskMessage: m2, State: "retry", NextProcessAt: t2.Unix(), LastFailedAt: 0}, + }, }, - wantArchived: map[string][]base.Z{ - "default": {{Message: m1, Score: time.Now().Unix()}}, + wantArchived: map[string][]*base.TaskInfo{ + "default": { + {TaskMessage: m1, State: "archived", NextProcessAt: 0, LastFailedAt: time.Now().Unix()}, + }, }, }, { @@ -1839,11 +1843,15 @@ func TestArchiveRetryTask(t *testing.T) { qname: "default", id: uuid.New(), want: ErrTaskNotFound, - wantRetry: map[string][]base.Z{ - "default": {{Message: m1, Score: t1.Unix()}}, + wantRetry: map[string][]*base.TaskInfo{ + "default": { + {TaskMessage: m1, State: "retry", NextProcessAt: t1.Unix(), LastFailedAt: 0}, + }, }, - wantArchived: map[string][]base.Z{ - "default": {{Message: m2, Score: t2.Unix()}}, + wantArchived: map[string][]*base.TaskInfo{ + "default": { + {TaskMessage: m2, State: "archived", NextProcessAt: 0, LastFailedAt: t2.Unix()}, + }, }, }, { @@ -1864,18 +1872,20 @@ func TestArchiveRetryTask(t *testing.T) { qname: "custom", id: m3.ID, want: nil, - wantRetry: map[string][]base.Z{ + wantRetry: map[string][]*base.TaskInfo{ "default": { - {Message: m1, Score: t1.Unix()}, - {Message: m2, Score: t2.Unix()}, + {TaskMessage: m1, State: "retry", NextProcessAt: t1.Unix(), LastFailedAt: 0}, + {TaskMessage: m2, State: "retry", NextProcessAt: t2.Unix(), LastFailedAt: 0}, }, "custom": { - {Message: m4, Score: t4.Unix()}, + {TaskMessage: m4, State: "retry", NextProcessAt: t4.Unix(), LastFailedAt: 0}, }, }, - wantArchived: map[string][]base.Z{ + wantArchived: map[string][]*base.TaskInfo{ "default": {}, - "custom": {{Message: m3, Score: time.Now().Unix()}}, + "custom": { + {TaskMessage: m3, State: "archived", NextProcessAt: 0, LastFailedAt: time.Now().Unix()}, + }, }, }, }