From b63476ddc867c1adac7c42f66052791f178d3a61 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 5 May 2020 22:10:11 -0700 Subject: [PATCH] Simplify Logger interface --- asynq_test.go | 3 +- heartbeat.go | 7 +- internal/log/log.go | 98 +++++++++++++++----- internal/log/log_test.go | 196 ++++++++++++++++++++++++++++++++------- processor.go | 23 ++--- scheduler.go | 7 +- server.go | 26 ++---- subscriber.go | 7 +- syncer.go | 6 +- 9 files changed, 277 insertions(+), 96 deletions(-) diff --git a/asynq_test.go b/asynq_test.go index 3f6675c..3aff019 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -5,7 +5,6 @@ package asynq import ( - "os" "sort" "testing" @@ -24,7 +23,7 @@ const ( redisDB = 14 ) -var testLogger = log.NewLogger(os.Stderr) +var testLogger = log.NewLogger(nil) func setup(tb testing.TB) *redis.Client { tb.Helper() diff --git a/heartbeat.go b/heartbeat.go index b89845c..61f7908 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -9,12 +9,13 @@ import ( "time" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" ) // heartbeater is responsible for writing process info to redis periodically to // indicate that the background worker process is up. type heartbeater struct { - logger Logger + logger *log.Logger broker base.Broker ss *base.ServerState @@ -26,7 +27,7 @@ type heartbeater struct { interval time.Duration } -func newHeartbeater(l Logger, b base.Broker, ss *base.ServerState, interval time.Duration) *heartbeater { +func newHeartbeater(l *log.Logger, b base.Broker, ss *base.ServerState, interval time.Duration) *heartbeater { return &heartbeater{ logger: l, broker: b, @@ -67,6 +68,6 @@ func (h *heartbeater) beat() { // and short enough to expire quickly once the process is shut down or killed. err := h.broker.WriteServerState(h.ss, h.interval*2) if err != nil { - h.logger.Error("could not write heartbeat data: %v", err) + h.logger.Errorf("could not write heartbeat data: %v", err) } } diff --git a/internal/log/log.go b/internal/log/log.go index ba3b368..7fdd5bf 100644 --- a/internal/log/log.go +++ b/internal/log/log.go @@ -6,52 +6,106 @@ package log import ( + "fmt" "io" stdlog "log" "os" ) -// NewLogger creates and returns a new instance of Logger. -func NewLogger(out io.Writer) *Logger { - return &Logger{ - stdlog.New(out, "", stdlog.Ldate|stdlog.Ltime|stdlog.Lmicroseconds|stdlog.LUTC), - } +// Base supports logging with various log levels. +type Base interface { + // Debug logs a message at Debug level. + Debug(args ...interface{}) + + // Info logs a message at Info level. + Info(args ...interface{}) + + // Warn logs a message at Warning level. + Warn(args ...interface{}) + + // Error logs a message at Error level. + Error(args ...interface{}) + + // Fatal logs a message at Fatal level + // and process will exit with status set to 1. + Fatal(args ...interface{}) } -// Logger is a wrapper object around log.Logger from the standard library. +// baseLogger is a wrapper object around log.Logger from the standard library. // It supports logging at various log levels. -type Logger struct { +type baseLogger struct { *stdlog.Logger } // Debug logs a message at Debug level. -func (l *Logger) Debug(format string, args ...interface{}) { - format = "DEBUG: " + format - l.Printf(format, args...) +func (l *baseLogger) Debug(args ...interface{}) { + l.prefixPrint("DEBUG: ", args...) } // Info logs a message at Info level. -func (l *Logger) Info(format string, args ...interface{}) { - format = "INFO: " + format - l.Printf(format, args...) +func (l *baseLogger) Info(args ...interface{}) { + l.prefixPrint("INFO: ", args...) } // Warn logs a message at Warning level. -func (l *Logger) Warn(format string, args ...interface{}) { - format = "WARN: " + format - l.Printf(format, args...) +func (l *baseLogger) Warn(args ...interface{}) { + l.prefixPrint("WARN: ", args...) } // Error logs a message at Error level. -func (l *Logger) Error(format string, args ...interface{}) { - format = "ERROR: " + format - l.Printf(format, args...) +func (l *baseLogger) Error(args ...interface{}) { + l.prefixPrint("ERROR: ", args...) } // Fatal logs a message at Fatal level // and process will exit with status set to 1. -func (l *Logger) Fatal(format string, args ...interface{}) { - format = "FATAL: " + format - l.Printf(format, args...) +func (l *baseLogger) Fatal(args ...interface{}) { + l.prefixPrint("FATAL: ", args...) os.Exit(1) } + +func (l *baseLogger) prefixPrint(prefix string, args ...interface{}) { + args = append([]interface{}{prefix}, args...) + l.Print(args...) +} + +// newBase creates and returns a new instance of baseLogger. +func newBase(out io.Writer) *baseLogger { + prefix := fmt.Sprintf("asynq: pid=%d ", os.Getpid()) + return &baseLogger{ + stdlog.New(out, prefix, stdlog.Ldate|stdlog.Ltime|stdlog.Lmicroseconds|stdlog.LUTC), + } +} + +// NewLogger creates and returns a new instance of Logger. +func NewLogger(base Base) *Logger { + if base == nil { + base = newBase(os.Stderr) + } + return &Logger{base} +} + +// Logger logs message to io.Writer with various log levels. +type Logger struct { + Base +} + +func (l *Logger) Debugf(format string, args ...interface{}) { + l.Debug(fmt.Sprintf(format, args...)) +} + +func (l *Logger) Infof(format string, args ...interface{}) { + l.Info(fmt.Sprintf(format, args...)) +} + +func (l *Logger) Warnf(format string, args ...interface{}) { + l.Warn(fmt.Sprintf(format, args...)) +} + +func (l *Logger) Errorf(format string, args ...interface{}) { + l.Error(fmt.Sprintf(format, args...)) +} + +func (l *Logger) Fatalf(format string, args ...interface{}) { + l.Fatal(fmt.Sprintf(format, args...)) +} diff --git a/internal/log/log_test.go b/internal/log/log_test.go index 3c96418..2285ae3 100644 --- a/internal/log/log_test.go +++ b/internal/log/log_test.go @@ -13,6 +13,7 @@ import ( // regexp for timestamps const ( + rgxPID = `[0-9]+` rgxdate = `[0-9][0-9][0-9][0-9]/[0-9][0-9]/[0-9][0-9]` rgxtime = `[0-9][0-9]:[0-9][0-9]:[0-9][0-9]` rgxmicroseconds = `\.[0-9][0-9][0-9][0-9][0-9][0-9]` @@ -27,20 +28,22 @@ type tester struct { func TestLoggerDebug(t *testing.T) { tests := []tester{ { - desc: "without trailing newline, logger adds newline", - message: "hello, world!", - wantPattern: fmt.Sprintf("^%s %s%s DEBUG: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "without trailing newline, logger adds newline", + message: "hello, world!", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s DEBUG: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, { - desc: "with trailing newline, logger preserves newline", - message: "hello, world!\n", - wantPattern: fmt.Sprintf("^%s %s%s DEBUG: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "with trailing newline, logger preserves newline", + message: "hello, world!\n", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s DEBUG: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, } for _, tc := range tests { var buf bytes.Buffer - logger := NewLogger(&buf) + logger := NewLogger(newBase(&buf)) logger.Debug(tc.message) @@ -50,7 +53,7 @@ func TestLoggerDebug(t *testing.T) { t.Fatal("pattern did not compile:", err) } if !matched { - t.Errorf("logger.info(%q) outputted %q, should match pattern %q", + t.Errorf("logger.Debug(%q) outputted %q, should match pattern %q", tc.message, got, tc.wantPattern) } } @@ -59,20 +62,22 @@ func TestLoggerDebug(t *testing.T) { func TestLoggerInfo(t *testing.T) { tests := []tester{ { - desc: "without trailing newline, logger adds newline", - message: "hello, world!", - wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "without trailing newline, logger adds newline", + message: "hello, world!", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s INFO: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, { - desc: "with trailing newline, logger preserves newline", - message: "hello, world!\n", - wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "with trailing newline, logger preserves newline", + message: "hello, world!\n", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s INFO: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, } for _, tc := range tests { var buf bytes.Buffer - logger := NewLogger(&buf) + logger := NewLogger(newBase(&buf)) logger.Info(tc.message) @@ -82,7 +87,7 @@ func TestLoggerInfo(t *testing.T) { t.Fatal("pattern did not compile:", err) } if !matched { - t.Errorf("logger.info(%q) outputted %q, should match pattern %q", + t.Errorf("logger.Info(%q) outputted %q, should match pattern %q", tc.message, got, tc.wantPattern) } } @@ -91,20 +96,22 @@ func TestLoggerInfo(t *testing.T) { func TestLoggerWarn(t *testing.T) { tests := []tester{ { - desc: "without trailing newline, logger adds newline", - message: "hello, world!", - wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "without trailing newline, logger adds newline", + message: "hello, world!", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s WARN: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, { - desc: "with trailing newline, logger preserves newline", - message: "hello, world!\n", - wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "with trailing newline, logger preserves newline", + message: "hello, world!\n", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s WARN: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, } for _, tc := range tests { var buf bytes.Buffer - logger := NewLogger(&buf) + logger := NewLogger(newBase(&buf)) logger.Warn(tc.message) @@ -114,7 +121,7 @@ func TestLoggerWarn(t *testing.T) { t.Fatal("pattern did not compile:", err) } if !matched { - t.Errorf("logger.info(%q) outputted %q, should match pattern %q", + t.Errorf("logger.Warn(%q) outputted %q, should match pattern %q", tc.message, got, tc.wantPattern) } } @@ -123,20 +130,22 @@ func TestLoggerWarn(t *testing.T) { func TestLoggerError(t *testing.T) { tests := []tester{ { - desc: "without trailing newline, logger adds newline", - message: "hello, world!", - wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "without trailing newline, logger adds newline", + message: "hello, world!", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s ERROR: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, { - desc: "with trailing newline, logger preserves newline", - message: "hello, world!\n", - wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "with trailing newline, logger preserves newline", + message: "hello, world!\n", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s ERROR: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, } for _, tc := range tests { var buf bytes.Buffer - logger := NewLogger(&buf) + logger := NewLogger(newBase(&buf)) logger.Error(tc.message) @@ -146,8 +155,131 @@ func TestLoggerError(t *testing.T) { t.Fatal("pattern did not compile:", err) } if !matched { - t.Errorf("logger.info(%q) outputted %q, should match pattern %q", + t.Errorf("logger.Error(%q) outputted %q, should match pattern %q", tc.message, got, tc.wantPattern) } } } + +type formatTester struct { + desc string + format string + args []interface{} + wantPattern string // regexp that log output must match +} + +func TestLoggerDebugf(t *testing.T) { + tests := []formatTester{ + { + desc: "Formats message with DEBUG prefix", + format: "hello, %s!", + args: []interface{}{"Gopher"}, + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s DEBUG: hello, Gopher!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), + }, + } + + for _, tc := range tests { + var buf bytes.Buffer + logger := NewLogger(newBase(&buf)) + + logger.Debugf(tc.format, tc.args...) + + got := buf.String() + matched, err := regexp.MatchString(tc.wantPattern, got) + if err != nil { + t.Fatal("pattern did not compile:", err) + } + if !matched { + t.Errorf("logger.Debugf(%q, %v) outputted %q, should match pattern %q", + tc.format, tc.args, got, tc.wantPattern) + } + } +} + +func TestLoggerInfof(t *testing.T) { + tests := []formatTester{ + { + desc: "Formats message with INFO prefix", + format: "%d,%d,%d", + args: []interface{}{1, 2, 3}, + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s INFO: 1,2,3\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), + }, + } + + for _, tc := range tests { + var buf bytes.Buffer + logger := NewLogger(newBase(&buf)) + + logger.Infof(tc.format, tc.args...) + + got := buf.String() + matched, err := regexp.MatchString(tc.wantPattern, got) + if err != nil { + t.Fatal("pattern did not compile:", err) + } + if !matched { + t.Errorf("logger.Infof(%q, %v) outputted %q, should match pattern %q", + tc.format, tc.args, got, tc.wantPattern) + } + } +} + +func TestLoggerWarnf(t *testing.T) { + tests := []formatTester{ + { + desc: "Formats message with WARN prefix", + format: "hello, %s", + args: []interface{}{"Gophers"}, + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s WARN: hello, Gophers\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), + }, + } + + for _, tc := range tests { + var buf bytes.Buffer + logger := NewLogger(newBase(&buf)) + + logger.Warnf(tc.format, tc.args...) + + got := buf.String() + matched, err := regexp.MatchString(tc.wantPattern, got) + if err != nil { + t.Fatal("pattern did not compile:", err) + } + if !matched { + t.Errorf("logger.Warnf(%q, %v) outputted %q, should match pattern %q", + tc.format, tc.args, got, tc.wantPattern) + } + } +} + +func TestLoggerErrorf(t *testing.T) { + tests := []formatTester{ + { + desc: "Formats message with ERROR prefix", + format: "hello, %s", + args: []interface{}{"Gophers"}, + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s ERROR: hello, Gophers\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), + }, + } + + for _, tc := range tests { + var buf bytes.Buffer + logger := NewLogger(newBase(&buf)) + + logger.Errorf(tc.format, tc.args...) + + got := buf.String() + matched, err := regexp.MatchString(tc.wantPattern, got) + if err != nil { + t.Fatal("pattern did not compile:", err) + } + if !matched { + t.Errorf("logger.Errorf(%q, %v) outputted %q, should match pattern %q", + tc.format, tc.args, got, tc.wantPattern) + } + } +} diff --git a/processor.go b/processor.go index f0bd799..092fd37 100644 --- a/processor.go +++ b/processor.go @@ -13,12 +13,13 @@ import ( "time" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" "golang.org/x/time/rate" ) type processor struct { - logger Logger + logger *log.Logger broker base.Broker ss *base.ServerState @@ -64,7 +65,7 @@ type processor struct { type retryDelayFunc func(n int, err error, task *Task) time.Duration type newProcessorParams struct { - logger Logger + logger *log.Logger broker base.Broker ss *base.ServerState retryDelayFunc retryDelayFunc @@ -170,7 +171,7 @@ func (p *processor) exec() { } if err != nil { if p.errLogLimiter.Allow() { - p.logger.Error("Dequeue error: %v", err) + p.logger.Errorf("Dequeue error: %v", err) } return } @@ -202,7 +203,7 @@ func (p *processor) exec() { select { case <-p.quit: // time is up, quit this worker goroutine. - p.logger.Warn("Quitting worker. task id=%s", msg.ID) + p.logger.Warnf("Quitting worker. task id=%s", msg.ID) return case resErr := <-resCh: // Note: One of three things should happen. @@ -231,17 +232,17 @@ func (p *processor) exec() { func (p *processor) restore() { n, err := p.broker.RequeueAll() if err != nil { - p.logger.Error("Could not restore unfinished tasks: %v", err) + p.logger.Errorf("Could not restore unfinished tasks: %v", err) } if n > 0 { - p.logger.Info("Restored %d unfinished tasks back to queue", n) + p.logger.Infof("Restored %d unfinished tasks back to queue", n) } } func (p *processor) requeue(msg *base.TaskMessage) { err := p.broker.Requeue(msg) if err != nil { - p.logger.Error("Could not push task id=%s back to queue: %v", msg.ID, err) + p.logger.Errorf("Could not push task id=%s back to queue: %v", msg.ID, err) } } @@ -249,7 +250,7 @@ func (p *processor) markAsDone(msg *base.TaskMessage) { err := p.broker.Done(msg) if err != nil { errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue) - p.logger.Warn("%s; Will retry syncing", errMsg) + p.logger.Warnf("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.broker.Done(msg) @@ -265,7 +266,7 @@ func (p *processor) retry(msg *base.TaskMessage, e error) { err := p.broker.Retry(msg, retryAt, e.Error()) if err != nil { errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue) - p.logger.Warn("%s; Will retry syncing", errMsg) + p.logger.Warnf("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.broker.Retry(msg, retryAt, e.Error()) @@ -276,11 +277,11 @@ func (p *processor) retry(msg *base.TaskMessage, e error) { } func (p *processor) kill(msg *base.TaskMessage, e error) { - p.logger.Warn("Retry exhausted for task id=%s", msg.ID) + p.logger.Warnf("Retry exhausted for task id=%s", msg.ID) err := p.broker.Kill(msg, e.Error()) if err != nil { errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue) - p.logger.Warn("%s; Will retry syncing", errMsg) + p.logger.Warnf("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.broker.Kill(msg, e.Error()) diff --git a/scheduler.go b/scheduler.go index 8f4f860..aa0a36d 100644 --- a/scheduler.go +++ b/scheduler.go @@ -9,10 +9,11 @@ import ( "time" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" ) type scheduler struct { - logger Logger + logger *log.Logger broker base.Broker // channel to communicate back to the long running "scheduler" goroutine. @@ -25,7 +26,7 @@ type scheduler struct { qnames []string } -func newScheduler(l Logger, b base.Broker, avgInterval time.Duration, qcfg map[string]int) *scheduler { +func newScheduler(l *log.Logger, b base.Broker, avgInterval time.Duration, qcfg map[string]int) *scheduler { var qnames []string for q := range qcfg { qnames = append(qnames, q) @@ -64,6 +65,6 @@ func (s *scheduler) start(wg *sync.WaitGroup) { func (s *scheduler) exec() { if err := s.broker.CheckAndEnqueue(s.qnames...); err != nil { - s.logger.Error("Could not enqueue scheduled tasks: %v", err) + s.logger.Errorf("Could not enqueue scheduled tasks: %v", err) } } diff --git a/server.go b/server.go index 50fc27a..86ff04f 100644 --- a/server.go +++ b/server.go @@ -35,7 +35,7 @@ import ( type Server struct { ss *base.ServerState - logger Logger + logger *log.Logger broker base.Broker @@ -133,23 +133,23 @@ func (fn ErrorHandlerFunc) HandleError(task *Task, err error, retried, maxRetry fn(task, err, retried, maxRetry) } -// Logger implements logging with various log levels. +// Logger supports logging with various log levels. type Logger interface { // Debug logs a message at Debug level. - Debug(format string, args ...interface{}) + Debug(args ...interface{}) // Info logs a message at Info level. - Info(format string, args ...interface{}) + Info(args ...interface{}) // Warn logs a message at Warning level. - Warn(format string, args ...interface{}) + Warn(args ...interface{}) // Error logs a message at Error level. - Error(format string, args ...interface{}) + Error(args ...interface{}) // Fatal logs a message at Fatal level // and process will exit with status set to 1. - Fatal(format string, args ...interface{}) + Fatal(args ...interface{}) } // Formula taken from https://github.com/mperham/sidekiq. @@ -185,10 +185,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { if len(queues) == 0 { queues = defaultQueueConfig } - logger := cfg.Logger - if logger == nil { - logger = log.NewLogger(os.Stderr) - } shutdownTimeout := cfg.ShutdownTimeout if shutdownTimeout == 0 { shutdownTimeout = defaultShutdownTimeout @@ -201,6 +197,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { pid := os.Getpid() rdb := rdb.NewRDB(createRedisClient(r)) + logger := log.NewLogger(cfg.Logger) ss := base.NewServerState(host, pid, n, queues, cfg.StrictPriority) syncCh := make(chan *syncRequest) cancels := base.NewCancelations() @@ -291,13 +288,6 @@ func (srv *Server) Start(handler Handler) error { srv.ss.SetStatus(base.StatusRunning) srv.processor.handler = handler - type prefixLogger interface { - SetPrefix(prefix string) - } - // If logger supports setting prefix, then set prefix for log output. - if l, ok := srv.logger.(prefixLogger); ok { - l.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid())) - } srv.logger.Info("Starting processing") srv.heartbeater.start(&srv.wg) diff --git a/subscriber.go b/subscriber.go index c72ac89..be7a010 100644 --- a/subscriber.go +++ b/subscriber.go @@ -10,10 +10,11 @@ import ( "github.com/go-redis/redis/v7" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" ) type subscriber struct { - logger Logger + logger *log.Logger broker base.Broker // channel to communicate back to the long running "subscriber" goroutine. @@ -26,7 +27,7 @@ type subscriber struct { retryTimeout time.Duration } -func newSubscriber(l Logger, b base.Broker, cancelations *base.Cancelations) *subscriber { +func newSubscriber(l *log.Logger, b base.Broker, cancelations *base.Cancelations) *subscriber { return &subscriber{ logger: l, broker: b, @@ -54,7 +55,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) { for { pubsub, err = s.broker.CancelationPubSub() if err != nil { - s.logger.Error("cannot subscribe to cancelation channel: %v", err) + s.logger.Errorf("cannot subscribe to cancelation channel: %v", err) select { case <-time.After(s.retryTimeout): continue diff --git a/syncer.go b/syncer.go index 1ddef9b..018196d 100644 --- a/syncer.go +++ b/syncer.go @@ -7,12 +7,14 @@ package asynq import ( "sync" "time" + + "github.com/hibiken/asynq/internal/log" ) // syncer is responsible for queuing up failed requests to redis and retry // those requests to sync state between the background process and redis. type syncer struct { - logger Logger + logger *log.Logger requestsCh <-chan *syncRequest @@ -28,7 +30,7 @@ type syncRequest struct { errMsg string // error message } -func newSyncer(l Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer { +func newSyncer(l *log.Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer { return &syncer{ logger: l, requestsCh: requestsCh,