diff --git a/recoverer.go b/recoverer.go index 7fde8a4..c0dce5e 100644 --- a/recoverer.go +++ b/recoverer.go @@ -21,6 +21,9 @@ type recoverer struct { // channel to communicate back to the long running "recoverer" goroutine. done chan struct{} + // list of queues to check for deadline. + queues []string + // poll interval. interval time.Duration } @@ -28,6 +31,7 @@ type recoverer struct { type recovererParams struct { logger *log.Logger broker base.Broker + queues []string interval time.Duration retryDelayFunc retryDelayFunc } @@ -37,6 +41,7 @@ func newRecoverer(params recovererParams) *recoverer { logger: params.logger, broker: params.broker, done: make(chan struct{}), + queues: params.queues, interval: params.interval, retryDelayFunc: params.retryDelayFunc, } @@ -62,7 +67,7 @@ func (r *recoverer) start(wg *sync.WaitGroup) { case <-timer.C: // Get all tasks which have expired 30 seconds ago or earlier. deadline := time.Now().Add(-30 * time.Second) - msgs, err := r.broker.ListDeadlineExceeded(deadline) + msgs, err := r.broker.ListDeadlineExceeded(deadline, r.queues...) if err != nil { r.logger.Warn("recoverer: could not list deadline exceeded tasks") continue diff --git a/recoverer_test.go b/recoverer_test.go index 2f9e82f..c43f5f0 100644 --- a/recoverer_test.go +++ b/recoverer_test.go @@ -19,10 +19,10 @@ func TestRecoverer(t *testing.T) { r := setup(t) rdbClient := rdb.NewRDB(r) - t1 := h.NewTaskMessage("task1", nil) - t2 := h.NewTaskMessage("task2", nil) + t1 := h.NewTaskMessageWithQueue("task1", nil, "default") + t2 := h.NewTaskMessageWithQueue("task2", nil, "default") t3 := h.NewTaskMessageWithQueue("task3", nil, "critical") - t4 := h.NewTaskMessage("task4", nil) + t4 := h.NewTaskMessageWithQueue("task4", nil, "default") t4.Retried = t4.Retry // t4 has reached its max retry count now := time.Now() @@ -33,106 +33,201 @@ func TestRecoverer(t *testing.T) { tests := []struct { desc string - inProgress []*base.TaskMessage - deadlines []base.Z - retry []base.Z - dead []base.Z - wantInProgress []*base.TaskMessage - wantDeadlines []base.Z - wantRetry []*base.TaskMessage - wantDead []*base.TaskMessage + inProgress map[string][]*base.TaskMessage + deadlines map[string][]base.Z + retry map[string][]base.Z + dead map[string][]base.Z + wantInProgress map[string][]*base.TaskMessage + wantDeadlines map[string][]base.Z + wantRetry map[string][]*base.TaskMessage + wantDead map[string][]*base.TaskMessage }{ { - desc: "with one task in-progress", - inProgress: []*base.TaskMessage{t1}, - deadlines: []base.Z{ - {Message: t1, Score: fiveMinutesAgo.Unix()}, + desc: "with one task in-progress", + inProgress: map[string][]*base.TaskMessage{ + "default": {t1}, }, - retry: []base.Z{}, - dead: []base.Z{}, - wantInProgress: []*base.TaskMessage{}, - wantDeadlines: []base.Z{}, - wantRetry: []*base.TaskMessage{ - h.TaskMessageAfterRetry(*t1, "deadline exceeded"), + deadlines: map[string][]base.Z{ + "default": {{Message: t1, Score: fiveMinutesAgo.Unix()}}, + }, + retry: map[string][]base.Z{ + "default": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + }, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {}, + }, + wantRetry: map[string][]*base.TaskMessage{ + "default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")}, + }, + wantDead: map[string][]*base.TaskMessage{ + "default": {}, + }, + }, + { + desc: "with a task with max-retry reached", + inProgress: map[string][]*base.TaskMessage{ + "default": {t4}, + "critical": {}, + }, + deadlines: map[string][]base.Z{ + "default": {{Message: t4, Score: fiveMinutesAgo.Unix()}}, + "critical": {}, + }, + retry: map[string][]base.Z{ + "default": {}, + "critical": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + "critical": {}, + }, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {}, + "critical": {}, + }, + wantRetry: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + }, + wantDead: map[string][]*base.TaskMessage{ + "default": {h.TaskMessageWithError(*t4, "deadline exceeded")}, + "critical": {}, + }, + }, + { + desc: "with multiple tasks in-progress, and one expired", + inProgress: map[string][]*base.TaskMessage{ + "default": {t1, t2}, + "critical": {t3}, + }, + deadlines: map[string][]base.Z{ + "default": { + {Message: t1, Score: oneHourAgo.Unix()}, + {Message: t2, Score: fiveMinutesFromNow.Unix()}, + }, + "critical": { + {Message: t3, Score: oneHourFromNow.Unix()}, + }, + }, + retry: map[string][]base.Z{ + "default": {}, + "critical": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + "critical": {}, + }, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {t2}, + "critical": {t3}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {{Message: t2, Score: fiveMinutesFromNow.Unix()}}, + "critical": {{Message: t3, Score: oneHourFromNow.Unix()}}, + }, + wantRetry: map[string][]*base.TaskMessage{ + "default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")}, + "critical": {}, }, wantDead: []*base.TaskMessage{}, }, { - desc: "with a task with max-retry reached", - inProgress: []*base.TaskMessage{t4}, - deadlines: []base.Z{ - {Message: t4, Score: fiveMinutesAgo.Unix()}, + desc: "with multiple expired tasks in-progress", + inProgress: map[string][]*base.TaskMessage{ + "default": {t1, t2}, + "critical": {t3}, + }, + deadlines: map[string][]base.Z{ + "default": { + {Message: t1, Score: oneHourAgo.Unix()}, + {Message: t2, Score: oneHourFromNow.Unix()}, + }, + "critical": { + {Message: t3, Score: fiveMinutesAgo.Unix()}, + }, + }, + retry: map[string][]base.Z{ + "default": {}, + "cricial": {}, + }, + dead: map[string][]base.Z{ + "default": {}, + "cricial": {}, + }, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {t2}, + "critical": {}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {{Message: t2, Score: oneHourFromNow.Unix()}}, + }, + wantRetry: map[string][]*base.TaskMessage{ + "default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")}, + "critical": {h.TaskMessageAfterRetry(*t3, "deadline exceeded")}, + }, + wantDead: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, }, - retry: []base.Z{}, - dead: []base.Z{}, - wantInProgress: []*base.TaskMessage{}, - wantDeadlines: []base.Z{}, - wantRetry: []*base.TaskMessage{}, - wantDead: []*base.TaskMessage{h.TaskMessageWithError(*t4, "deadline exceeded")}, }, { - desc: "with multiple tasks in-progress, and one expired", - inProgress: []*base.TaskMessage{t1, t2, t3}, - deadlines: []base.Z{ - {Message: t1, Score: oneHourAgo.Unix()}, - {Message: t2, Score: fiveMinutesFromNow.Unix()}, - {Message: t3, Score: oneHourFromNow.Unix()}, + desc: "with empty in-progress queue", + inProgress: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, }, - retry: []base.Z{}, - dead: []base.Z{}, - wantInProgress: []*base.TaskMessage{t2, t3}, - wantDeadlines: []base.Z{ - {Message: t2, Score: fiveMinutesFromNow.Unix()}, - {Message: t3, Score: oneHourFromNow.Unix()}, + deadlines: map[string][]base.Z{ + "default": {}, + "critical": {}, }, - wantRetry: []*base.TaskMessage{ - h.TaskMessageAfterRetry(*t1, "deadline exceeded"), + retry: map[string][]base.Z{ + "default": {}, + "critical": {}, }, - wantDead: []*base.TaskMessage{}, - }, - { - desc: "with multiple expired tasks in-progress", - inProgress: []*base.TaskMessage{t1, t2, t3}, - deadlines: []base.Z{ - {Message: t1, Score: oneHourAgo.Unix()}, - {Message: t2, Score: fiveMinutesAgo.Unix()}, - {Message: t3, Score: oneHourFromNow.Unix()}, + dead: map[string][]base.Z{ + "default": {}, + "critical": {}, }, - retry: []base.Z{}, - dead: []base.Z{}, - wantInProgress: []*base.TaskMessage{t3}, - wantDeadlines: []base.Z{ - {Message: t3, Score: oneHourFromNow.Unix()}, + wantInProgress: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, }, - wantRetry: []*base.TaskMessage{ - h.TaskMessageAfterRetry(*t1, "deadline exceeded"), - h.TaskMessageAfterRetry(*t2, "deadline exceeded"), + wantDeadlines: map[string][]base.Z{ + "default": {}, + "critical": {}, + }, + wantRetry: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + }, + wantDead: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, }, - wantDead: []*base.TaskMessage{}, - }, - { - desc: "with empty in-progress queue", - inProgress: []*base.TaskMessage{}, - deadlines: []base.Z{}, - retry: []base.Z{}, - dead: []base.Z{}, - wantInProgress: []*base.TaskMessage{}, - wantDeadlines: []base.Z{}, - wantRetry: []*base.TaskMessage{}, - wantDead: []*base.TaskMessage{}, }, } for _, tc := range tests { h.FlushDB(t, r) - h.SeedInProgressQueue(t, r, tc.inProgress) - h.SeedDeadlines(t, r, tc.deadlines) - h.SeedRetryQueue(t, r, tc.retry) - h.SeedDeadQueue(t, r, tc.dead) + h.SeedAllInProgressQueues(t, r, tc.inProgress) + h.SeedAllDeadlines(t, r, tc.deadlines) + h.SeedAllRetryQueues(t, r, tc.retry) + h.SeedAllDeadQueues(t, r, tc.dead) recoverer := newRecoverer(recovererParams{ logger: testLogger, broker: rdbClient, + queues: []string{"default", "critical"}, interval: 1 * time.Second, retryDelayFunc: func(n int, err error, task *Task) time.Duration { return 30 * time.Second }, }) @@ -142,21 +237,29 @@ func TestRecoverer(t *testing.T) { time.Sleep(2 * time.Second) recoverer.terminate() - gotInProgress := h.GetInProgressMessages(t, r) - if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q; (-want,+got)\n%s", tc.desc, base.InProgressQueue, diff) + for qname, want := range tc.wantInProgress { + gotInProgress := h.GetInProgressMessages(t, r, qname) + if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" { + t.Errorf("%s; mismatch found in %q; (-want,+got)\n%s", tc.desc, base.InProgressKey(qname), diff) + } } - gotDeadlines := h.GetDeadlinesEntries(t, r) - if diff := cmp.Diff(tc.wantDeadlines, gotDeadlines, h.SortZSetEntryOpt); diff != "" { - t.Errorf("%s; mismatch found in %q; (-want,+got)\n%s", tc.desc, base.KeyDeadlines, diff) + for qname, want := range tc.wantDeadlines { + gotDeadlines := h.GetDeadlinesEntries(t, r, qname) + if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { + t.Errorf("%s; mismatch found in %q; (-want,+got)\n%s", tc.desc, base.DeadlinesKey(qname), diff) + } } - gotRetry := h.GetRetryMessages(t, r) - if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q: (-want, +got)\n%s", tc.desc, base.RetryQueue, diff) + for qname, want := range tc.wantRetry { + gotRetry := h.GetRetryMessages(t, r, qname) + if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" { + t.Errorf("%s; mismatch found in %q: (-want, +got)\n%s", tc.desc, base.RetryKey(qname), diff) + } } - gotDead := h.GetDeadMessages(t, r) - if diff := cmp.Diff(tc.wantDead, gotDead, h.SortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q: (-want, +got)\n%s", tc.desc, base.DeadQueue, diff) + for qname, want := range tc.wantDead { + gotDead := h.GetDeadMessages(t, r, qname) + if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" { + t.Errorf("%s; mismatch found in %q: (-want, +got)\n%s", tc.desc, base.DeadKey(qname), diff) + } } } } diff --git a/server.go b/server.go index 6c634f6..8fbef00 100644 --- a/server.go +++ b/server.go @@ -357,6 +357,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { logger: logger, broker: rdb, retryDelayFunc: delayFunc, + queues: qnames, interval: 1 * time.Minute, }) healthchecker := newHealthChecker(healthcheckerParams{