2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00
This commit is contained in:
Ken Hibino 2021-04-04 14:53:12 -07:00
parent 978c608124
commit 6dce3f156e
3 changed files with 49 additions and 16 deletions

View File

@ -477,3 +477,25 @@ func getMessagesFromZSetWithScores(tb testing.TB, r redis.UniversalClient, qname
} }
return res return res
} }
// GetRetryEntries returns all retry messages and its score in the given queue.
func GetRetryTasks(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskInfo {
tb.Helper()
zs := r.ZRangeWithScores(base.RetryKey(qname), 0, -1).Val()
var tasks []*base.TaskInfo
for _, z := range zs {
vals := r.HMGet(base.TaskKey(qname, z.Member.(string)), "msg", "state", "process_at", "last_failed_at").Val()
if len(vals) != 4 {
tb.Fatalf("unexpected number of values returned from HMGET command, got %d elements, want 4", len(vals))
}
if vals[0] == redis.Nil {
tb.Fatalf("msg field contained nil for task ID %v", z.Member)
}
if vals[1] == redis.Nil {
tb.Fatalf("state field contained nil for task ID %v", z.Member)
}
// TODO: continue from here
}
return res
}

View File

@ -664,6 +664,7 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) {
// ARGV[3] -> current timestamp in unix time // ARGV[3] -> current timestamp in unix time
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago) // ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[5] -> max number of tasks in archived state (e.g., 100) // ARGV[5] -> max number of tasks in archived state (e.g., 100)
// FIXME: Need to update state field of the task under task-key
var archiveTaskCmd = redis.NewScript(` var archiveTaskCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 0 then if redis.call("EXISTS", KEYS[1]) == 0 then
return 0 return 0

View File

@ -1806,8 +1806,8 @@ func TestArchiveRetryTask(t *testing.T) {
qname string qname string
id uuid.UUID id uuid.UUID
want error want error
wantRetry map[string][]base.Z wantRetry map[string][]*base.TaskInfo
wantArchived map[string][]base.Z wantArchived map[string][]*base.TaskInfo
}{ }{
{ {
retry: map[string][]base.Z{ retry: map[string][]base.Z{
@ -1822,11 +1822,15 @@ func TestArchiveRetryTask(t *testing.T) {
qname: "default", qname: "default",
id: m1.ID, id: m1.ID,
want: nil, want: nil,
wantRetry: map[string][]base.Z{ wantRetry: map[string][]*base.TaskInfo{
"default": {{Message: m2, Score: t2.Unix()}}, "default": {
{TaskMessage: m2, State: "retry", NextProcessAt: t2.Unix(), LastFailedAt: 0},
},
}, },
wantArchived: map[string][]base.Z{ wantArchived: map[string][]*base.TaskInfo{
"default": {{Message: m1, Score: time.Now().Unix()}}, "default": {
{TaskMessage: m1, State: "archived", NextProcessAt: 0, LastFailedAt: time.Now().Unix()},
},
}, },
}, },
{ {
@ -1839,11 +1843,15 @@ func TestArchiveRetryTask(t *testing.T) {
qname: "default", qname: "default",
id: uuid.New(), id: uuid.New(),
want: ErrTaskNotFound, want: ErrTaskNotFound,
wantRetry: map[string][]base.Z{ wantRetry: map[string][]*base.TaskInfo{
"default": {{Message: m1, Score: t1.Unix()}}, "default": {
{TaskMessage: m1, State: "retry", NextProcessAt: t1.Unix(), LastFailedAt: 0},
},
}, },
wantArchived: map[string][]base.Z{ wantArchived: map[string][]*base.TaskInfo{
"default": {{Message: m2, Score: t2.Unix()}}, "default": {
{TaskMessage: m2, State: "archived", NextProcessAt: 0, LastFailedAt: t2.Unix()},
},
}, },
}, },
{ {
@ -1864,18 +1872,20 @@ func TestArchiveRetryTask(t *testing.T) {
qname: "custom", qname: "custom",
id: m3.ID, id: m3.ID,
want: nil, want: nil,
wantRetry: map[string][]base.Z{ wantRetry: map[string][]*base.TaskInfo{
"default": { "default": {
{Message: m1, Score: t1.Unix()}, {TaskMessage: m1, State: "retry", NextProcessAt: t1.Unix(), LastFailedAt: 0},
{Message: m2, Score: t2.Unix()}, {TaskMessage: m2, State: "retry", NextProcessAt: t2.Unix(), LastFailedAt: 0},
}, },
"custom": { "custom": {
{Message: m4, Score: t4.Unix()}, {TaskMessage: m4, State: "retry", NextProcessAt: t4.Unix(), LastFailedAt: 0},
}, },
}, },
wantArchived: map[string][]base.Z{ wantArchived: map[string][]*base.TaskInfo{
"default": {}, "default": {},
"custom": {{Message: m3, Score: time.Now().Unix()}}, "custom": {
{TaskMessage: m3, State: "archived", NextProcessAt: 0, LastFailedAt: time.Now().Unix()},
},
}, },
}, },
} }