diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 9c514b6..28df926 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -639,7 +639,7 @@ redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[3]) return redis.status_reply("OK")`) // Maximum number of enqueue events to store per entry. -const maxEvents = 10000 +const maxEvents = 1000 // RecordSchedulerEnqueueEvent records the time when the given task was enqueued. func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error { @@ -651,3 +651,9 @@ func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerE return recordSchedulerEnqueueEventCmd.Run( r.client, []string{key}, event.EnqueuedAt.Unix(), data, maxEvents).Err() } + +// ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry. +func (r *RDB) ClearSchedulerHistory(entryID string) error { + key := base.SchedulerHistoryKey(entryID) + return r.client.Del(key).Err() +} diff --git a/scheduler.go b/scheduler.go index 78d733b..aa23586 100644 --- a/scheduler.go +++ b/scheduler.go @@ -186,6 +186,7 @@ func (s *Scheduler) Stop() error { <-ctx.Done() s.wg.Wait() + s.clearHistory() s.client.Close() s.rdb.Close() s.status.Set(base.StatusStopped) @@ -237,3 +238,12 @@ func stringifyOptions(opts []Option) []string { } return res } + +func (s *Scheduler) clearHistory() { + for _, entry := range s.cron.Entries() { + job := entry.Job.(*enqueueJob) + if err := s.rdb.ClearSchedulerHistory(job.id.String()); err != nil { + s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err) + } + } +}