2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Delete scheduler history data when scheduler stops

This commit is contained in:
Ken Hibino 2021-01-21 21:23:17 -08:00
parent 179b0120a5
commit f40d60fe09
2 changed files with 17 additions and 1 deletions

View File

@ -639,7 +639,7 @@ redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[3])
return redis.status_reply("OK")`) return redis.status_reply("OK")`)
// Maximum number of enqueue events to store per entry. // Maximum number of enqueue events to store per entry.
const maxEvents = 10000 const maxEvents = 1000
// RecordSchedulerEnqueueEvent records the time when the given task was enqueued. // RecordSchedulerEnqueueEvent records the time when the given task was enqueued.
func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error { func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error {
@ -651,3 +651,9 @@ func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerE
return recordSchedulerEnqueueEventCmd.Run( return recordSchedulerEnqueueEventCmd.Run(
r.client, []string{key}, event.EnqueuedAt.Unix(), data, maxEvents).Err() r.client, []string{key}, event.EnqueuedAt.Unix(), data, maxEvents).Err()
} }
// ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry.
func (r *RDB) ClearSchedulerHistory(entryID string) error {
key := base.SchedulerHistoryKey(entryID)
return r.client.Del(key).Err()
}

View File

@ -186,6 +186,7 @@ func (s *Scheduler) Stop() error {
<-ctx.Done() <-ctx.Done()
s.wg.Wait() s.wg.Wait()
s.clearHistory()
s.client.Close() s.client.Close()
s.rdb.Close() s.rdb.Close()
s.status.Set(base.StatusStopped) s.status.Set(base.StatusStopped)
@ -237,3 +238,12 @@ func stringifyOptions(opts []Option) []string {
} }
return res return res
} }
func (s *Scheduler) clearHistory() {
for _, entry := range s.cron.Entries() {
job := entry.Job.(*enqueueJob)
if err := s.rdb.ClearSchedulerHistory(job.id.String()); err != nil {
s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err)
}
}
}