2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Fix history trimming bug

This commit is contained in:
Ken Hibino 2021-01-24 22:43:37 -08:00
parent f40d60fe09
commit 9e723edd63
2 changed files with 48 additions and 7 deletions

View File

@ -3067,12 +3067,6 @@ func TestSchedulerEnqueueEvents(t *testing.T) {
oneHourAgo = now.Add(-1 * time.Hour) oneHourAgo = now.Add(-1 * time.Hour)
) )
type event struct {
entryID string
taskID string
enqueuedAt time.Time
}
tests := []struct { tests := []struct {
entryID string entryID string
events []*base.SchedulerEnqueueEvent events []*base.SchedulerEnqueueEvent
@ -3121,6 +3115,53 @@ loop:
} }
} }
func TestRecordSchedulerEnqueueEventTrimsDataSet(t *testing.T) {
r := setup(t)
var (
entryID = "entry123"
now = time.Now()
key = base.SchedulerHistoryKey(entryID)
)
// Record maximum number of events.
for i := 1; i <= maxEvents; i++ {
event := base.SchedulerEnqueueEvent{
TaskID: fmt.Sprintf("task%d", i),
EnqueuedAt: now.Add(-time.Duration(i) * time.Second),
}
if err := r.RecordSchedulerEnqueueEvent(entryID, &event); err != nil {
t.Fatalf("RecordSchedulerEnqueueEvent failed: %v", err)
}
}
// Make sure the set is full.
if n := r.client.ZCard(key).Val(); n != maxEvents {
t.Fatalf("unexpected number of events; got %d, want %d", n, maxEvents)
}
// Record one more event, should evict the oldest event.
event := base.SchedulerEnqueueEvent{
TaskID: "latest",
EnqueuedAt: now,
}
if err := r.RecordSchedulerEnqueueEvent(entryID, &event); err != nil {
t.Fatalf("RecordSchedulerEnqueueEvent failed: %v", err)
}
if n := r.client.ZCard(key).Val(); n != maxEvents {
t.Fatalf("unexpected number of events; got %d, want %d", n, maxEvents)
}
events, err := r.ListSchedulerEnqueueEvents(entryID, Pagination{Size: maxEvents})
if err != nil {
t.Fatalf("ListSchedulerEnqueueEvents failed: %v", err)
}
if first := events[0]; first.TaskID != "latest" {
t.Errorf("unexpected first event; got %q, want %q", first.TaskID, "latest")
}
if last := events[maxEvents-1]; last.TaskID != fmt.Sprintf("task%d", maxEvents-1) {
t.Errorf("unexpected last event; got %q, want %q", last.TaskID, fmt.Sprintf("task%d", maxEvents-1))
}
}
func TestPause(t *testing.T) { func TestPause(t *testing.T) {
r := setup(t) r := setup(t)

View File

@ -634,8 +634,8 @@ func (r *RDB) PublishCancelation(id string) error {
// ARGV[2] -> serialized SchedulerEnqueueEvent data // ARGV[2] -> serialized SchedulerEnqueueEvent data
// ARGV[3] -> max number of events to be persisted // ARGV[3] -> max number of events to be persisted
var recordSchedulerEnqueueEventCmd = redis.NewScript(` var recordSchedulerEnqueueEventCmd = redis.NewScript(`
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, -ARGV[3])
redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2]) redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2])
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[3])
return redis.status_reply("OK")`) return redis.status_reply("OK")`)
// Maximum number of enqueue events to store per entry. // Maximum number of enqueue events to store per entry.