diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 6b916f0..36a407e 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -918,10 +918,10 @@ func (r *RDB) deleteExpiredCompletedTasks(qname string, batchSize int) (int64, e return n, nil } -// KEYS[1] -> asynq:{}:deadlines -// ARGV[1] -> deadline in unix time +// KEYS[1] -> asynq:{}:lease +// ARGV[1] -> cutoff in unix time // ARGV[2] -> task key prefix -var listDeadlineExceededCmd = redis.NewScript(` +var listLeaseExpiredCmd = redis.NewScript(` local res = {} local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) for _, id in ipairs(ids) do @@ -931,14 +931,14 @@ end return res `) -// ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues. -func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) { - var op errors.Op = "rdb.ListDeadlineExceeded" +// ListLeaseExpired returns a list of task messages with an expired lease from the given queues. +func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) { + var op errors.Op = "rdb.ListLeaseExpired" var msgs []*base.TaskMessage for _, qname := range qnames { - res, err := listDeadlineExceededCmd.Run(context.Background(), r.client, - []string{base.DeadlinesKey(qname)}, - deadline.Unix(), base.TaskKeyPrefix(qname)).Result() + res, err := listLeaseExpiredCmd.Run(context.Background(), r.client, + []string{base.LeaseKey(qname)}, + cutoff.Unix(), base.TaskKeyPrefix(qname)).Result() if err != nil { return nil, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err)) } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index a2e479e..5f32012 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -2208,71 +2208,67 @@ func TestDeleteExpiredCompletedTasks(t *testing.T) { } } -func TestListDeadlineExceeded(t *testing.T) { +func TestListLeaseExpired(t *testing.T) { t1 := h.NewTaskMessageWithQueue("task1", nil, "default") t2 := h.NewTaskMessageWithQueue("task2", nil, "default") t3 := h.NewTaskMessageWithQueue("task3", nil, "critical") now := time.Now() - oneHourFromNow := now.Add(1 * time.Hour) - fiveMinutesFromNow := now.Add(5 * time.Minute) - fiveMinutesAgo := now.Add(-5 * time.Minute) - oneHourAgo := now.Add(-1 * time.Hour) tests := []struct { - desc string - deadlines map[string][]base.Z - qnames []string - t time.Time - want []*base.TaskMessage + desc string + lease map[string][]base.Z + qnames []string + cutoff time.Time + want []*base.TaskMessage }{ { desc: "with a single active task", - deadlines: map[string][]base.Z{ - "default": {{Message: t1, Score: fiveMinutesAgo.Unix()}}, + lease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(-10 * time.Second).Unix()}}, }, qnames: []string{"default"}, - t: time.Now(), + cutoff: now, want: []*base.TaskMessage{t1}, }, { desc: "with multiple active tasks, and one expired", - deadlines: map[string][]base.Z{ + lease: map[string][]base.Z{ "default": { - {Message: t1, Score: oneHourAgo.Unix()}, - {Message: t2, Score: fiveMinutesFromNow.Unix()}, + {Message: t1, Score: now.Add(-5 * time.Minute).Unix()}, + {Message: t2, Score: now.Add(20 * time.Second).Unix()}, }, "critical": { - {Message: t3, Score: oneHourFromNow.Unix()}, + {Message: t3, Score: now.Add(10 * time.Second).Unix()}, }, }, qnames: []string{"default", "critical"}, - t: time.Now(), + cutoff: now, want: []*base.TaskMessage{t1}, }, { desc: "with multiple expired active tasks", - deadlines: map[string][]base.Z{ + lease: map[string][]base.Z{ "default": { - {Message: t1, Score: oneHourAgo.Unix()}, - {Message: t2, Score: oneHourFromNow.Unix()}, + {Message: t1, Score: now.Add(-2 * time.Minute).Unix()}, + {Message: t2, Score: now.Add(20 * time.Second).Unix()}, }, "critical": { - {Message: t3, Score: fiveMinutesAgo.Unix()}, + {Message: t3, Score: now.Add(-30 * time.Second).Unix()}, }, }, qnames: []string{"default", "critical"}, - t: time.Now(), + cutoff: now, want: []*base.TaskMessage{t1, t3}, }, { desc: "with empty active queue", - deadlines: map[string][]base.Z{ + lease: map[string][]base.Z{ "default": {}, "critical": {}, }, qnames: []string{"default", "critical"}, - t: time.Now(), + cutoff: now, want: []*base.TaskMessage{}, }, } @@ -2281,17 +2277,17 @@ func TestListDeadlineExceeded(t *testing.T) { defer r.Close() for _, tc := range tests { h.FlushDB(t, r.client) - h.SeedAllLease(t, r.client, tc.deadlines) + h.SeedAllLease(t, r.client, tc.lease) - got, err := r.ListDeadlineExceeded(tc.t, tc.qnames...) + got, err := r.ListLeaseExpired(tc.cutoff, tc.qnames...) if err != nil { - t.Errorf("%s; ListDeadlineExceeded(%v) returned error: %v", tc.desc, tc.t, err) + t.Errorf("%s; ListLeaseExpired(%v) returned error: %v", tc.desc, tc.cutoff, err) continue } if diff := cmp.Diff(tc.want, got, h.SortMsgOpt); diff != "" { - t.Errorf("%s; ListDeadlineExceeded(%v) returned %v, want %v;(-want,+got)\n%s", - tc.desc, tc.t, got, tc.want, diff) + t.Errorf("%s; ListLeaseExpired(%v) returned %v, want %v;(-want,+got)\n%s", + tc.desc, tc.cutoff, got, tc.want, diff) } } }