mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Fix scheduler
* Delete scheduler history data when scheduler stops * Fix history trimming bug
This commit is contained in:
parent
c9a6ab8ae1
commit
6529a1e0b1
@ -3067,12 +3067,6 @@ func TestSchedulerEnqueueEvents(t *testing.T) {
|
|||||||
oneHourAgo = now.Add(-1 * time.Hour)
|
oneHourAgo = now.Add(-1 * time.Hour)
|
||||||
)
|
)
|
||||||
|
|
||||||
type event struct {
|
|
||||||
entryID string
|
|
||||||
taskID string
|
|
||||||
enqueuedAt time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
entryID string
|
entryID string
|
||||||
events []*base.SchedulerEnqueueEvent
|
events []*base.SchedulerEnqueueEvent
|
||||||
@ -3121,6 +3115,53 @@ loop:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRecordSchedulerEnqueueEventTrimsDataSet(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
var (
|
||||||
|
entryID = "entry123"
|
||||||
|
now = time.Now()
|
||||||
|
key = base.SchedulerHistoryKey(entryID)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Record maximum number of events.
|
||||||
|
for i := 1; i <= maxEvents; i++ {
|
||||||
|
event := base.SchedulerEnqueueEvent{
|
||||||
|
TaskID: fmt.Sprintf("task%d", i),
|
||||||
|
EnqueuedAt: now.Add(-time.Duration(i) * time.Second),
|
||||||
|
}
|
||||||
|
if err := r.RecordSchedulerEnqueueEvent(entryID, &event); err != nil {
|
||||||
|
t.Fatalf("RecordSchedulerEnqueueEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure the set is full.
|
||||||
|
if n := r.client.ZCard(key).Val(); n != maxEvents {
|
||||||
|
t.Fatalf("unexpected number of events; got %d, want %d", n, maxEvents)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Record one more event, should evict the oldest event.
|
||||||
|
event := base.SchedulerEnqueueEvent{
|
||||||
|
TaskID: "latest",
|
||||||
|
EnqueuedAt: now,
|
||||||
|
}
|
||||||
|
if err := r.RecordSchedulerEnqueueEvent(entryID, &event); err != nil {
|
||||||
|
t.Fatalf("RecordSchedulerEnqueueEvent failed: %v", err)
|
||||||
|
}
|
||||||
|
if n := r.client.ZCard(key).Val(); n != maxEvents {
|
||||||
|
t.Fatalf("unexpected number of events; got %d, want %d", n, maxEvents)
|
||||||
|
}
|
||||||
|
events, err := r.ListSchedulerEnqueueEvents(entryID, Pagination{Size: maxEvents})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ListSchedulerEnqueueEvents failed: %v", err)
|
||||||
|
}
|
||||||
|
if first := events[0]; first.TaskID != "latest" {
|
||||||
|
t.Errorf("unexpected first event; got %q, want %q", first.TaskID, "latest")
|
||||||
|
}
|
||||||
|
if last := events[maxEvents-1]; last.TaskID != fmt.Sprintf("task%d", maxEvents-1) {
|
||||||
|
t.Errorf("unexpected last event; got %q, want %q", last.TaskID, fmt.Sprintf("task%d", maxEvents-1))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPause(t *testing.T) {
|
func TestPause(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
|
|
||||||
|
@ -634,12 +634,12 @@ func (r *RDB) PublishCancelation(id string) error {
|
|||||||
// ARGV[2] -> serialized SchedulerEnqueueEvent data
|
// ARGV[2] -> serialized SchedulerEnqueueEvent data
|
||||||
// ARGV[3] -> max number of events to be persisted
|
// ARGV[3] -> max number of events to be persisted
|
||||||
var recordSchedulerEnqueueEventCmd = redis.NewScript(`
|
var recordSchedulerEnqueueEventCmd = redis.NewScript(`
|
||||||
|
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, -ARGV[3])
|
||||||
redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2])
|
redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2])
|
||||||
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[3])
|
|
||||||
return redis.status_reply("OK")`)
|
return redis.status_reply("OK")`)
|
||||||
|
|
||||||
// Maximum number of enqueue events to store per entry.
|
// Maximum number of enqueue events to store per entry.
|
||||||
const maxEvents = 10000
|
const maxEvents = 1000
|
||||||
|
|
||||||
// RecordSchedulerEnqueueEvent records the time when the given task was enqueued.
|
// RecordSchedulerEnqueueEvent records the time when the given task was enqueued.
|
||||||
func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error {
|
func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error {
|
||||||
@ -651,3 +651,9 @@ func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerE
|
|||||||
return recordSchedulerEnqueueEventCmd.Run(
|
return recordSchedulerEnqueueEventCmd.Run(
|
||||||
r.client, []string{key}, event.EnqueuedAt.Unix(), data, maxEvents).Err()
|
r.client, []string{key}, event.EnqueuedAt.Unix(), data, maxEvents).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry.
|
||||||
|
func (r *RDB) ClearSchedulerHistory(entryID string) error {
|
||||||
|
key := base.SchedulerHistoryKey(entryID)
|
||||||
|
return r.client.Del(key).Err()
|
||||||
|
}
|
||||||
|
10
scheduler.go
10
scheduler.go
@ -186,6 +186,7 @@ func (s *Scheduler) Stop() error {
|
|||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
s.wg.Wait()
|
s.wg.Wait()
|
||||||
|
|
||||||
|
s.clearHistory()
|
||||||
s.client.Close()
|
s.client.Close()
|
||||||
s.rdb.Close()
|
s.rdb.Close()
|
||||||
s.status.Set(base.StatusStopped)
|
s.status.Set(base.StatusStopped)
|
||||||
@ -237,3 +238,12 @@ func stringifyOptions(opts []Option) []string {
|
|||||||
}
|
}
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) clearHistory() {
|
||||||
|
for _, entry := range s.cron.Entries() {
|
||||||
|
job := entry.Job.(*enqueueJob)
|
||||||
|
if err := s.rdb.ClearSchedulerHistory(job.id.String()); err != nil {
|
||||||
|
s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user