diff --git a/scheduler.go b/scheduler.go index 16fd5cc..6effb90 100644 --- a/scheduler.go +++ b/scheduler.go @@ -19,6 +19,9 @@ type scheduler struct { // channel to communicate back to the long running "scheduler" goroutine. done chan struct{} + // list of queue names to check and enqueue. + queues []string + // poll interval on average avgInterval time.Duration } @@ -26,6 +29,7 @@ type scheduler struct { type schedulerParams struct { logger *log.Logger broker base.Broker + queues []string interval time.Duration } @@ -34,6 +38,7 @@ func newScheduler(params schedulerParams) *scheduler { logger: params.logger, broker: params.broker, done: make(chan struct{}), + queues: params.queues, avgInterval: params.interval, } } @@ -62,7 +67,7 @@ func (s *scheduler) start(wg *sync.WaitGroup) { } func (s *scheduler) exec() { - if err := s.broker.CheckAndEnqueue(); err != nil { + if err := s.broker.CheckAndEnqueue(s.queues...); err != nil { s.logger.Errorf("Could not enqueue scheduled tasks: %v", err) } } diff --git a/scheduler_test.go b/scheduler_test.go index 0d522b5..bd56752 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -22,76 +22,115 @@ func TestScheduler(t *testing.T) { s := newScheduler(schedulerParams{ logger: testLogger, broker: rdbClient, + queues: []string{"default", "critical"}, interval: pollInterval, }) - t1 := h.NewTaskMessage("gen_thumbnail", nil) - t2 := h.NewTaskMessage("send_email", nil) - t3 := h.NewTaskMessage("reindex", nil) - t4 := h.NewTaskMessage("sync", nil) + t1 := h.NewTaskMessageWithQueue("gen_thumbnail", nil, "default") + t2 := h.NewTaskMessageWithQueue("send_email", nil, "critical") + t3 := h.NewTaskMessageWithQueue("reindex", nil, "default") + t4 := h.NewTaskMessageWithQueue("sync", nil, "critical") now := time.Now() tests := []struct { - initScheduled []base.Z // scheduled queue initial state - initRetry []base.Z // retry queue initial state - initQueue []*base.TaskMessage // default queue initial state - wait time.Duration // wait duration before checking for final state - wantScheduled []*base.TaskMessage // schedule queue final state - wantRetry []*base.TaskMessage // retry queue final state - wantQueue []*base.TaskMessage // default queue final state + initScheduled map[string][]base.Z // scheduled queue initial state + initRetry map[string][]base.Z // retry queue initial state + initEnqueued map[string][]*base.TaskMessage // default queue initial state + wait time.Duration // wait duration before checking for final state + wantScheduled map[string][]*base.TaskMessage // schedule queue final state + wantRetry map[string][]*base.TaskMessage // retry queue final state + wantEnqueued map[string][]*base.TaskMessage // default queue final state }{ { - initScheduled: []base.Z{ - {Message: t1, Score: now.Add(time.Hour).Unix()}, - {Message: t2, Score: now.Add(-2 * time.Second).Unix()}, + initScheduled: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(time.Hour).Unix()}}, + "critical": {{Message: t2, Score: now.Add(-2 * time.Second).Unix()}}, }, - initRetry: []base.Z{ - {Message: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()}, + initRetry: map[string][]base.Z{ + "default": {{Message: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()}}, + "critical": {}, + }, + initEnqueued: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {t4}, + }, + wait: pollInterval * 2, + wantScheduled: map[string][]*base.TaskMessage{ + "default": {t1}, + "critical": {}, + }, + wantRetry: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {t3}, + "critical": {t2, t4}, }, - initQueue: []*base.TaskMessage{t4}, - wait: pollInterval * 2, - wantScheduled: []*base.TaskMessage{t1}, - wantRetry: []*base.TaskMessage{}, - wantQueue: []*base.TaskMessage{t2, t3, t4}, }, { - initScheduled: []base.Z{ - {Message: t1, Score: now.Unix()}, - {Message: t2, Score: now.Add(-2 * time.Second).Unix()}, - {Message: t3, Score: now.Add(-500 * time.Millisecond).Unix()}, + initScheduled: map[string][]base.Z{ + "default": { + {Message: t1, Score: now.Unix()}, + {Message: t3, Score: now.Add(-500 * time.Millisecond).Unix()}, + }, + "critical": { + {Message: t2, Score: now.Add(-2 * time.Second).Unix()}, + }, + }, + initRetry: map[string][]base.Z{ + "default": {}, + "critical": {}, + }, + initEnqueued: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {t4}, + }, + wait: pollInterval * 2, + wantScheduled: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + }, + wantRetry: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": {t1, t3}, + "critical": {t2, t4}, }, - initRetry: []base.Z{}, - initQueue: []*base.TaskMessage{t4}, - wait: pollInterval * 2, - wantScheduled: []*base.TaskMessage{}, - wantRetry: []*base.TaskMessage{}, - wantQueue: []*base.TaskMessage{t1, t2, t3, t4}, }, } for _, tc := range tests { - h.FlushDB(t, r) // clean up db before each test case. - h.SeedScheduledQueue(t, r, tc.initScheduled) // initialize scheduled queue - h.SeedRetryQueue(t, r, tc.initRetry) // initialize retry queue - h.SeedEnqueuedQueue(t, r, tc.initQueue) // initialize default queue + h.FlushDB(t, r) // clean up db before each test case. + h.SeedAllScheduledQueues(t, r, tc.initScheduled) // initialize scheduled queue + h.SeedAllRetryQueues(t, r, tc.initRetry) // initialize retry queue + h.SeedAllEnqueuedQueues(t, r, tc.initEnqueued) // initialize default queue var wg sync.WaitGroup s.start(&wg) time.Sleep(tc.wait) s.terminate() - gotScheduled := h.GetScheduledMessages(t, r) - if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.ScheduledQueue, diff) + for qname, want := range tc.wantScheduled { + gotScheduled := h.GetScheduledMessages(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.ScheduledKey(qname), diff) + } } - gotRetry := h.GetRetryMessages(t, r) - if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.RetryQueue, diff) + for qname, want := range tc.wantRetry { + gotRetry := h.GetRetryMessages(t, r, qname) + if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.RetryKey(qname), diff) + } } - gotEnqueued := h.GetEnqueuedMessages(t, r) - if diff := cmp.Diff(tc.wantQueue, gotEnqueued, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.DefaultQueue, diff) + for qname, want := range tc.wantEnqueued { + gotEnqueued := h.GetEnqueuedMessages(t, r, qname) + if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.DefaultKey(qname), diff) + } } } } diff --git a/server.go b/server.go index c86e88f..6c634f6 100644 --- a/server.go +++ b/server.go @@ -286,6 +286,10 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { if len(queues) == 0 { queues = defaultQueueConfig } + var qnames []string + for q, _ := range queues { + qnames = append(qnames, q) + } shutdownTimeout := cfg.ShutdownTimeout if shutdownTimeout == 0 { shutdownTimeout = defaultShutdownTimeout @@ -327,6 +331,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { scheduler := newScheduler(schedulerParams{ logger: logger, broker: rdb, + queues: qnames, interval: 5 * time.Second, }) subscriber := newSubscriber(subscriberParams{