Add KillAllRetryTasks and KillAllScheduledTasks method to RDB

This commit is contained in:
Ken Hibino
2019-12-27 06:22:33 -08:00
parent aaa813dfdc
commit 8d3d30da8f
2 changed files with 234 additions and 0 deletions

View File

@@ -1014,6 +1014,198 @@ func TestKillScheduledTask(t *testing.T) {
}
}
func TestKillAllRetryTasks(t *testing.T) {
r := setup(t)
m1 := newTaskMessage("send_email", nil)
m2 := newTaskMessage("reindex", nil)
t1 := time.Now().Add(time.Minute)
t2 := time.Now().Add(time.Hour)
tests := []struct {
retry []sortedSetEntry
dead []sortedSetEntry
want int64
wantRetry []sortedSetEntry
wantDead []sortedSetEntry
}{
{
retry: []sortedSetEntry{
{m1, t1.Unix()},
{m2, t2.Unix()},
},
dead: []sortedSetEntry{},
want: 2,
wantRetry: []sortedSetEntry{},
wantDead: []sortedSetEntry{
{m1, time.Now().Unix()},
{m2, time.Now().Unix()},
},
},
{
retry: []sortedSetEntry{
{m1, t1.Unix()},
},
dead: []sortedSetEntry{
{m2, t2.Unix()},
},
want: 1,
wantRetry: []sortedSetEntry{},
wantDead: []sortedSetEntry{
{m1, time.Now().Unix()},
{m2, t2.Unix()},
},
},
{
retry: []sortedSetEntry{},
dead: []sortedSetEntry{
{m1, t1.Unix()},
{m2, t2.Unix()},
},
want: 0,
wantRetry: []sortedSetEntry{},
wantDead: []sortedSetEntry{
{m1, t1.Unix()},
{m2, t2.Unix()},
},
},
}
for _, tc := range tests {
flushDB(t, r)
seedRetryQueue(t, r, tc.retry)
seedDeadQueue(t, r, tc.dead)
got, err := r.KillAllRetryTasks()
if got != tc.want || err != nil {
t.Errorf("(*RDB).KillAllRetryTasks() = %v, %v; want %v, nil",
got, err, tc.want)
continue
}
gotRetryRaw := r.client.ZRangeWithScores(base.RetryQueue, 0, -1).Val()
var gotRetry []sortedSetEntry
for _, z := range gotRetryRaw {
gotRetry = append(gotRetry, sortedSetEntry{
Msg: mustUnmarshal(t, z.Member.(string)),
Score: int64(z.Score),
})
}
if diff := cmp.Diff(tc.wantRetry, gotRetry, sortZSetEntryOpt, timeCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.RetryQueue, diff)
}
gotDeadRaw := r.client.ZRangeWithScores(base.DeadQueue, 0, -1).Val()
var gotDead []sortedSetEntry
for _, z := range gotDeadRaw {
gotDead = append(gotDead, sortedSetEntry{
Msg: mustUnmarshal(t, z.Member.(string)),
Score: int64(z.Score),
})
}
if diff := cmp.Diff(tc.wantDead, gotDead, sortZSetEntryOpt, timeCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.DeadQueue, diff)
}
}
}
func TestKillAllScheduledTasks(t *testing.T) {
r := setup(t)
m1 := newTaskMessage("send_email", nil)
m2 := newTaskMessage("reindex", nil)
t1 := time.Now().Add(time.Minute)
t2 := time.Now().Add(time.Hour)
tests := []struct {
scheduled []sortedSetEntry
dead []sortedSetEntry
want int64
wantScheduled []sortedSetEntry
wantDead []sortedSetEntry
}{
{
scheduled: []sortedSetEntry{
{m1, t1.Unix()},
{m2, t2.Unix()},
},
dead: []sortedSetEntry{},
want: 2,
wantScheduled: []sortedSetEntry{},
wantDead: []sortedSetEntry{
{m1, time.Now().Unix()},
{m2, time.Now().Unix()},
},
},
{
scheduled: []sortedSetEntry{
{m1, t1.Unix()},
},
dead: []sortedSetEntry{
{m2, t2.Unix()},
},
want: 1,
wantScheduled: []sortedSetEntry{},
wantDead: []sortedSetEntry{
{m1, time.Now().Unix()},
{m2, t2.Unix()},
},
},
{
scheduled: []sortedSetEntry{},
dead: []sortedSetEntry{
{m1, t1.Unix()},
{m2, t2.Unix()},
},
want: 0,
wantScheduled: []sortedSetEntry{},
wantDead: []sortedSetEntry{
{m1, t1.Unix()},
{m2, t2.Unix()},
},
},
}
for _, tc := range tests {
flushDB(t, r)
seedScheduledQueue(t, r, tc.scheduled)
seedDeadQueue(t, r, tc.dead)
got, err := r.KillAllScheduledTasks()
if got != tc.want || err != nil {
t.Errorf("(*RDB).KillAllScheduledTasks() = %v, %v; want %v, nil",
got, err, tc.want)
continue
}
gotScheduledRaw := r.client.ZRangeWithScores(base.ScheduledQueue, 0, -1).Val()
var gotScheduled []sortedSetEntry
for _, z := range gotScheduledRaw {
gotScheduled = append(gotScheduled, sortedSetEntry{
Msg: mustUnmarshal(t, z.Member.(string)),
Score: int64(z.Score),
})
}
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortZSetEntryOpt, timeCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.ScheduledQueue, diff)
}
gotDeadRaw := r.client.ZRangeWithScores(base.DeadQueue, 0, -1).Val()
var gotDead []sortedSetEntry
for _, z := range gotDeadRaw {
gotDead = append(gotDead, sortedSetEntry{
Msg: mustUnmarshal(t, z.Member.(string)),
Score: int64(z.Score),
})
}
if diff := cmp.Diff(tc.wantDead, gotDead, sortZSetEntryOpt, timeCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.DeadQueue, diff)
}
}
}
func TestDeleteDeadTask(t *testing.T) {
r := setup(t)
m1 := newTaskMessage("send_email", nil)