2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-02-23 12:20:19 +08:00

Update ListDeadlineExceeded in RDB

This commit is contained in:
Ken Hibino 2020-08-10 05:37:49 -07:00
parent e0a8f1252a
commit a873d488ee
2 changed files with 52 additions and 34 deletions

View File

@ -469,23 +469,25 @@ func (r *RDB) forwardAll(src, dst string) error {
return nil return nil
} }
// ListDeadlineExceeded returns a list of task messages that have exceeded the given deadline. // ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues.
func (r *RDB) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error) { func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) {
var msgs []*base.TaskMessage var msgs []*base.TaskMessage
opt := &redis.ZRangeBy{ opt := &redis.ZRangeBy{
Min: "-inf", Min: "-inf",
Max: strconv.FormatInt(deadline.Unix(), 10), Max: strconv.FormatInt(deadline.Unix(), 10),
} }
res, err := r.client.ZRangeByScore(base.KeyDeadlines, opt).Result() for _, qname := range qnames {
if err != nil { res, err := r.client.ZRangeByScore(base.DeadlinesKey(qname), opt).Result()
return nil, err
}
for _, s := range res {
msg, err := base.DecodeMessage(s)
if err != nil { if err != nil {
return nil, err return nil, err
} }
msgs = append(msgs, msg) for _, s := range res {
msg, err := base.DecodeMessage(s)
if err != nil {
return nil, err
}
msgs = append(msgs, msg)
}
} }
return msgs, nil return msgs, nil
} }

View File

@ -1300,8 +1300,8 @@ func TestCheckAndEnqueue(t *testing.T) {
} }
func TestListDeadlineExceeded(t *testing.T) { func TestListDeadlineExceeded(t *testing.T) {
t1 := h.NewTaskMessage("task1", nil) t1 := h.NewTaskMessageWithQueue("task1", nil, "default")
t2 := h.NewTaskMessage("task2", nil) t2 := h.NewTaskMessageWithQueue("task2", nil, "default")
t3 := h.NewTaskMessageWithQueue("task3", nil, "critical") t3 := h.NewTaskMessageWithQueue("task3", nil, "critical")
now := time.Now() now := time.Now()
@ -1312,52 +1312,68 @@ func TestListDeadlineExceeded(t *testing.T) {
tests := []struct { tests := []struct {
desc string desc string
deadlines []base.Z deadlines map[string][]base.Z
qnames []string
t time.Time t time.Time
want []*base.TaskMessage want []*base.TaskMessage
}{ }{
{ {
desc: "with one task in-progress", desc: "with one task in-progress",
deadlines: []base.Z{ deadlines: map[string][]base.Z{
{Message: t1, Score: fiveMinutesAgo.Unix()}, "default": {{Message: t1, Score: fiveMinutesAgo.Unix()}},
}, },
t: time.Now(), qnames: []string{"default"},
want: []*base.TaskMessage{t1}, t: time.Now(),
want: []*base.TaskMessage{t1},
}, },
{ {
desc: "with multiple tasks in-progress, and one expired", desc: "with multiple tasks in-progress, and one expired",
deadlines: []base.Z{ deadlines: map[string][]base.Z{
{Message: t1, Score: oneHourAgo.Unix()}, "default": {
{Message: t2, Score: fiveMinutesFromNow.Unix()}, {Message: t1, Score: oneHourAgo.Unix()},
{Message: t3, Score: oneHourFromNow.Unix()}, {Message: t2, Score: fiveMinutesFromNow.Unix()},
},
"critical": {
{Message: t3, Score: oneHourFromNow.Unix()},
},
}, },
t: time.Now(), qnames: []string{"default", "critical"},
want: []*base.TaskMessage{t1}, t: time.Now(),
want: []*base.TaskMessage{t1},
}, },
{ {
desc: "with multiple expired tasks in-progress", desc: "with multiple expired tasks in-progress",
deadlines: []base.Z{ deadlines: map[string][]base.Z{
{Message: t1, Score: oneHourAgo.Unix()}, "default": {
{Message: t2, Score: fiveMinutesAgo.Unix()}, {Message: t1, Score: oneHourAgo.Unix()},
{Message: t3, Score: oneHourFromNow.Unix()}, {Message: t2, Score: oneHourFromNow.Unix()},
},
"critical": {
{Message: t3, Score: fiveMinutesAgo.Unix()},
},
}, },
t: time.Now(), qnames: []string{"default", "critical"},
want: []*base.TaskMessage{t1, t2}, t: time.Now(),
want: []*base.TaskMessage{t1, t2},
}, },
{ {
desc: "with empty in-progress queue", desc: "with empty in-progress queue",
deadlines: []base.Z{}, deadlines: map[string][]base.Z{
t: time.Now(), "default": {},
want: []*base.TaskMessage{}, "critical": {},
},
qnames: []string{"default", "critical"},
t: time.Now(),
want: []*base.TaskMessage{},
}, },
} }
r := setup(t) r := setup(t)
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) h.FlushDB(t, r.client)
h.SeedDeadlines(t, r.client, tc.deadlines) h.SeedAllDeadlines(t, r.client, tc.deadlines)
got, err := r.ListDeadlineExceeded(tc.t) got, err := r.ListDeadlineExceeded(tc.t, tc.qnames...)
if err != nil { if err != nil {
t.Errorf("%s; ListDeadlineExceeded(%v) returned error: %v", tc.desc, tc.t, err) t.Errorf("%s; ListDeadlineExceeded(%v) returned error: %v", tc.desc, tc.t, err)
continue continue