From 6529a1e0b1289d01f13229f450a5a0904e162a2c Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 25 Jan 2021 22:32:37 -0800 Subject: [PATCH] Fix scheduler * Delete scheduler history data when scheduler stops * Fix history trimming bug --- internal/rdb/inspect_test.go | 53 ++++++++++++++++++++++++++++++++---- internal/rdb/rdb.go | 10 +++++-- scheduler.go | 10 +++++++ 3 files changed, 65 insertions(+), 8 deletions(-) diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index d838423..f4313d5 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -3067,12 +3067,6 @@ func TestSchedulerEnqueueEvents(t *testing.T) { oneHourAgo = now.Add(-1 * time.Hour) ) - type event struct { - entryID string - taskID string - enqueuedAt time.Time - } - tests := []struct { entryID string events []*base.SchedulerEnqueueEvent @@ -3121,6 +3115,53 @@ loop: } } +func TestRecordSchedulerEnqueueEventTrimsDataSet(t *testing.T) { + r := setup(t) + var ( + entryID = "entry123" + now = time.Now() + key = base.SchedulerHistoryKey(entryID) + ) + + // Record maximum number of events. + for i := 1; i <= maxEvents; i++ { + event := base.SchedulerEnqueueEvent{ + TaskID: fmt.Sprintf("task%d", i), + EnqueuedAt: now.Add(-time.Duration(i) * time.Second), + } + if err := r.RecordSchedulerEnqueueEvent(entryID, &event); err != nil { + t.Fatalf("RecordSchedulerEnqueueEvent failed: %v", err) + } + } + + // Make sure the set is full. + if n := r.client.ZCard(key).Val(); n != maxEvents { + t.Fatalf("unexpected number of events; got %d, want %d", n, maxEvents) + } + + // Record one more event, should evict the oldest event. + event := base.SchedulerEnqueueEvent{ + TaskID: "latest", + EnqueuedAt: now, + } + if err := r.RecordSchedulerEnqueueEvent(entryID, &event); err != nil { + t.Fatalf("RecordSchedulerEnqueueEvent failed: %v", err) + } + if n := r.client.ZCard(key).Val(); n != maxEvents { + t.Fatalf("unexpected number of events; got %d, want %d", n, maxEvents) + } + events, err := r.ListSchedulerEnqueueEvents(entryID, Pagination{Size: maxEvents}) + if err != nil { + t.Fatalf("ListSchedulerEnqueueEvents failed: %v", err) + } + if first := events[0]; first.TaskID != "latest" { + t.Errorf("unexpected first event; got %q, want %q", first.TaskID, "latest") + } + if last := events[maxEvents-1]; last.TaskID != fmt.Sprintf("task%d", maxEvents-1) { + t.Errorf("unexpected last event; got %q, want %q", last.TaskID, fmt.Sprintf("task%d", maxEvents-1)) + } +} + func TestPause(t *testing.T) { r := setup(t) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 9c514b6..c557a71 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -634,12 +634,12 @@ func (r *RDB) PublishCancelation(id string) error { // ARGV[2] -> serialized SchedulerEnqueueEvent data // ARGV[3] -> max number of events to be persisted var recordSchedulerEnqueueEventCmd = redis.NewScript(` +redis.call("ZREMRANGEBYRANK", KEYS[1], 0, -ARGV[3]) redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2]) -redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[3]) return redis.status_reply("OK")`) // Maximum number of enqueue events to store per entry. -const maxEvents = 10000 +const maxEvents = 1000 // RecordSchedulerEnqueueEvent records the time when the given task was enqueued. func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error { @@ -651,3 +651,9 @@ func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerE return recordSchedulerEnqueueEventCmd.Run( r.client, []string{key}, event.EnqueuedAt.Unix(), data, maxEvents).Err() } + +// ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry. +func (r *RDB) ClearSchedulerHistory(entryID string) error { + key := base.SchedulerHistoryKey(entryID) + return r.client.Del(key).Err() +} diff --git a/scheduler.go b/scheduler.go index 78d733b..aa23586 100644 --- a/scheduler.go +++ b/scheduler.go @@ -186,6 +186,7 @@ func (s *Scheduler) Stop() error { <-ctx.Done() s.wg.Wait() + s.clearHistory() s.client.Close() s.rdb.Close() s.status.Set(base.StatusStopped) @@ -237,3 +238,12 @@ func stringifyOptions(opts []Option) []string { } return res } + +func (s *Scheduler) clearHistory() { + for _, entry := range s.cron.Entries() { + job := entry.Job.(*enqueueJob) + if err := s.rdb.ClearSchedulerHistory(job.id.String()); err != nil { + s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err) + } + } +}