mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Change internal constructor signatures.
Created "params" type to avoid positional arguments. Personally it feels more explicit and reads better.
This commit is contained in:
parent
4e3e053989
commit
4492ed9255
17
heartbeat.go
17
heartbeat.go
@ -27,13 +27,20 @@ type heartbeater struct {
|
|||||||
interval time.Duration
|
interval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func newHeartbeater(l *log.Logger, b base.Broker, ss *base.ServerState, interval time.Duration) *heartbeater {
|
type heartbeaterParams struct {
|
||||||
|
logger *log.Logger
|
||||||
|
broker base.Broker
|
||||||
|
serverState *base.ServerState
|
||||||
|
interval time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func newHeartbeater(params heartbeaterParams) *heartbeater {
|
||||||
return &heartbeater{
|
return &heartbeater{
|
||||||
logger: l,
|
logger: params.logger,
|
||||||
broker: b,
|
broker: params.broker,
|
||||||
ss: ss,
|
ss: params.serverState,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
interval: interval,
|
interval: params.interval,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,7 +38,12 @@ func TestHeartbeater(t *testing.T) {
|
|||||||
h.FlushDB(t, r)
|
h.FlushDB(t, r)
|
||||||
|
|
||||||
state := base.NewServerState(tc.host, tc.pid, tc.concurrency, tc.queues, false)
|
state := base.NewServerState(tc.host, tc.pid, tc.concurrency, tc.queues, false)
|
||||||
hb := newHeartbeater(testLogger, rdbClient, state, tc.interval)
|
hb := newHeartbeater(heartbeaterParams{
|
||||||
|
logger: testLogger,
|
||||||
|
broker: rdbClient,
|
||||||
|
serverState: state,
|
||||||
|
interval: tc.interval,
|
||||||
|
})
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
hb.start(&wg)
|
hb.start(&wg)
|
||||||
@ -115,7 +120,12 @@ func TestHeartbeaterWithRedisDown(t *testing.T) {
|
|||||||
r := rdb.NewRDB(setup(t))
|
r := rdb.NewRDB(setup(t))
|
||||||
testBroker := testbroker.NewTestBroker(r)
|
testBroker := testbroker.NewTestBroker(r)
|
||||||
ss := base.NewServerState("localhost", 1234, 10, map[string]int{"default": 1}, false)
|
ss := base.NewServerState("localhost", 1234, 10, map[string]int{"default": 1}, false)
|
||||||
hb := newHeartbeater(testLogger, testBroker, ss, time.Second)
|
hb := newHeartbeater(heartbeaterParams{
|
||||||
|
logger: testLogger,
|
||||||
|
broker: testBroker,
|
||||||
|
serverState: ss,
|
||||||
|
interval: time.Second,
|
||||||
|
})
|
||||||
|
|
||||||
testBroker.Sleep()
|
testBroker.Sleep()
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
@ -64,7 +64,7 @@ type processor struct {
|
|||||||
|
|
||||||
type retryDelayFunc func(n int, err error, task *Task) time.Duration
|
type retryDelayFunc func(n int, err error, task *Task) time.Duration
|
||||||
|
|
||||||
type newProcessorParams struct {
|
type processorParams struct {
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
broker base.Broker
|
broker base.Broker
|
||||||
ss *base.ServerState
|
ss *base.ServerState
|
||||||
@ -76,7 +76,7 @@ type newProcessorParams struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// newProcessor constructs a new processor.
|
// newProcessor constructs a new processor.
|
||||||
func newProcessor(params newProcessorParams) *processor {
|
func newProcessor(params processorParams) *processor {
|
||||||
info := params.ss.GetInfo()
|
info := params.ss.GetInfo()
|
||||||
qcfg := normalizeQueueCfg(info.Queues)
|
qcfg := normalizeQueueCfg(info.Queues)
|
||||||
orderedQueues := []string(nil)
|
orderedQueues := []string(nil)
|
||||||
|
@ -65,7 +65,7 @@ func TestProcessorSuccess(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false)
|
ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false)
|
||||||
p := newProcessor(newProcessorParams{
|
p := newProcessor(processorParams{
|
||||||
logger: testLogger,
|
logger: testLogger,
|
||||||
broker: rdbClient,
|
broker: rdbClient,
|
||||||
ss: ss,
|
ss: ss,
|
||||||
@ -170,7 +170,7 @@ func TestProcessorRetry(t *testing.T) {
|
|||||||
n++
|
n++
|
||||||
}
|
}
|
||||||
ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false)
|
ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false)
|
||||||
p := newProcessor(newProcessorParams{
|
p := newProcessor(processorParams{
|
||||||
logger: testLogger,
|
logger: testLogger,
|
||||||
broker: rdbClient,
|
broker: rdbClient,
|
||||||
ss: ss,
|
ss: ss,
|
||||||
@ -243,7 +243,7 @@ func TestProcessorQueues(t *testing.T) {
|
|||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false)
|
ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false)
|
||||||
p := newProcessor(newProcessorParams{
|
p := newProcessor(processorParams{
|
||||||
logger: testLogger,
|
logger: testLogger,
|
||||||
broker: nil,
|
broker: nil,
|
||||||
ss: ss,
|
ss: ss,
|
||||||
@ -319,7 +319,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
|
// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
|
||||||
ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
|
ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
|
||||||
p := newProcessor(newProcessorParams{
|
p := newProcessor(processorParams{
|
||||||
logger: testLogger,
|
logger: testLogger,
|
||||||
broker: rdbClient,
|
broker: rdbClient,
|
||||||
ss: ss,
|
ss: ss,
|
||||||
|
17
scheduler.go
17
scheduler.go
@ -26,16 +26,23 @@ type scheduler struct {
|
|||||||
qnames []string
|
qnames []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newScheduler(l *log.Logger, b base.Broker, avgInterval time.Duration, qcfg map[string]int) *scheduler {
|
type schedulerParams struct {
|
||||||
|
logger *log.Logger
|
||||||
|
broker base.Broker
|
||||||
|
interval time.Duration
|
||||||
|
queues map[string]int
|
||||||
|
}
|
||||||
|
|
||||||
|
func newScheduler(params schedulerParams) *scheduler {
|
||||||
var qnames []string
|
var qnames []string
|
||||||
for q := range qcfg {
|
for q := range params.queues {
|
||||||
qnames = append(qnames, q)
|
qnames = append(qnames, q)
|
||||||
}
|
}
|
||||||
return &scheduler{
|
return &scheduler{
|
||||||
logger: l,
|
logger: params.logger,
|
||||||
broker: b,
|
broker: params.broker,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
avgInterval: avgInterval,
|
avgInterval: params.interval,
|
||||||
qnames: qnames,
|
qnames: qnames,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,7 +19,12 @@ func TestScheduler(t *testing.T) {
|
|||||||
r := setup(t)
|
r := setup(t)
|
||||||
rdbClient := rdb.NewRDB(r)
|
rdbClient := rdb.NewRDB(r)
|
||||||
const pollInterval = time.Second
|
const pollInterval = time.Second
|
||||||
s := newScheduler(testLogger, rdbClient, pollInterval, defaultQueueConfig)
|
s := newScheduler(schedulerParams{
|
||||||
|
logger: testLogger,
|
||||||
|
broker: rdbClient,
|
||||||
|
interval: pollInterval,
|
||||||
|
queues: defaultQueueConfig,
|
||||||
|
})
|
||||||
t1 := h.NewTaskMessage("gen_thumbnail", nil)
|
t1 := h.NewTaskMessage("gen_thumbnail", nil)
|
||||||
t2 := h.NewTaskMessage("send_email", nil)
|
t2 := h.NewTaskMessage("send_email", nil)
|
||||||
t3 := h.NewTaskMessage("reindex", nil)
|
t3 := h.NewTaskMessage("reindex", nil)
|
||||||
|
29
server.go
29
server.go
@ -293,11 +293,30 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
|||||||
ss := base.NewServerState(host, pid, n, queues, cfg.StrictPriority)
|
ss := base.NewServerState(host, pid, n, queues, cfg.StrictPriority)
|
||||||
syncCh := make(chan *syncRequest)
|
syncCh := make(chan *syncRequest)
|
||||||
cancels := base.NewCancelations()
|
cancels := base.NewCancelations()
|
||||||
syncer := newSyncer(logger, syncCh, 5*time.Second)
|
|
||||||
heartbeater := newHeartbeater(logger, rdb, ss, 5*time.Second)
|
syncer := newSyncer(syncerParams{
|
||||||
scheduler := newScheduler(logger, rdb, 5*time.Second, queues)
|
logger: logger,
|
||||||
subscriber := newSubscriber(logger, rdb, cancels)
|
requestsCh: syncCh,
|
||||||
processor := newProcessor(newProcessorParams{
|
interval: 5 * time.Second,
|
||||||
|
})
|
||||||
|
heartbeater := newHeartbeater(heartbeaterParams{
|
||||||
|
logger: logger,
|
||||||
|
broker: rdb,
|
||||||
|
serverState: ss,
|
||||||
|
interval: 5 * time.Second,
|
||||||
|
})
|
||||||
|
scheduler := newScheduler(schedulerParams{
|
||||||
|
logger: logger,
|
||||||
|
broker: rdb,
|
||||||
|
interval: 5 * time.Second,
|
||||||
|
queues: queues,
|
||||||
|
})
|
||||||
|
subscriber := newSubscriber(subscriberParams{
|
||||||
|
logger: logger,
|
||||||
|
broker: rdb,
|
||||||
|
cancelations: cancels,
|
||||||
|
})
|
||||||
|
processor := newProcessor(processorParams{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
broker: rdb,
|
broker: rdb,
|
||||||
ss: ss,
|
ss: ss,
|
||||||
|
@ -27,12 +27,18 @@ type subscriber struct {
|
|||||||
retryTimeout time.Duration
|
retryTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSubscriber(l *log.Logger, b base.Broker, cancelations *base.Cancelations) *subscriber {
|
type subscriberParams struct {
|
||||||
|
logger *log.Logger
|
||||||
|
broker base.Broker
|
||||||
|
cancelations *base.Cancelations
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSubscriber(params subscriberParams) *subscriber {
|
||||||
return &subscriber{
|
return &subscriber{
|
||||||
logger: l,
|
logger: params.logger,
|
||||||
broker: b,
|
broker: params.broker,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
cancelations: cancelations,
|
cancelations: params.cancelations,
|
||||||
retryTimeout: 5 * time.Second,
|
retryTimeout: 5 * time.Second,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -38,7 +38,11 @@ func TestSubscriber(t *testing.T) {
|
|||||||
cancelations := base.NewCancelations()
|
cancelations := base.NewCancelations()
|
||||||
cancelations.Add(tc.registeredID, fakeCancelFunc)
|
cancelations.Add(tc.registeredID, fakeCancelFunc)
|
||||||
|
|
||||||
subscriber := newSubscriber(testLogger, rdbClient, cancelations)
|
subscriber := newSubscriber(subscriberParams{
|
||||||
|
logger: testLogger,
|
||||||
|
broker: rdbClient,
|
||||||
|
cancelations: cancelations,
|
||||||
|
})
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
subscriber.start(&wg)
|
subscriber.start(&wg)
|
||||||
defer subscriber.terminate()
|
defer subscriber.terminate()
|
||||||
@ -75,7 +79,11 @@ func TestSubscriberWithRedisDown(t *testing.T) {
|
|||||||
testBroker := testbroker.NewTestBroker(r)
|
testBroker := testbroker.NewTestBroker(r)
|
||||||
|
|
||||||
cancelations := base.NewCancelations()
|
cancelations := base.NewCancelations()
|
||||||
subscriber := newSubscriber(testLogger, testBroker, cancelations)
|
subscriber := newSubscriber(subscriberParams{
|
||||||
|
logger: testLogger,
|
||||||
|
broker: testBroker,
|
||||||
|
cancelations: cancelations,
|
||||||
|
})
|
||||||
subscriber.retryTimeout = 1 * time.Second // set shorter retry timeout for testing purpose.
|
subscriber.retryTimeout = 1 * time.Second // set shorter retry timeout for testing purpose.
|
||||||
|
|
||||||
testBroker.Sleep() // simulate a situation where subscriber cannot connect to redis.
|
testBroker.Sleep() // simulate a situation where subscriber cannot connect to redis.
|
||||||
|
14
syncer.go
14
syncer.go
@ -30,12 +30,18 @@ type syncRequest struct {
|
|||||||
errMsg string // error message
|
errMsg string // error message
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSyncer(l *log.Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
|
type syncerParams struct {
|
||||||
|
logger *log.Logger
|
||||||
|
requestsCh <-chan *syncRequest
|
||||||
|
interval time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSyncer(params syncerParams) *syncer {
|
||||||
return &syncer{
|
return &syncer{
|
||||||
logger: l,
|
logger: params.logger,
|
||||||
requestsCh: requestsCh,
|
requestsCh: params.requestsCh,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
interval: interval,
|
interval: params.interval,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,7 +27,11 @@ func TestSyncer(t *testing.T) {
|
|||||||
|
|
||||||
const interval = time.Second
|
const interval = time.Second
|
||||||
syncRequestCh := make(chan *syncRequest)
|
syncRequestCh := make(chan *syncRequest)
|
||||||
syncer := newSyncer(testLogger, syncRequestCh, interval)
|
syncer := newSyncer(syncerParams{
|
||||||
|
logger: testLogger,
|
||||||
|
requestsCh: syncRequestCh,
|
||||||
|
interval: interval,
|
||||||
|
})
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
syncer.start(&wg)
|
syncer.start(&wg)
|
||||||
defer syncer.terminate()
|
defer syncer.terminate()
|
||||||
@ -52,7 +56,11 @@ func TestSyncer(t *testing.T) {
|
|||||||
func TestSyncerRetry(t *testing.T) {
|
func TestSyncerRetry(t *testing.T) {
|
||||||
const interval = time.Second
|
const interval = time.Second
|
||||||
syncRequestCh := make(chan *syncRequest)
|
syncRequestCh := make(chan *syncRequest)
|
||||||
syncer := newSyncer(testLogger, syncRequestCh, interval)
|
syncer := newSyncer(syncerParams{
|
||||||
|
logger: testLogger,
|
||||||
|
requestsCh: syncRequestCh,
|
||||||
|
interval: interval,
|
||||||
|
})
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
syncer.start(&wg)
|
syncer.start(&wg)
|
||||||
|
Loading…
Reference in New Issue
Block a user