From a873d488eeb947dafc1f0813ac921964e8da0c55 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 10 Aug 2020 05:37:49 -0700 Subject: [PATCH] Update ListDeadlineExceeded in RDB --- internal/rdb/rdb.go | 20 ++++++------ internal/rdb/rdb_test.go | 66 +++++++++++++++++++++++++--------------- 2 files changed, 52 insertions(+), 34 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 6132ec4..be52658 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -469,23 +469,25 @@ func (r *RDB) forwardAll(src, dst string) error { return nil } -// ListDeadlineExceeded returns a list of task messages that have exceeded the given deadline. -func (r *RDB) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error) { +// ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues. +func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) { var msgs []*base.TaskMessage opt := &redis.ZRangeBy{ Min: "-inf", Max: strconv.FormatInt(deadline.Unix(), 10), } - res, err := r.client.ZRangeByScore(base.KeyDeadlines, opt).Result() - if err != nil { - return nil, err - } - for _, s := range res { - msg, err := base.DecodeMessage(s) + for _, qname := range qnames { + res, err := r.client.ZRangeByScore(base.DeadlinesKey(qname), opt).Result() if err != nil { return nil, err } - msgs = append(msgs, msg) + for _, s := range res { + msg, err := base.DecodeMessage(s) + if err != nil { + return nil, err + } + msgs = append(msgs, msg) + } } return msgs, nil } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 61595b3..a668bb1 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -1300,8 +1300,8 @@ func TestCheckAndEnqueue(t *testing.T) { } func TestListDeadlineExceeded(t *testing.T) { - t1 := h.NewTaskMessage("task1", nil) - t2 := h.NewTaskMessage("task2", nil) + t1 := h.NewTaskMessageWithQueue("task1", nil, "default") + t2 := h.NewTaskMessageWithQueue("task2", nil, "default") t3 := h.NewTaskMessageWithQueue("task3", nil, "critical") now := time.Now() @@ -1312,52 +1312,68 @@ func TestListDeadlineExceeded(t *testing.T) { tests := []struct { desc string - deadlines []base.Z + deadlines map[string][]base.Z + qnames []string t time.Time want []*base.TaskMessage }{ { desc: "with one task in-progress", - deadlines: []base.Z{ - {Message: t1, Score: fiveMinutesAgo.Unix()}, + deadlines: map[string][]base.Z{ + "default": {{Message: t1, Score: fiveMinutesAgo.Unix()}}, }, - t: time.Now(), - want: []*base.TaskMessage{t1}, + qnames: []string{"default"}, + t: time.Now(), + want: []*base.TaskMessage{t1}, }, { desc: "with multiple tasks in-progress, and one expired", - deadlines: []base.Z{ - {Message: t1, Score: oneHourAgo.Unix()}, - {Message: t2, Score: fiveMinutesFromNow.Unix()}, - {Message: t3, Score: oneHourFromNow.Unix()}, + deadlines: map[string][]base.Z{ + "default": { + {Message: t1, Score: oneHourAgo.Unix()}, + {Message: t2, Score: fiveMinutesFromNow.Unix()}, + }, + "critical": { + {Message: t3, Score: oneHourFromNow.Unix()}, + }, }, - t: time.Now(), - want: []*base.TaskMessage{t1}, + qnames: []string{"default", "critical"}, + t: time.Now(), + want: []*base.TaskMessage{t1}, }, { desc: "with multiple expired tasks in-progress", - deadlines: []base.Z{ - {Message: t1, Score: oneHourAgo.Unix()}, - {Message: t2, Score: fiveMinutesAgo.Unix()}, - {Message: t3, Score: oneHourFromNow.Unix()}, + deadlines: map[string][]base.Z{ + "default": { + {Message: t1, Score: oneHourAgo.Unix()}, + {Message: t2, Score: oneHourFromNow.Unix()}, + }, + "critical": { + {Message: t3, Score: fiveMinutesAgo.Unix()}, + }, }, - t: time.Now(), - want: []*base.TaskMessage{t1, t2}, + qnames: []string{"default", "critical"}, + t: time.Now(), + want: []*base.TaskMessage{t1, t2}, }, { - desc: "with empty in-progress queue", - deadlines: []base.Z{}, - t: time.Now(), - want: []*base.TaskMessage{}, + desc: "with empty in-progress queue", + deadlines: map[string][]base.Z{ + "default": {}, + "critical": {}, + }, + qnames: []string{"default", "critical"}, + t: time.Now(), + want: []*base.TaskMessage{}, }, } r := setup(t) for _, tc := range tests { h.FlushDB(t, r.client) - h.SeedDeadlines(t, r.client, tc.deadlines) + h.SeedAllDeadlines(t, r.client, tc.deadlines) - got, err := r.ListDeadlineExceeded(tc.t) + got, err := r.ListDeadlineExceeded(tc.t, tc.qnames...) if err != nil { t.Errorf("%s; ListDeadlineExceeded(%v) returned error: %v", tc.desc, tc.t, err) continue