2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Update RDB.ArchiveAllPending

This commit is contained in:
Ken Hibino 2021-02-27 15:12:10 -08:00
parent e5ba008619
commit 043e61b06e
2 changed files with 145 additions and 10 deletions

View File

@ -617,35 +617,38 @@ func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error) {
return r.removeAndArchiveAll(base.ScheduledKey(qname), base.ArchivedKey(qname)) return r.removeAndArchiveAll(base.ScheduledKey(qname), base.ArchivedKey(qname))
} }
// KEYS[1] -> asynq:{<qname>} // KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:archived // KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> current timestamp // ARGV[1] -> current timestamp
// ARGV[2] -> cutoff timestamp (e.g., 90 days ago) // ARGV[2] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[3] -> max number of tasks in archive (e.g., 100) // ARGV[3] -> max number of tasks in archive (e.g., 100)
var archiveAllPendingCmd = redis.NewScript(` var archiveAllPendingCmd = redis.NewScript(`
local msgs = redis.call("LRANGE", KEYS[1], 0, -1) local ids = redis.call("LRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do for _, id in ipairs(ids) do
redis.call("ZADD", KEYS[2], ARGV[1], msg) redis.call("ZADD", KEYS[2], ARGV[1], id)
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2]) redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3]) redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3])
end end
redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[1])
return table.getn(msgs)`) return table.getn(ids)`)
// ArchiveAllPendingTasks archives all pending tasks from the given queue and // ArchiveAllPendingTasks archives all pending tasks from the given queue and
// returns the number of tasks that were moved. // returns the number of tasks moved.
func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) {
keys := []string{base.PendingKey(qname), base.ArchivedKey(qname)} keys := []string{base.PendingKey(qname), base.ArchivedKey(qname)}
now := time.Now() now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago argv := []interface{}{
args := []interface{}{now.Unix(), limit, maxArchiveSize} now.Unix(),
res, err := archiveAllPendingCmd.Run(r.client, keys, args...).Result() now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
maxArchiveSize,
}
res, err := archiveAllPendingCmd.Run(r.client, keys, argv...).Result()
if err != nil { if err != nil {
return 0, err return 0, err
} }
n, ok := res.(int64) n, ok := res.(int64)
if !ok { if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res) return 0, fmt.Errorf("command error: unexpected return value %v", res)
} }
return n, nil return n, nil
} }

View File

@ -1966,6 +1966,138 @@ func TestArchivePendingTask(t *testing.T) {
} }
} }
} }
func TestArchiveAllPendingTasks(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
t1 := time.Now().Add(1 * time.Minute)
t2 := time.Now().Add(1 * time.Hour)
tests := []struct {
pending map[string][]*base.TaskMessage
archived map[string][]base.Z
qname string
want int64
wantPending map[string][]*base.TaskMessage
wantArchived map[string][]base.Z
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
},
archived: map[string][]base.Z{
"default": {},
},
qname: "default",
want: 2,
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
wantArchived: map[string][]base.Z{
"default": {
{Message: m1, Score: time.Now().Unix()},
{Message: m2, Score: time.Now().Unix()},
},
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1},
},
archived: map[string][]base.Z{
"default": {{Message: m2, Score: t2.Unix()}},
},
qname: "default",
want: 1,
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
wantArchived: map[string][]base.Z{
"default": {
{Message: m1, Score: time.Now().Unix()},
{Message: m2, Score: t2.Unix()},
},
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {},
},
archived: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
qname: "default",
want: 0,
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
wantArchived: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3, m4},
},
archived: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
want: 2,
wantPending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {},
},
wantArchived: map[string][]base.Z{
"default": {},
"custom": {
{Message: m3, Score: time.Now().Unix()},
{Message: m4, Score: time.Now().Unix()},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllArchivedQueues(t, r.client, tc.archived)
got, err := r.ArchiveAllPendingTasks(tc.qname)
if got != tc.want || err != nil {
t.Errorf("(*RDB).KillAllRetryTasks(%q) = %v, %v; want %v, nil",
tc.qname, got, err, tc.want)
continue
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantArchived {
gotDead := h.GetArchivedEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.ArchivedKey(qname), diff)
}
}
}
}
func TestArchiveAllRetryTasks(t *testing.T) { func TestArchiveAllRetryTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()