diff --git a/asynq_test.go b/asynq_test.go index 1b0e3eb..3f01066 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -5,12 +5,14 @@ package asynq import ( + "os" "sort" "testing" "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" h "github.com/hibiken/asynq/internal/asynqtest" + "github.com/hibiken/asynq/internal/log" ) // This file defines test helper functions used by @@ -22,6 +24,8 @@ const ( redisDB = 14 ) +var testLogger = log.NewLogger(os.Stderr) + func setup(tb testing.TB) *redis.Client { tb.Helper() r := redis.NewClient(&redis.Options{ diff --git a/background.go b/background.go index 275fbaa..0ec07f0 100644 --- a/background.go +++ b/background.go @@ -16,6 +16,7 @@ import ( "time" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" ) @@ -39,6 +40,8 @@ type Background struct { // wait group to wait for all goroutines to finish. wg sync.WaitGroup + logger *log.Logger + rdb *rdb.RDB scheduler *scheduler processor *processor @@ -158,16 +161,18 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { } pid := os.Getpid() + logger := log.NewLogger(os.Stderr) rdb := rdb.NewRDB(createRedisClient(r)) ps := base.NewProcessState(host, pid, n, queues, cfg.StrictPriority) syncCh := make(chan *syncRequest) cancels := base.NewCancelations() - syncer := newSyncer(syncCh, 5*time.Second) - heartbeater := newHeartbeater(rdb, ps, 5*time.Second) - scheduler := newScheduler(rdb, 5*time.Second, queues) - processor := newProcessor(rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler) - subscriber := newSubscriber(rdb, cancels) + syncer := newSyncer(logger, syncCh, 5*time.Second) + heartbeater := newHeartbeater(logger, rdb, ps, 5*time.Second) + scheduler := newScheduler(logger, rdb, 5*time.Second, queues) + processor := newProcessor(logger, rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler) + subscriber := newSubscriber(logger, rdb, cancels) return &Background{ + logger: logger, rdb: rdb, ps: ps, scheduler: scheduler, @@ -205,14 +210,14 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error { // a signal, it gracefully shuts down all pending workers and other // goroutines to process the tasks. func (bg *Background) Run(handler Handler) { - logger.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid())) - logger.info("Starting processing") + bg.logger.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid())) + bg.logger.Info("Starting processing") bg.start(handler) defer bg.stop() - logger.info("Send signal TSTP to stop processing new tasks") - logger.info("Send signal TERM or INT to terminate the process") + bg.logger.Info("Send signal TSTP to stop processing new tasks") + bg.logger.Info("Send signal TERM or INT to terminate the process") // Wait for a signal to terminate. sigs := make(chan os.Signal, 1) @@ -227,7 +232,7 @@ func (bg *Background) Run(handler Handler) { break } fmt.Println() - logger.info("Starting graceful shutdown") + bg.logger.Info("Starting graceful shutdown") } // starts the background-task processing. @@ -271,5 +276,5 @@ func (bg *Background) stop() { bg.rdb.Close() bg.running = false - logger.info("Bye!") + bg.logger.Info("Bye!") } diff --git a/heartbeat.go b/heartbeat.go index 556a219..97f7dd9 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -9,13 +9,15 @@ import ( "time" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" ) // heartbeater is responsible for writing process info to redis periodically to // indicate that the background worker process is up. type heartbeater struct { - rdb *rdb.RDB + logger *log.Logger + rdb *rdb.RDB ps *base.ProcessState @@ -26,8 +28,9 @@ type heartbeater struct { interval time.Duration } -func newHeartbeater(rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater { +func newHeartbeater(l *log.Logger, rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater { return &heartbeater{ + logger: l, rdb: rdb, ps: ps, done: make(chan struct{}), @@ -36,7 +39,7 @@ func newHeartbeater(rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) } func (h *heartbeater) terminate() { - logger.info("Heartbeater shutting down...") + h.logger.Info("Heartbeater shutting down...") // Signal the heartbeater goroutine to stop. h.done <- struct{}{} } @@ -52,7 +55,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) { select { case <-h.done: h.rdb.ClearProcessState(h.ps) - logger.info("Heartbeater done") + h.logger.Info("Heartbeater done") return case <-time.After(h.interval): h.beat() @@ -66,6 +69,6 @@ func (h *heartbeater) beat() { // and short enough to expire quickly once the process is shut down or killed. err := h.rdb.WriteProcessState(h.ps, h.interval*2) if err != nil { - logger.error("could not write heartbeat data: %v", err) + h.logger.Error("could not write heartbeat data: %v", err) } } diff --git a/heartbeat_test.go b/heartbeat_test.go index c7b1089..e6deb88 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -36,7 +36,7 @@ func TestHeartbeater(t *testing.T) { h.FlushDB(t, r) state := base.NewProcessState(tc.host, tc.pid, tc.concurrency, tc.queues, false) - hb := newHeartbeater(rdbClient, state, tc.interval) + hb := newHeartbeater(testLogger, rdbClient, state, tc.interval) var wg sync.WaitGroup hb.start(&wg) diff --git a/internal/log/log.go b/internal/log/log.go new file mode 100644 index 0000000..619b9b4 --- /dev/null +++ b/internal/log/log.go @@ -0,0 +1,36 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +// Package log exports logging related types and functions. +package log + +import ( + "io" + stdlog "log" +) + +func NewLogger(out io.Writer) *Logger { + return &Logger{ + stdlog.New(out, "", stdlog.Ldate|stdlog.Ltime|stdlog.Lmicroseconds|stdlog.LUTC), + } +} + +type Logger struct { + *stdlog.Logger +} + +func (l *Logger) Info(format string, args ...interface{}) { + format = "INFO: " + format + l.Printf(format, args...) +} + +func (l *Logger) Warn(format string, args ...interface{}) { + format = "WARN: " + format + l.Printf(format, args...) +} + +func (l *Logger) Error(format string, args ...interface{}) { + format = "ERROR: " + format + l.Printf(format, args...) +} diff --git a/logger_test.go b/internal/log/log_test.go similarity index 89% rename from logger_test.go rename to internal/log/log_test.go index 0d9b926..3442147 100644 --- a/logger_test.go +++ b/internal/log/log_test.go @@ -1,4 +1,8 @@ -package asynq +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package log import ( "bytes" @@ -36,9 +40,9 @@ func TestLoggerInfo(t *testing.T) { for _, tc := range tests { var buf bytes.Buffer - logger := newLogger(&buf) + logger := NewLogger(&buf) - logger.info(tc.message) + logger.Info(tc.message) got := buf.String() matched, err := regexp.MatchString(tc.wantPattern, got) @@ -68,9 +72,9 @@ func TestLoggerWarn(t *testing.T) { for _, tc := range tests { var buf bytes.Buffer - logger := newLogger(&buf) + logger := NewLogger(&buf) - logger.warn(tc.message) + logger.Warn(tc.message) got := buf.String() matched, err := regexp.MatchString(tc.wantPattern, got) @@ -100,9 +104,9 @@ func TestLoggerError(t *testing.T) { for _, tc := range tests { var buf bytes.Buffer - logger := newLogger(&buf) + logger := NewLogger(&buf) - logger.error(tc.message) + logger.Error(tc.message) got := buf.String() matched, err := regexp.MatchString(tc.wantPattern, got) diff --git a/logger.go b/logger.go deleted file mode 100644 index 803e3a4..0000000 --- a/logger.go +++ /dev/null @@ -1,35 +0,0 @@ -package asynq - -import ( - "io" - "log" - "os" -) - -// global logger used in asynq package. -var logger = newLogger(os.Stderr) - -func newLogger(out io.Writer) *asynqLogger { - return &asynqLogger{ - log.New(out, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.LUTC), - } -} - -type asynqLogger struct { - *log.Logger -} - -func (l *asynqLogger) info(format string, args ...interface{}) { - format = "INFO: " + format - l.Printf(format, args...) -} - -func (l *asynqLogger) warn(format string, args ...interface{}) { - format = "WARN: " + format - l.Printf(format, args...) -} - -func (l *asynqLogger) error(format string, args ...interface{}) { - format = "ERROR: " + format - l.Printf(format, args...) -} diff --git a/processor.go b/processor.go index d2c460b..12cb25a 100644 --- a/processor.go +++ b/processor.go @@ -13,12 +13,14 @@ import ( "time" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" "golang.org/x/time/rate" ) type processor struct { - rdb *rdb.RDB + logger *log.Logger + rdb *rdb.RDB ps *base.ProcessState @@ -61,7 +63,7 @@ type processor struct { type retryDelayFunc func(n int, err error, task *Task) time.Duration // newProcessor constructs a new processor. -func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, +func newProcessor(l *log.Logger, r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor { info := ps.Get() qcfg := normalizeQueueCfg(info.Queues) @@ -70,6 +72,7 @@ func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, orderedQueues = sortByPriority(qcfg) } return &processor{ + logger: l, rdb: r, ps: ps, queueConfig: qcfg, @@ -91,7 +94,7 @@ func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, // It's safe to call this method multiple times. func (p *processor) stop() { p.once.Do(func() { - logger.info("Processor shutting down...") + p.logger.Info("Processor shutting down...") // Unblock if processor is waiting for sema token. close(p.abort) // Signal the processor goroutine to stop processing tasks @@ -107,7 +110,7 @@ func (p *processor) terminate() { // IDEA: Allow user to customize this timeout value. const timeout = 8 * time.Second time.AfterFunc(timeout, func() { close(p.quit) }) - logger.info("Waiting for all workers to finish...") + p.logger.Info("Waiting for all workers to finish...") // send cancellation signal to all in-progress task handlers for _, cancel := range p.cancelations.GetAll() { @@ -118,7 +121,7 @@ func (p *processor) terminate() { for i := 0; i < cap(p.sema); i++ { p.sema <- struct{}{} } - logger.info("All workers have finished") + p.logger.Info("All workers have finished") p.restore() // move any unfinished tasks back to the queue. } @@ -132,7 +135,7 @@ func (p *processor) start(wg *sync.WaitGroup) { for { select { case <-p.done: - logger.info("Processor done") + p.logger.Info("Processor done") return default: p.exec() @@ -158,7 +161,7 @@ func (p *processor) exec() { } if err != nil { if p.errLogLimiter.Allow() { - logger.error("Dequeue error: %v", err) + p.logger.Error("Dequeue error: %v", err) } return } @@ -188,7 +191,7 @@ func (p *processor) exec() { select { case <-p.quit: // time is up, quit this worker goroutine. - logger.warn("Quitting worker. task id=%s", msg.ID) + p.logger.Warn("Quitting worker. task id=%s", msg.ID) return case resErr := <-resCh: // Note: One of three things should happen. @@ -217,17 +220,17 @@ func (p *processor) exec() { func (p *processor) restore() { n, err := p.rdb.RequeueAll() if err != nil { - logger.error("Could not restore unfinished tasks: %v", err) + p.logger.Error("Could not restore unfinished tasks: %v", err) } if n > 0 { - logger.info("Restored %d unfinished tasks back to queue", n) + p.logger.Info("Restored %d unfinished tasks back to queue", n) } } func (p *processor) requeue(msg *base.TaskMessage) { err := p.rdb.Requeue(msg) if err != nil { - logger.error("Could not push task id=%s back to queue: %v", msg.ID, err) + p.logger.Error("Could not push task id=%s back to queue: %v", msg.ID, err) } } @@ -235,7 +238,7 @@ func (p *processor) markAsDone(msg *base.TaskMessage) { err := p.rdb.Done(msg) if err != nil { errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue) - logger.warn("%s; Will retry syncing", errMsg) + p.logger.Warn("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.rdb.Done(msg) @@ -251,7 +254,7 @@ func (p *processor) retry(msg *base.TaskMessage, e error) { err := p.rdb.Retry(msg, retryAt, e.Error()) if err != nil { errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue) - logger.warn("%s; Will retry syncing", errMsg) + p.logger.Warn("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.rdb.Retry(msg, retryAt, e.Error()) @@ -262,11 +265,11 @@ func (p *processor) retry(msg *base.TaskMessage, e error) { } func (p *processor) kill(msg *base.TaskMessage, e error) { - logger.warn("Retry exhausted for task id=%s", msg.ID) + p.logger.Warn("Retry exhausted for task id=%s", msg.ID) err := p.rdb.Kill(msg, e.Error()) if err != nil { errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue) - logger.warn("%s; Will retry syncing", errMsg) + p.logger.Warn("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.rdb.Kill(msg, e.Error()) diff --git a/processor_test.go b/processor_test.go index d439341..3c79102 100644 --- a/processor_test.go +++ b/processor_test.go @@ -69,7 +69,7 @@ func TestProcessorSuccess(t *testing.T) { } ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false) cancelations := base.NewCancelations() - p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations, nil) + p := newProcessor(testLogger, rdbClient, ps, defaultDelayFunc, nil, cancelations, nil) p.handler = HandlerFunc(handler) var wg sync.WaitGroup @@ -167,7 +167,7 @@ func TestProcessorRetry(t *testing.T) { } ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false) cancelations := base.NewCancelations() - p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler)) + p := newProcessor(testLogger, rdbClient, ps, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler)) p.handler = tc.handler var wg sync.WaitGroup @@ -233,7 +233,7 @@ func TestProcessorQueues(t *testing.T) { for _, tc := range tests { cancelations := base.NewCancelations() ps := base.NewProcessState("localhost", 1234, 10, tc.queueCfg, false) - p := newProcessor(nil, ps, defaultDelayFunc, nil, cancelations, nil) + p := newProcessor(testLogger, nil, ps, defaultDelayFunc, nil, cancelations, nil) got := p.queues() if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s", @@ -301,7 +301,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { // Note: Set concurrency to 1 to make sure tasks are processed one at a time. cancelations := base.NewCancelations() ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/) - p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations, nil) + p := newProcessor(testLogger, rdbClient, ps, defaultDelayFunc, nil, cancelations, nil) p.handler = HandlerFunc(handler) var wg sync.WaitGroup diff --git a/scheduler.go b/scheduler.go index c5f28d2..67bb6e7 100644 --- a/scheduler.go +++ b/scheduler.go @@ -8,11 +8,13 @@ import ( "sync" "time" + "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" ) type scheduler struct { - rdb *rdb.RDB + logger *log.Logger + rdb *rdb.RDB // channel to communicate back to the long running "scheduler" goroutine. done chan struct{} @@ -24,12 +26,13 @@ type scheduler struct { qnames []string } -func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler { +func newScheduler(l *log.Logger, r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler { var qnames []string for q := range qcfg { qnames = append(qnames, q) } return &scheduler{ + logger: l, rdb: r, done: make(chan struct{}), avgInterval: avgInterval, @@ -38,7 +41,7 @@ func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *s } func (s *scheduler) terminate() { - logger.info("Scheduler shutting down...") + s.logger.Info("Scheduler shutting down...") // Signal the scheduler goroutine to stop polling. s.done <- struct{}{} } @@ -51,7 +54,7 @@ func (s *scheduler) start(wg *sync.WaitGroup) { for { select { case <-s.done: - logger.info("Scheduler done") + s.logger.Info("Scheduler done") return case <-time.After(s.avgInterval): s.exec() @@ -62,6 +65,6 @@ func (s *scheduler) start(wg *sync.WaitGroup) { func (s *scheduler) exec() { if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil { - logger.error("Could not enqueue scheduled tasks: %v", err) + s.logger.Error("Could not enqueue scheduled tasks: %v", err) } } diff --git a/scheduler_test.go b/scheduler_test.go index 4f7575f..e63e13b 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -19,7 +19,7 @@ func TestScheduler(t *testing.T) { r := setup(t) rdbClient := rdb.NewRDB(r) const pollInterval = time.Second - s := newScheduler(rdbClient, pollInterval, defaultQueueConfig) + s := newScheduler(testLogger, rdbClient, pollInterval, defaultQueueConfig) t1 := h.NewTaskMessage("gen_thumbnail", nil) t2 := h.NewTaskMessage("send_email", nil) t3 := h.NewTaskMessage("reindex", nil) diff --git a/subscriber.go b/subscriber.go index e8d3731..fd420ac 100644 --- a/subscriber.go +++ b/subscriber.go @@ -8,11 +8,13 @@ import ( "sync" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" ) type subscriber struct { - rdb *rdb.RDB + logger *log.Logger + rdb *rdb.RDB // channel to communicate back to the long running "subscriber" goroutine. done chan struct{} @@ -21,8 +23,9 @@ type subscriber struct { cancelations *base.Cancelations } -func newSubscriber(rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber { +func newSubscriber(l *log.Logger, rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber { return &subscriber{ + logger: l, rdb: rdb, done: make(chan struct{}), cancelations: cancelations, @@ -30,7 +33,7 @@ func newSubscriber(rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber { } func (s *subscriber) terminate() { - logger.info("Subscriber shutting down...") + s.logger.Info("Subscriber shutting down...") // Signal the subscriber goroutine to stop. s.done <- struct{}{} } @@ -39,7 +42,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) { pubsub, err := s.rdb.CancelationPubSub() cancelCh := pubsub.Channel() if err != nil { - logger.error("cannot subscribe to cancelation channel: %v", err) + s.logger.Error("cannot subscribe to cancelation channel: %v", err) return } wg.Add(1) @@ -49,7 +52,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) { select { case <-s.done: pubsub.Close() - logger.info("Subscriber done") + s.logger.Info("Subscriber done") return case msg := <-cancelCh: cancel, ok := s.cancelations.Get(msg.Payload) diff --git a/subscriber_test.go b/subscriber_test.go index 0bc9258..ba8cd5e 100644 --- a/subscriber_test.go +++ b/subscriber_test.go @@ -37,7 +37,7 @@ func TestSubscriber(t *testing.T) { cancelations := base.NewCancelations() cancelations.Add(tc.registeredID, fakeCancelFunc) - subscriber := newSubscriber(rdbClient, cancelations) + subscriber := newSubscriber(testLogger, rdbClient, cancelations) var wg sync.WaitGroup subscriber.start(&wg) diff --git a/syncer.go b/syncer.go index 7494cbf..018196d 100644 --- a/syncer.go +++ b/syncer.go @@ -7,11 +7,15 @@ package asynq import ( "sync" "time" + + "github.com/hibiken/asynq/internal/log" ) // syncer is responsible for queuing up failed requests to redis and retry // those requests to sync state between the background process and redis. type syncer struct { + logger *log.Logger + requestsCh <-chan *syncRequest // channel to communicate back to the long running "syncer" goroutine. @@ -26,8 +30,9 @@ type syncRequest struct { errMsg string // error message } -func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer { +func newSyncer(l *log.Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer { return &syncer{ + logger: l, requestsCh: requestsCh, done: make(chan struct{}), interval: interval, @@ -35,7 +40,7 @@ func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer { } func (s *syncer) terminate() { - logger.info("Syncer shutting down...") + s.logger.Info("Syncer shutting down...") // Signal the syncer goroutine to stop. s.done <- struct{}{} } @@ -51,10 +56,10 @@ func (s *syncer) start(wg *sync.WaitGroup) { // Try sync one last time before shutting down. for _, req := range requests { if err := req.fn(); err != nil { - logger.error(req.errMsg) + s.logger.Error(req.errMsg) } } - logger.info("Syncer done") + s.logger.Info("Syncer done") return case req := <-s.requestsCh: requests = append(requests, req) diff --git a/syncer_test.go b/syncer_test.go index 37ce92b..e069074 100644 --- a/syncer_test.go +++ b/syncer_test.go @@ -27,7 +27,7 @@ func TestSyncer(t *testing.T) { const interval = time.Second syncRequestCh := make(chan *syncRequest) - syncer := newSyncer(syncRequestCh, interval) + syncer := newSyncer(testLogger, syncRequestCh, interval) var wg sync.WaitGroup syncer.start(&wg) defer syncer.terminate() @@ -52,7 +52,7 @@ func TestSyncer(t *testing.T) { func TestSyncerRetry(t *testing.T) { const interval = time.Second syncRequestCh := make(chan *syncRequest) - syncer := newSyncer(syncRequestCh, interval) + syncer := newSyncer(testLogger, syncRequestCh, interval) var wg sync.WaitGroup syncer.start(&wg)