2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-07-04 12:23:39 +08:00

Update RDB.ArchivePendingTask

This commit is contained in:
Ken Hibino 2021-02-26 21:40:55 -08:00
parent aecdfeaeee
commit 420bd2c748
2 changed files with 135 additions and 45 deletions

View File

@ -562,13 +562,12 @@ func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID) error {
// KEYS[1] -> asynq:{<qname>} // KEYS[1] -> asynq:{<qname>}
// KEYS[2] -> asynq:{<qname>}:archived // KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> task message to archive // ARGV[1] -> ID of the task to archive
// ARGV[2] -> current timestamp // ARGV[2] -> current timestamp
// ARGV[3] -> cutoff timestamp (e.g., 90 days ago) // ARGV[3] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[4] -> max number of tasks in archive (e.g., 100) // ARGV[4] -> max number of tasks in archive (e.g., 100)
var archivePendingCmd = redis.NewScript(` var archivePendingCmd = redis.NewScript(`
local x = redis.call("LREM", KEYS[1], 1, ARGV[1]) if redis.call("LREM", KEYS[1], 1, ARGV[1]) == 0 then
if x == 0 then
return 0 return 0
end end
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1]) redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1])
@ -577,47 +576,33 @@ redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[4])
return 1 return 1
`) `)
func (r *RDB) archivePending(qname, msg string) (int64, error) { // ArchivePendingTask finds a pending task that matches the given id
keys := []string{base.PendingKey(qname), base.ArchivedKey(qname)} // from the given queue and archives it.
now := time.Now() // If there's no match, it returns ErrTaskNotFound.
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
args := []interface{}{msg, now.Unix(), limit, maxArchiveSize}
res, err := archivePendingCmd.Run(r.client, keys, args...).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
}
return n, nil
}
// ArchivePendingTask finds a pending task that matches the given id from the given queue
// and archives it. If a task that maches the id does not exist, it returns ErrTaskNotFound.
func (r *RDB) ArchivePendingTask(qname string, id uuid.UUID) error { func (r *RDB) ArchivePendingTask(qname string, id uuid.UUID) error {
qkey := base.PendingKey(qname) keys := []string{
data, err := r.client.LRange(qkey, 0, -1).Result() base.PendingKey(qname),
base.ArchivedKey(qname),
}
now := time.Now()
argv := []interface{}{
id.String(),
now.Unix(),
now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
maxArchiveSize,
}
res, err := archivePendingCmd.Run(r.client, keys, argv...).Result()
if err != nil { if err != nil {
return err return err
} }
for _, s := range data { n, ok := res.(int64)
msg, err := base.DecodeMessage(s) if !ok {
if err != nil { return fmt.Errorf("command error: unexpected return value %v", res)
return err
}
if msg.ID == id {
n, err := r.archivePending(qname, s)
if err != nil {
return err
}
if n == 0 {
return ErrTaskNotFound
}
return nil
}
} }
return ErrTaskNotFound if n == 0 {
return ErrTaskNotFound
}
return nil
} }
// ArchiveAllRetryTasks archives all retry tasks from the given queue and // ArchiveAllRetryTasks archives all retry tasks from the given queue and

View File

@ -1716,7 +1716,7 @@ func TestArchiveRetryTask(t *testing.T) {
got := r.ArchiveRetryTask(tc.qname, tc.id) got := r.ArchiveRetryTask(tc.qname, tc.id)
if got != tc.want { if got != tc.want {
t.Errorf("(*RDB).KillRetryTask(%q, %v) = %v, want %v", t.Errorf("(*RDB).ArchiveRetryTask(%q, %v) = %v, want %v",
tc.qname, tc.id, got, tc.want) tc.qname, tc.id, got, tc.want)
continue continue
} }
@ -1838,7 +1838,7 @@ func TestArchiveScheduledTask(t *testing.T) {
got := r.ArchiveScheduledTask(tc.qname, tc.id) got := r.ArchiveScheduledTask(tc.qname, tc.id)
if got != tc.want { if got != tc.want {
t.Errorf("(*RDB).KillScheduledTask(%q, %v) = %v, want %v", t.Errorf("(*RDB).ArchiveScheduledTask(%q, %v) = %v, want %v",
tc.qname, tc.id, got, tc.want) tc.qname, tc.id, got, tc.want)
continue continue
} }
@ -1861,7 +1861,112 @@ func TestArchiveScheduledTask(t *testing.T) {
} }
} }
func TestKillAllRetryTasks(t *testing.T) { func TestArchivePendingTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
oneHourAgo := time.Now().Add(-1 * time.Hour)
tests := []struct {
pending map[string][]*base.TaskMessage
archived map[string][]base.Z
qname string
id uuid.UUID
want error
wantPending map[string][]*base.TaskMessage
wantArchived map[string][]base.Z
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
},
archived: map[string][]base.Z{
"default": {},
},
qname: "default",
id: m1.ID,
want: nil,
wantPending: map[string][]*base.TaskMessage{
"default": {m2},
},
wantArchived: map[string][]base.Z{
"default": {{Message: m1, Score: time.Now().Unix()}},
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1},
},
archived: map[string][]base.Z{
"default": {{Message: m2, Score: oneHourAgo.Unix()}},
},
qname: "default",
id: m2.ID,
want: ErrTaskNotFound,
wantPending: map[string][]*base.TaskMessage{
"default": {m1},
},
wantArchived: map[string][]base.Z{
"default": {{Message: m2, Score: oneHourAgo.Unix()}},
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3, m4},
},
archived: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
id: m3.ID,
want: nil,
wantPending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m4},
},
wantArchived: map[string][]base.Z{
"default": {},
"custom": {{Message: m3, Score: time.Now().Unix()}},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllArchivedQueues(t, r.client, tc.archived)
got := r.ArchivePendingTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("(*RDB).ArchivePendingTask(%q, %v) = %v, want %v",
tc.qname, tc.id, got, tc.want)
continue
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantArchived {
gotDead := h.GetArchivedEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.ArchivedKey(qname), diff)
}
}
}
}
func TestArchiveAllRetryTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
@ -2008,7 +2113,7 @@ func TestKillAllRetryTasks(t *testing.T) {
} }
} }
func TestKillAllScheduledTasks(t *testing.T) { func TestArchiveAllScheduledTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
@ -2155,7 +2260,7 @@ func TestKillAllScheduledTasks(t *testing.T) {
} }
} }
func TestDeleteDeadTask(t *testing.T) { func TestDeleteArchivedTask(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
@ -2425,7 +2530,7 @@ func TestDeleteScheduledTask(t *testing.T) {
} }
} }
func TestDeleteAllDeadTasks(t *testing.T) { func TestDeleteAllArchivedTasks(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)