diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 135e5ea..85cee77 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -344,6 +344,8 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, qname string, items [ key = base.RetryKey(qname) case stateArchived: key = base.ArchivedKey(qname) + case stateActive: + key = base.DeadlinesKey(qname) default: tb.Fatalf("cannot seed redis ZSET with task state %s", state) } @@ -362,9 +364,13 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, qname string, items [ processAt int64 lastFailedAt int64 ) - if state == stateScheduled || state == stateRetry { + if state == stateScheduled { processAt = item.Score } + if state == stateRetry { + processAt = item.Score + lastFailedAt = time.Now().Unix() + } if state == stateArchived { lastFailedAt = item.Score } diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 795fab8..63c39b1 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -392,7 +392,7 @@ func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskInfo, error) // ARGV[2] -> stop offset // ARGV[3] -> task key prefix var listMessagesCmd = redis.NewScript(` -local ids = redis.call("LRange", KEYS[1], ARGV[1], ARGV[2]) +local ids = redis.call("LRANGE", KEYS[1], ARGV[1], ARGV[2]) local res = {} for _, id in ipairs(ids) do local key = ARGV[3] .. id @@ -435,7 +435,7 @@ func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskInfo, // ListScheduled returns all tasks from the given queue that are scheduled // to be processed in the future. -func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) { +func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]*base.TaskInfo, error) { if !r.client.SIsMember(base.AllQueues, qname).Val() { return nil, fmt.Errorf("queue %q does not exist", qname) } @@ -444,7 +444,7 @@ func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) { // ListRetry returns all tasks from the given queue that have failed before // and willl be retried in the future. -func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) { +func (r *RDB) ListRetry(qname string, pgn Pagination) ([]*base.TaskInfo, error) { if !r.client.SIsMember(base.AllQueues, qname).Val() { return nil, fmt.Errorf("queue %q does not exist", qname) } @@ -452,7 +452,7 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) { } // ListArchived returns all tasks from the given queue that have exhausted its retry limit. -func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) { +func (r *RDB) ListArchived(qname string, pgn Pagination) ([]*base.TaskInfo, error) { if !r.client.SIsMember(base.AllQueues, qname).Val() { return nil, fmt.Errorf("queue %q does not exist", qname) } @@ -468,18 +468,17 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) { // [msg1, score1, msg2, score2, ..., msgN, scoreN] var listZSetEntriesCmd = redis.NewScript(` local res = {} -local id_score_pairs = redis.call("ZRANGE", KEYS[1], ARGV[1], ARGV[2], "WITHSCORES") -for i = 1, table.getn(id_score_pairs), 2 do - local key = ARGV[3] .. id_score_pairs[i] - table.insert(res, redis.call("HGET", key, "msg")) - table.insert(res, id_score_pairs[i+1]) +local ids = redis.call("ZRANGE", KEYS[1], ARGV[1], ARGV[2]) +for _, id in ipairs(ids) do + local key = ARGV[3] .. id + table.insert(res, redis.call("HMGET", key, "msg", "state", "process_at", "last_failed_at")) end return res `) // listZSetEntries returns a list of message and score pairs in Redis sorted-set // with the given key. -func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, error) { +func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]*base.TaskInfo, error) { res, err := listZSetEntriesCmd.Run(r.client, []string{key}, pgn.start(), pgn.stop(), base.TaskKeyPrefix(qname)).Result() if err != nil { @@ -489,6 +488,7 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro if err != nil { return nil, err } +<<<<<<< HEAD var zs []base.Z for i := 0; i < len(data); i += 2 { s, err := cast.ToStringE(data[i]) @@ -508,12 +508,21 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro } >>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf msg, err := base.DecodeMessage([]byte(s)) +======= + var tasks []*base.TaskInfo + for _, s := range data { + vals, err := cast.ToSliceE(s) + if err != nil { + return nil, err + } + info, err := makeTaskInfo(vals) +>>>>>>> 4c699a2... Update RDB.ListScheduled, ListRetry, and ListArchived to return list of if err != nil { continue // bad data, ignore and continue } - zs = append(zs, base.Z{Message: msg, Score: score}) + tasks = append(tasks, info) } - return zs, nil + return tasks, nil } // RunAllScheduledTasks enqueues all scheduled tasks from the given queue diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 148b4a0..0541d08 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -731,7 +731,7 @@ func TestListScheduled(t *testing.T) { tests := []struct { scheduled map[string][]base.Z qname string - want []base.Z + want []*base.TaskInfo }{ { scheduled: map[string][]base.Z{ @@ -746,10 +746,10 @@ func TestListScheduled(t *testing.T) { }, qname: "default", // should be sorted by score in ascending order - want: []base.Z{ - {Message: m3, Score: p3.Unix()}, - {Message: m1, Score: p1.Unix()}, - {Message: m2, Score: p2.Unix()}, + want: []*base.TaskInfo{ + {TaskMessage: m3, State: "scheduled", NextProcessAt: p3.Unix(), LastFailedAt: 0}, + {TaskMessage: m1, State: "scheduled", NextProcessAt: p1.Unix(), LastFailedAt: 0}, + {TaskMessage: m2, State: "scheduled", NextProcessAt: p2.Unix(), LastFailedAt: 0}, }, }, { @@ -764,8 +764,8 @@ func TestListScheduled(t *testing.T) { }, }, qname: "custom", - want: []base.Z{ - {Message: m4, Score: p4.Unix()}, + want: []*base.TaskInfo{ + {TaskMessage: m4, State: "scheduled", NextProcessAt: p4.Unix(), LastFailedAt: 0}, }, }, { @@ -773,7 +773,7 @@ func TestListScheduled(t *testing.T) { "default": {}, }, qname: "default", - want: []base.Z(nil), + want: []*base.TaskInfo(nil), }, } @@ -787,7 +787,7 @@ func TestListScheduled(t *testing.T) { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue } - if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, unixTimeCmpOpt); diff != "" { t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) continue } @@ -838,13 +838,13 @@ func TestListScheduledPagination(t *testing.T) { continue } - first := got[0].Message + first := got[0].TaskMessage if first.Type != tc.wantFirst { t.Errorf("%s; %s returned a list with first message %q, want %q", tc.desc, op, first.Type, tc.wantFirst) } - last := got[len(got)-1].Message + last := got[len(got)-1].TaskMessage if last.Type != tc.wantLast { t.Errorf("%s; %s returned a list with the last message %q, want %q", tc.desc, op, last.Type, tc.wantLast) @@ -882,14 +882,15 @@ func TestListRetry(t *testing.T) { Retry: 25, Retried: 3, } - p1 := time.Now().Add(5 * time.Minute) - p2 := time.Now().Add(24 * time.Hour) - p3 := time.Now().Add(24 * time.Hour) + now := time.Now() + p1 := now.Add(5 * time.Minute) + p2 := now.Add(24 * time.Hour) + p3 := now.Add(24 * time.Hour) tests := []struct { retry map[string][]base.Z qname string - want []base.Z + want []*base.TaskInfo }{ { retry: map[string][]base.Z{ @@ -902,9 +903,9 @@ func TestListRetry(t *testing.T) { }, }, qname: "default", - want: []base.Z{ - {Message: m1, Score: p1.Unix()}, - {Message: m2, Score: p2.Unix()}, + want: []*base.TaskInfo{ + {TaskMessage: m1, State: "retry", NextProcessAt: p1.Unix(), LastFailedAt: now.Unix()}, + {TaskMessage: m2, State: "retry", NextProcessAt: p2.Unix(), LastFailedAt: now.Unix()}, }, }, { @@ -918,8 +919,8 @@ func TestListRetry(t *testing.T) { }, }, qname: "custom", - want: []base.Z{ - {Message: m3, Score: p3.Unix()}, + want: []*base.TaskInfo{ + {TaskMessage: m3, State: "retry", NextProcessAt: p3.Unix(), LastFailedAt: now.Unix()}, }, }, { @@ -927,7 +928,7 @@ func TestListRetry(t *testing.T) { "default": {}, }, qname: "default", - want: []base.Z(nil), + want: []*base.TaskInfo(nil), }, } @@ -941,7 +942,7 @@ func TestListRetry(t *testing.T) { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue } - if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, unixTimeCmpOpt); diff != "" { t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) continue @@ -997,13 +998,13 @@ func TestListRetryPagination(t *testing.T) { continue } - first := got[0].Message + first := got[0].TaskMessage if first.Type != tc.wantFirst { t.Errorf("%s; %s returned a list with first message %q, want %q", tc.desc, op, first.Type, tc.wantFirst) } - last := got[len(got)-1].Message + last := got[len(got)-1].TaskMessage if last.Type != tc.wantLast { t.Errorf("%s; %s returned a list with the last message %q, want %q", tc.desc, op, last.Type, tc.wantLast) @@ -1042,7 +1043,7 @@ func TestListArchived(t *testing.T) { tests := []struct { archived map[string][]base.Z qname string - want []base.Z + want []*base.TaskInfo }{ { archived: map[string][]base.Z{ @@ -1055,9 +1056,9 @@ func TestListArchived(t *testing.T) { }, }, qname: "default", - want: []base.Z{ - {Message: m2, Score: f2.Unix()}, // FIXME: shouldn't be sorted in the other order? - {Message: m1, Score: f1.Unix()}, + want: []*base.TaskInfo{ + {TaskMessage: m2, State: "archived", NextProcessAt: 0, LastFailedAt: f2.Unix()}, // FIXME: shouldn't these be sorted in the other order? + {TaskMessage: m1, State: "archived", NextProcessAt: 0, LastFailedAt: f1.Unix()}, }, }, { @@ -1071,8 +1072,8 @@ func TestListArchived(t *testing.T) { }, }, qname: "custom", - want: []base.Z{ - {Message: m3, Score: f3.Unix()}, + want: []*base.TaskInfo{ + {TaskMessage: m3, State: "archived", NextProcessAt: 0, LastFailedAt: f3.Unix()}, }, }, { @@ -1080,7 +1081,7 @@ func TestListArchived(t *testing.T) { "default": {}, }, qname: "default", - want: []base.Z(nil), + want: []*base.TaskInfo(nil), }, } @@ -1094,7 +1095,7 @@ func TestListArchived(t *testing.T) { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) continue } - if diff := cmp.Diff(tc.want, got, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(tc.want, got, unixTimeCmpOpt); diff != "" { t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff) continue @@ -1147,13 +1148,13 @@ func TestListArchivedPagination(t *testing.T) { continue } - first := got[0].Message + first := got[0].TaskMessage if first.Type != tc.wantFirst { t.Errorf("%s; %s returned a list with first message %q, want %q", tc.desc, op, first.Type, tc.wantFirst) } - last := got[len(got)-1].Message + last := got[len(got)-1].TaskMessage if last.Type != tc.wantLast { t.Errorf("%s; %s returned a list with the last message %q, want %q", tc.desc, op, last.Type, tc.wantLast) @@ -1162,8 +1163,8 @@ func TestListArchivedPagination(t *testing.T) { } var ( - timeCmpOpt = cmpopts.EquateApproxTime(2 * time.Second) // allow for 2 seconds margin in time.Time - zScoreCmpOpt = h.EquateInt64Approx(2) // allow for 2 seconds margin in Z.Score + timeCmpOpt = cmpopts.EquateApproxTime(2 * time.Second) // allow for 2 seconds margin in time.Time + unixTimeCmpOpt = h.EquateInt64Approx(2) // allow for 2 seconds margin in int64 representing unix time ) func TestRunArchivedTask(t *testing.T) { @@ -1893,7 +1894,7 @@ func TestArchiveRetryTask(t *testing.T) { for qname, want := range tc.wantRetry { gotRetry := h.GetRetryEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, unixTimeCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.RetryKey(qname), diff) } @@ -1901,7 +1902,7 @@ func TestArchiveRetryTask(t *testing.T) { for qname, want := range tc.wantArchived { gotDead := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, unixTimeCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } @@ -2015,7 +2016,7 @@ func TestArchiveScheduledTask(t *testing.T) { for qname, want := range tc.wantScheduled { gotScheduled := h.GetScheduledEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, unixTimeCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ScheduledKey(qname), diff) } @@ -2023,7 +2024,7 @@ func TestArchiveScheduledTask(t *testing.T) { for qname, want := range tc.wantArchived { gotDead := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, unixTimeCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } @@ -2129,7 +2130,7 @@ func TestArchivePendingTask(t *testing.T) { for qname, want := range tc.wantArchived { gotDead := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, unixTimeCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } @@ -2261,7 +2262,7 @@ func TestArchiveAllPendingTasks(t *testing.T) { for qname, want := range tc.wantArchived { gotDead := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, unixTimeCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } @@ -2399,7 +2400,7 @@ func TestArchiveAllRetryTasks(t *testing.T) { for qname, want := range tc.wantRetry { gotRetry := h.GetRetryEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt, unixTimeCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.RetryKey(qname), diff) } @@ -2407,7 +2408,7 @@ func TestArchiveAllRetryTasks(t *testing.T) { for qname, want := range tc.wantArchived { gotDead := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, unixTimeCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } @@ -2546,7 +2547,7 @@ func TestArchiveAllScheduledTasks(t *testing.T) { for qname, want := range tc.wantScheduled { gotScheduled := h.GetScheduledEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt, unixTimeCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ScheduledKey(qname), diff) } @@ -2554,7 +2555,7 @@ func TestArchiveAllScheduledTasks(t *testing.T) { for qname, want := range tc.wantArchived { gotDead := h.GetArchivedEntries(t, r.client, qname) - if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, unixTimeCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", base.ArchivedKey(qname), diff) } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 5cd92d9..d9c4c6a 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -444,6 +444,7 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim // ARGV[2] -> updated base.TaskMessage value // ARGV[3] -> retry_at UNIX timestamp // ARGV[4] -> stats expiration timestamp +// ARGV[5] -> current time in Unix seconds var retryCmd = redis.NewScript(` if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") @@ -455,7 +456,8 @@ redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1]) redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "RETRY", - "process_at", ARGV[3]) + "process_at", ARGV[3], + "last_failed_at", ARGV[5]) local n = redis.call("INCR", KEYS[5]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[5], ARGV[4]) @@ -491,6 +493,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e encoded, processAt.Unix(), expireAt.Unix(), + time.Now().Unix(), } return retryCmd.Run(r.client, keys, argv...).Err() } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 08ea50c..a935de5 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -1208,7 +1208,7 @@ func TestArchive(t *testing.T) { } for queue, want := range tc.wantArchived { gotArchived := h.GetArchivedEntries(t, r.client, queue) - if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, unixTimeCmpOpt); diff != "" { t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff) } }