2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-23 01:00:17 +08:00

Add RDB.DeleteExpiredCompletedTasks

This commit is contained in:
Ken Hibino 2021-09-17 21:37:58 -07:00
parent 4c0bbe2998
commit 79c17b9ee9
2 changed files with 138 additions and 0 deletions

View File

@ -794,6 +794,57 @@ func (r *RDB) forwardAll(qname string) (err error) {
return nil return nil
} }
// KEYS[1] -> asynq:{<qname>}:completed
// ARGV[1] -> current time in unix time
// ARGV[2] -> task key prefix
// ARGV[3] -> batch size (i.e. maximum number of tasks to delete)
//
// Returns the number of tasks deleted.
var deleteExpiredCompletedTasksCmd = redis.NewScript(`
local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, tonumber(ARGV[3]))
for _, id in ipairs(ids) do
redis.call("DEL", ARGV[2] .. id)
redis.call("ZREM", KEYS[1], id)
end
return table.getn(ids)`)
// DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set,
// and delete all expired tasks.
func (r *RDB) DeleteExpiredCompletedTasks(qname string) error {
// Note: Do this operation in fix batches to prevent long running script.
const batchSize = 100
for {
n, err := r.deleteExpiredCompletedTasks(qname, batchSize)
if err != nil {
return err
}
if n == 0 {
return nil
}
}
}
// deleteExpiredCompletedTasks runs the lua script to delete expired deleted task with the specified
// batch size. It reports the number of tasks deleted.
func (r *RDB) deleteExpiredCompletedTasks(qname string, batchSize int) (int64, error) {
var op errors.Op = "rdb.DeleteExpiredCompletedTasks"
keys := []string{base.CompletedKey(qname)}
argv := []interface{}{
time.Now().Unix(),
base.TaskKeyPrefix(qname),
batchSize,
}
res, err := deleteExpiredCompletedTasksCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return 0, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err))
}
n, ok := res.(int64)
if !ok {
return 0, errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", res))
}
return n, nil
}
// KEYS[1] -> asynq:{<qname>}:deadlines // KEYS[1] -> asynq:{<qname>}:deadlines
// ARGV[1] -> deadline in unix time // ARGV[1] -> deadline in unix time
// ARGV[2] -> task key prefix // ARGV[2] -> task key prefix

View File

@ -2082,6 +2082,93 @@ func TestForwardIfReady(t *testing.T) {
} }
} }
func newCompletedTask(qname, typename string, payload []byte, completedAt time.Time) *base.TaskMessage {
msg := h.NewTaskMessageWithQueue(typename, payload, qname)
msg.CompletedAt = completedAt.Unix()
return msg
}
func TestDeleteExpiredCompletedTasks(t *testing.T) {
r := setup(t)
defer r.Close()
now := time.Now()
secondAgo := now.Add(-time.Second)
hourFromNow := now.Add(time.Hour)
hourAgo := now.Add(-time.Hour)
minuteAgo := now.Add(-time.Minute)
t1 := newCompletedTask("default", "task1", nil, hourAgo)
t2 := newCompletedTask("default", "task2", nil, minuteAgo)
t3 := newCompletedTask("default", "task3", nil, secondAgo)
t4 := newCompletedTask("critical", "critical_task", nil, hourAgo)
t5 := newCompletedTask("low", "low_priority_task", nil, hourAgo)
tests := []struct {
desc string
completed map[string][]base.Z
qname string
wantCompleted map[string][]base.Z
}{
{
desc: "deletes expired task from default queue",
completed: map[string][]base.Z{
"default": {
{Message: t1, Score: secondAgo.Unix()},
{Message: t2, Score: hourFromNow.Unix()},
{Message: t3, Score: now.Unix()},
},
},
qname: "default",
wantCompleted: map[string][]base.Z{
"default": {
{Message: t2, Score: hourFromNow.Unix()},
},
},
},
{
desc: "deletes expired task from specified queue",
completed: map[string][]base.Z{
"default": {
{Message: t2, Score: secondAgo.Unix()},
},
"critical": {
{Message: t4, Score: secondAgo.Unix()},
},
"low": {
{Message: t5, Score: now.Unix()},
},
},
qname: "critical",
wantCompleted: map[string][]base.Z{
"default": {
{Message: t2, Score: secondAgo.Unix()},
},
"critical": {},
"low": {
{Message: t5, Score: now.Unix()},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllCompletedQueues(t, r.client, tc.completed)
if err := r.DeleteExpiredCompletedTasks(tc.qname); err != nil {
t.Errorf("DeleteExpiredCompletedTasks(%q) failed: %v", tc.qname, err)
continue
}
for qname, want := range tc.wantCompleted {
got := h.GetCompletedEntries(t, r.client, qname)
if diff := cmp.Diff(want, got, h.SortZSetEntryOpt); diff != "" {
t.Errorf("%s: diff found in %q completed set: want=%v, got=%v\n%s", tc.desc, qname, want, got, diff)
}
}
}
}
func TestListDeadlineExceeded(t *testing.T) { func TestListDeadlineExceeded(t *testing.T) {
t1 := h.NewTaskMessageWithQueue("task1", nil, "default") t1 := h.NewTaskMessageWithQueue("task1", nil, "default")
t2 := h.NewTaskMessageWithQueue("task2", nil, "default") t2 := h.NewTaskMessageWithQueue("task2", nil, "default")