From 4492ed9255ff3576bb8b1974b0b506645cca022b Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 17 May 2020 12:33:55 -0700 Subject: [PATCH] Change internal constructor signatures. Created "params" type to avoid positional arguments. Personally it feels more explicit and reads better. --- heartbeat.go | 17 ++++++++++++----- heartbeat_test.go | 14 ++++++++++++-- processor.go | 4 ++-- processor_test.go | 8 ++++---- scheduler.go | 17 ++++++++++++----- scheduler_test.go | 7 ++++++- server.go | 29 ++++++++++++++++++++++++----- subscriber.go | 14 ++++++++++---- subscriber_test.go | 12 ++++++++++-- syncer.go | 14 ++++++++++---- syncer_test.go | 12 ++++++++++-- 11 files changed, 112 insertions(+), 36 deletions(-) diff --git a/heartbeat.go b/heartbeat.go index 754fb31..61b7706 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -27,13 +27,20 @@ type heartbeater struct { interval time.Duration } -func newHeartbeater(l *log.Logger, b base.Broker, ss *base.ServerState, interval time.Duration) *heartbeater { +type heartbeaterParams struct { + logger *log.Logger + broker base.Broker + serverState *base.ServerState + interval time.Duration +} + +func newHeartbeater(params heartbeaterParams) *heartbeater { return &heartbeater{ - logger: l, - broker: b, - ss: ss, + logger: params.logger, + broker: params.broker, + ss: params.serverState, done: make(chan struct{}), - interval: interval, + interval: params.interval, } } diff --git a/heartbeat_test.go b/heartbeat_test.go index cf17fe5..92ac9d0 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -38,7 +38,12 @@ func TestHeartbeater(t *testing.T) { h.FlushDB(t, r) state := base.NewServerState(tc.host, tc.pid, tc.concurrency, tc.queues, false) - hb := newHeartbeater(testLogger, rdbClient, state, tc.interval) + hb := newHeartbeater(heartbeaterParams{ + logger: testLogger, + broker: rdbClient, + serverState: state, + interval: tc.interval, + }) var wg sync.WaitGroup hb.start(&wg) @@ -115,7 +120,12 @@ func TestHeartbeaterWithRedisDown(t *testing.T) { r := rdb.NewRDB(setup(t)) testBroker := testbroker.NewTestBroker(r) ss := base.NewServerState("localhost", 1234, 10, map[string]int{"default": 1}, false) - hb := newHeartbeater(testLogger, testBroker, ss, time.Second) + hb := newHeartbeater(heartbeaterParams{ + logger: testLogger, + broker: testBroker, + serverState: ss, + interval: time.Second, + }) testBroker.Sleep() var wg sync.WaitGroup diff --git a/processor.go b/processor.go index 2840287..803c269 100644 --- a/processor.go +++ b/processor.go @@ -64,7 +64,7 @@ type processor struct { type retryDelayFunc func(n int, err error, task *Task) time.Duration -type newProcessorParams struct { +type processorParams struct { logger *log.Logger broker base.Broker ss *base.ServerState @@ -76,7 +76,7 @@ type newProcessorParams struct { } // newProcessor constructs a new processor. -func newProcessor(params newProcessorParams) *processor { +func newProcessor(params processorParams) *processor { info := params.ss.GetInfo() qcfg := normalizeQueueCfg(info.Queues) orderedQueues := []string(nil) diff --git a/processor_test.go b/processor_test.go index 87ac520..98c4cd4 100644 --- a/processor_test.go +++ b/processor_test.go @@ -65,7 +65,7 @@ func TestProcessorSuccess(t *testing.T) { return nil } ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false) - p := newProcessor(newProcessorParams{ + p := newProcessor(processorParams{ logger: testLogger, broker: rdbClient, ss: ss, @@ -170,7 +170,7 @@ func TestProcessorRetry(t *testing.T) { n++ } ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false) - p := newProcessor(newProcessorParams{ + p := newProcessor(processorParams{ logger: testLogger, broker: rdbClient, ss: ss, @@ -243,7 +243,7 @@ func TestProcessorQueues(t *testing.T) { for _, tc := range tests { ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false) - p := newProcessor(newProcessorParams{ + p := newProcessor(processorParams{ logger: testLogger, broker: nil, ss: ss, @@ -319,7 +319,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { } // Note: Set concurrency to 1 to make sure tasks are processed one at a time. ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/) - p := newProcessor(newProcessorParams{ + p := newProcessor(processorParams{ logger: testLogger, broker: rdbClient, ss: ss, diff --git a/scheduler.go b/scheduler.go index 8360135..21dea52 100644 --- a/scheduler.go +++ b/scheduler.go @@ -26,16 +26,23 @@ type scheduler struct { qnames []string } -func newScheduler(l *log.Logger, b base.Broker, avgInterval time.Duration, qcfg map[string]int) *scheduler { +type schedulerParams struct { + logger *log.Logger + broker base.Broker + interval time.Duration + queues map[string]int +} + +func newScheduler(params schedulerParams) *scheduler { var qnames []string - for q := range qcfg { + for q := range params.queues { qnames = append(qnames, q) } return &scheduler{ - logger: l, - broker: b, + logger: params.logger, + broker: params.broker, done: make(chan struct{}), - avgInterval: avgInterval, + avgInterval: params.interval, qnames: qnames, } } diff --git a/scheduler_test.go b/scheduler_test.go index e63e13b..ee82250 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -19,7 +19,12 @@ func TestScheduler(t *testing.T) { r := setup(t) rdbClient := rdb.NewRDB(r) const pollInterval = time.Second - s := newScheduler(testLogger, rdbClient, pollInterval, defaultQueueConfig) + s := newScheduler(schedulerParams{ + logger: testLogger, + broker: rdbClient, + interval: pollInterval, + queues: defaultQueueConfig, + }) t1 := h.NewTaskMessage("gen_thumbnail", nil) t2 := h.NewTaskMessage("send_email", nil) t3 := h.NewTaskMessage("reindex", nil) diff --git a/server.go b/server.go index 5177458..e271dfb 100644 --- a/server.go +++ b/server.go @@ -293,11 +293,30 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { ss := base.NewServerState(host, pid, n, queues, cfg.StrictPriority) syncCh := make(chan *syncRequest) cancels := base.NewCancelations() - syncer := newSyncer(logger, syncCh, 5*time.Second) - heartbeater := newHeartbeater(logger, rdb, ss, 5*time.Second) - scheduler := newScheduler(logger, rdb, 5*time.Second, queues) - subscriber := newSubscriber(logger, rdb, cancels) - processor := newProcessor(newProcessorParams{ + + syncer := newSyncer(syncerParams{ + logger: logger, + requestsCh: syncCh, + interval: 5 * time.Second, + }) + heartbeater := newHeartbeater(heartbeaterParams{ + logger: logger, + broker: rdb, + serverState: ss, + interval: 5 * time.Second, + }) + scheduler := newScheduler(schedulerParams{ + logger: logger, + broker: rdb, + interval: 5 * time.Second, + queues: queues, + }) + subscriber := newSubscriber(subscriberParams{ + logger: logger, + broker: rdb, + cancelations: cancels, + }) + processor := newProcessor(processorParams{ logger: logger, broker: rdb, ss: ss, diff --git a/subscriber.go b/subscriber.go index 2804bfa..e9edd8d 100644 --- a/subscriber.go +++ b/subscriber.go @@ -27,12 +27,18 @@ type subscriber struct { retryTimeout time.Duration } -func newSubscriber(l *log.Logger, b base.Broker, cancelations *base.Cancelations) *subscriber { +type subscriberParams struct { + logger *log.Logger + broker base.Broker + cancelations *base.Cancelations +} + +func newSubscriber(params subscriberParams) *subscriber { return &subscriber{ - logger: l, - broker: b, + logger: params.logger, + broker: params.broker, done: make(chan struct{}), - cancelations: cancelations, + cancelations: params.cancelations, retryTimeout: 5 * time.Second, } } diff --git a/subscriber_test.go b/subscriber_test.go index cc24986..6c84dd7 100644 --- a/subscriber_test.go +++ b/subscriber_test.go @@ -38,7 +38,11 @@ func TestSubscriber(t *testing.T) { cancelations := base.NewCancelations() cancelations.Add(tc.registeredID, fakeCancelFunc) - subscriber := newSubscriber(testLogger, rdbClient, cancelations) + subscriber := newSubscriber(subscriberParams{ + logger: testLogger, + broker: rdbClient, + cancelations: cancelations, + }) var wg sync.WaitGroup subscriber.start(&wg) defer subscriber.terminate() @@ -75,7 +79,11 @@ func TestSubscriberWithRedisDown(t *testing.T) { testBroker := testbroker.NewTestBroker(r) cancelations := base.NewCancelations() - subscriber := newSubscriber(testLogger, testBroker, cancelations) + subscriber := newSubscriber(subscriberParams{ + logger: testLogger, + broker: testBroker, + cancelations: cancelations, + }) subscriber.retryTimeout = 1 * time.Second // set shorter retry timeout for testing purpose. testBroker.Sleep() // simulate a situation where subscriber cannot connect to redis. diff --git a/syncer.go b/syncer.go index 9ea24fc..c5ca396 100644 --- a/syncer.go +++ b/syncer.go @@ -30,12 +30,18 @@ type syncRequest struct { errMsg string // error message } -func newSyncer(l *log.Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer { +type syncerParams struct { + logger *log.Logger + requestsCh <-chan *syncRequest + interval time.Duration +} + +func newSyncer(params syncerParams) *syncer { return &syncer{ - logger: l, - requestsCh: requestsCh, + logger: params.logger, + requestsCh: params.requestsCh, done: make(chan struct{}), - interval: interval, + interval: params.interval, } } diff --git a/syncer_test.go b/syncer_test.go index e069074..85f68cb 100644 --- a/syncer_test.go +++ b/syncer_test.go @@ -27,7 +27,11 @@ func TestSyncer(t *testing.T) { const interval = time.Second syncRequestCh := make(chan *syncRequest) - syncer := newSyncer(testLogger, syncRequestCh, interval) + syncer := newSyncer(syncerParams{ + logger: testLogger, + requestsCh: syncRequestCh, + interval: interval, + }) var wg sync.WaitGroup syncer.start(&wg) defer syncer.terminate() @@ -52,7 +56,11 @@ func TestSyncer(t *testing.T) { func TestSyncerRetry(t *testing.T) { const interval = time.Second syncRequestCh := make(chan *syncRequest) - syncer := newSyncer(testLogger, syncRequestCh, interval) + syncer := newSyncer(syncerParams{ + logger: testLogger, + requestsCh: syncRequestCh, + interval: interval, + }) var wg sync.WaitGroup syncer.start(&wg)