From 79c17b9ee9072115e06c8d0e5a2eeaa6063e7bad Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 17 Sep 2021 21:37:58 -0700 Subject: [PATCH] Add RDB.DeleteExpiredCompletedTasks --- internal/rdb/rdb.go | 51 +++++++++++++++++++++++ internal/rdb/rdb_test.go | 87 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 96ab044..9af1df9 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -794,6 +794,57 @@ func (r *RDB) forwardAll(qname string) (err error) { return nil } +// KEYS[1] -> asynq:{}:completed +// ARGV[1] -> current time in unix time +// ARGV[2] -> task key prefix +// ARGV[3] -> batch size (i.e. maximum number of tasks to delete) +// +// Returns the number of tasks deleted. +var deleteExpiredCompletedTasksCmd = redis.NewScript(` +local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, tonumber(ARGV[3])) +for _, id in ipairs(ids) do + redis.call("DEL", ARGV[2] .. id) + redis.call("ZREM", KEYS[1], id) +end +return table.getn(ids)`) + +// DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set, +// and delete all expired tasks. +func (r *RDB) DeleteExpiredCompletedTasks(qname string) error { + // Note: Do this operation in fix batches to prevent long running script. + const batchSize = 100 + for { + n, err := r.deleteExpiredCompletedTasks(qname, batchSize) + if err != nil { + return err + } + if n == 0 { + return nil + } + } +} + +// deleteExpiredCompletedTasks runs the lua script to delete expired deleted task with the specified +// batch size. It reports the number of tasks deleted. +func (r *RDB) deleteExpiredCompletedTasks(qname string, batchSize int) (int64, error) { + var op errors.Op = "rdb.DeleteExpiredCompletedTasks" + keys := []string{base.CompletedKey(qname)} + argv := []interface{}{ + time.Now().Unix(), + base.TaskKeyPrefix(qname), + batchSize, + } + res, err := deleteExpiredCompletedTasksCmd.Run(context.Background(), r.client, keys, argv...).Result() + if err != nil { + return 0, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err)) + } + n, ok := res.(int64) + if !ok { + return 0, errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", res)) + } + return n, nil +} + // KEYS[1] -> asynq:{}:deadlines // ARGV[1] -> deadline in unix time // ARGV[2] -> task key prefix diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 49d7d14..3ce1093 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -2082,6 +2082,93 @@ func TestForwardIfReady(t *testing.T) { } } +func newCompletedTask(qname, typename string, payload []byte, completedAt time.Time) *base.TaskMessage { + msg := h.NewTaskMessageWithQueue(typename, payload, qname) + msg.CompletedAt = completedAt.Unix() + return msg +} + +func TestDeleteExpiredCompletedTasks(t *testing.T) { + r := setup(t) + defer r.Close() + now := time.Now() + secondAgo := now.Add(-time.Second) + hourFromNow := now.Add(time.Hour) + hourAgo := now.Add(-time.Hour) + minuteAgo := now.Add(-time.Minute) + + t1 := newCompletedTask("default", "task1", nil, hourAgo) + t2 := newCompletedTask("default", "task2", nil, minuteAgo) + t3 := newCompletedTask("default", "task3", nil, secondAgo) + t4 := newCompletedTask("critical", "critical_task", nil, hourAgo) + t5 := newCompletedTask("low", "low_priority_task", nil, hourAgo) + + tests := []struct { + desc string + completed map[string][]base.Z + qname string + wantCompleted map[string][]base.Z + }{ + { + desc: "deletes expired task from default queue", + completed: map[string][]base.Z{ + "default": { + {Message: t1, Score: secondAgo.Unix()}, + {Message: t2, Score: hourFromNow.Unix()}, + {Message: t3, Score: now.Unix()}, + }, + }, + qname: "default", + wantCompleted: map[string][]base.Z{ + "default": { + {Message: t2, Score: hourFromNow.Unix()}, + }, + }, + }, + { + desc: "deletes expired task from specified queue", + completed: map[string][]base.Z{ + "default": { + {Message: t2, Score: secondAgo.Unix()}, + }, + "critical": { + {Message: t4, Score: secondAgo.Unix()}, + }, + "low": { + {Message: t5, Score: now.Unix()}, + }, + }, + qname: "critical", + wantCompleted: map[string][]base.Z{ + "default": { + {Message: t2, Score: secondAgo.Unix()}, + }, + "critical": {}, + "low": { + {Message: t5, Score: now.Unix()}, + }, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + h.SeedAllCompletedQueues(t, r.client, tc.completed) + + if err := r.DeleteExpiredCompletedTasks(tc.qname); err != nil { + t.Errorf("DeleteExpiredCompletedTasks(%q) failed: %v", tc.qname, err) + continue + } + + for qname, want := range tc.wantCompleted { + got := h.GetCompletedEntries(t, r.client, qname) + if diff := cmp.Diff(want, got, h.SortZSetEntryOpt); diff != "" { + t.Errorf("%s: diff found in %q completed set: want=%v, got=%v\n%s", tc.desc, qname, want, got, diff) + } + } + } +} + func TestListDeadlineExceeded(t *testing.T) { t1 := h.NewTaskMessageWithQueue("task1", nil, "default") t2 := h.NewTaskMessageWithQueue("task2", nil, "default")