From 0bc6eba021eeaee9134cd475741c89815f74f34b Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 12 Mar 2020 07:31:10 -0700 Subject: [PATCH] Allow custom logger to be used in Background --- background.go | 39 ++++++++++++++++++++++++++++++++++++--- heartbeat.go | 5 ++--- internal/log/log.go | 12 ++++++++++++ internal/log/log_test.go | 32 ++++++++++++++++++++++++++++++++ processor.go | 5 ++--- scheduler.go | 5 ++--- subscriber.go | 5 ++--- syncer.go | 6 ++---- 8 files changed, 90 insertions(+), 19 deletions(-) diff --git a/background.go b/background.go index 0ec07f0..2f01648 100644 --- a/background.go +++ b/background.go @@ -40,7 +40,7 @@ type Background struct { // wait group to wait for all goroutines to finish. wg sync.WaitGroup - logger *log.Logger + logger Logger rdb *rdb.RDB scheduler *scheduler @@ -107,6 +107,11 @@ type Config struct { // // ErrorHandler: asynq.ErrorHandlerFunc(reportError) ErrorHandler ErrorHandler + + // Logger specifies the logger used by the background instance. + // + // If unset, default logger is used. + Logger Logger } // An ErrorHandler handles errors returned by the task handler. @@ -123,6 +128,25 @@ func (fn ErrorHandlerFunc) HandleError(task *Task, err error, retried, maxRetry fn(task, err, retried, maxRetry) } +// Logger implements logging with various log levels. +type Logger interface { + // Debug logs a message at Debug level. + Debug(format string, args ...interface{}) + + // Info logs a message at Info level. + Info(format string, args ...interface{}) + + // Warn logs a message at Warning level. + Warn(format string, args ...interface{}) + + // Error logs a message at Error level. + Error(format string, args ...interface{}) + + // Fatal logs a message at Fatal level + // and process will exit with status set to 1. + Fatal(format string, args ...interface{}) +} + // Formula taken from https://github.com/mperham/sidekiq. func defaultDelayFunc(n int, e error, t *Task) time.Duration { r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -154,6 +178,10 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { if len(queues) == 0 { queues = defaultQueueConfig } + logger := cfg.Logger + if logger == nil { + logger = log.NewLogger(os.Stderr) + } host, err := os.Hostname() if err != nil { @@ -161,7 +189,6 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { } pid := os.Getpid() - logger := log.NewLogger(os.Stderr) rdb := rdb.NewRDB(createRedisClient(r)) ps := base.NewProcessState(host, pid, n, queues, cfg.StrictPriority) syncCh := make(chan *syncRequest) @@ -210,7 +237,13 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error { // a signal, it gracefully shuts down all pending workers and other // goroutines to process the tasks. func (bg *Background) Run(handler Handler) { - bg.logger.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid())) + type prefixLogger interface { + SetPrefix(prefix string) + } + // If logger supports setting prefix, then set prefix for log output. + if l, ok := bg.logger.(prefixLogger); ok { + l.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid())) + } bg.logger.Info("Starting processing") bg.start(handler) diff --git a/heartbeat.go b/heartbeat.go index 97f7dd9..dff5175 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -9,14 +9,13 @@ import ( "time" "github.com/hibiken/asynq/internal/base" - "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" ) // heartbeater is responsible for writing process info to redis periodically to // indicate that the background worker process is up. type heartbeater struct { - logger *log.Logger + logger Logger rdb *rdb.RDB ps *base.ProcessState @@ -28,7 +27,7 @@ type heartbeater struct { interval time.Duration } -func newHeartbeater(l *log.Logger, rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater { +func newHeartbeater(l Logger, rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater { return &heartbeater{ logger: l, rdb: rdb, diff --git a/internal/log/log.go b/internal/log/log.go index 619b9b4..56c1db9 100644 --- a/internal/log/log.go +++ b/internal/log/log.go @@ -8,6 +8,7 @@ package log import ( "io" stdlog "log" + "os" ) func NewLogger(out io.Writer) *Logger { @@ -20,6 +21,11 @@ type Logger struct { *stdlog.Logger } +func (l *Logger) Debug(format string, args ...interface{}) { + format = "DEBUG: " + format + l.Printf(format, args...) +} + func (l *Logger) Info(format string, args ...interface{}) { format = "INFO: " + format l.Printf(format, args...) @@ -34,3 +40,9 @@ func (l *Logger) Error(format string, args ...interface{}) { format = "ERROR: " + format l.Printf(format, args...) } + +func (l *Logger) Fatal(format string, args ...interface{}) { + format = "FATAL: " + format + l.Printf(format, args...) + os.Exit(1) +} diff --git a/internal/log/log_test.go b/internal/log/log_test.go index 3442147..3c96418 100644 --- a/internal/log/log_test.go +++ b/internal/log/log_test.go @@ -24,6 +24,38 @@ type tester struct { wantPattern string // regexp that log output must match } +func TestLoggerDebug(t *testing.T) { + tests := []tester{ + { + desc: "without trailing newline, logger adds newline", + message: "hello, world!", + wantPattern: fmt.Sprintf("^%s %s%s DEBUG: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + }, + { + desc: "with trailing newline, logger preserves newline", + message: "hello, world!\n", + wantPattern: fmt.Sprintf("^%s %s%s DEBUG: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + }, + } + + for _, tc := range tests { + var buf bytes.Buffer + logger := NewLogger(&buf) + + logger.Debug(tc.message) + + got := buf.String() + matched, err := regexp.MatchString(tc.wantPattern, got) + if err != nil { + t.Fatal("pattern did not compile:", err) + } + if !matched { + t.Errorf("logger.info(%q) outputted %q, should match pattern %q", + tc.message, got, tc.wantPattern) + } + } +} + func TestLoggerInfo(t *testing.T) { tests := []tester{ { diff --git a/processor.go b/processor.go index 12cb25a..6b3a1ed 100644 --- a/processor.go +++ b/processor.go @@ -13,13 +13,12 @@ import ( "time" "github.com/hibiken/asynq/internal/base" - "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" "golang.org/x/time/rate" ) type processor struct { - logger *log.Logger + logger Logger rdb *rdb.RDB ps *base.ProcessState @@ -63,7 +62,7 @@ type processor struct { type retryDelayFunc func(n int, err error, task *Task) time.Duration // newProcessor constructs a new processor. -func newProcessor(l *log.Logger, r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, +func newProcessor(l Logger, r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor { info := ps.Get() qcfg := normalizeQueueCfg(info.Queues) diff --git a/scheduler.go b/scheduler.go index 67bb6e7..06a314e 100644 --- a/scheduler.go +++ b/scheduler.go @@ -8,12 +8,11 @@ import ( "sync" "time" - "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" ) type scheduler struct { - logger *log.Logger + logger Logger rdb *rdb.RDB // channel to communicate back to the long running "scheduler" goroutine. @@ -26,7 +25,7 @@ type scheduler struct { qnames []string } -func newScheduler(l *log.Logger, r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler { +func newScheduler(l Logger, r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler { var qnames []string for q := range qcfg { qnames = append(qnames, q) diff --git a/subscriber.go b/subscriber.go index fd420ac..d401013 100644 --- a/subscriber.go +++ b/subscriber.go @@ -8,12 +8,11 @@ import ( "sync" "github.com/hibiken/asynq/internal/base" - "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" ) type subscriber struct { - logger *log.Logger + logger Logger rdb *rdb.RDB // channel to communicate back to the long running "subscriber" goroutine. @@ -23,7 +22,7 @@ type subscriber struct { cancelations *base.Cancelations } -func newSubscriber(l *log.Logger, rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber { +func newSubscriber(l Logger, rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber { return &subscriber{ logger: l, rdb: rdb, diff --git a/syncer.go b/syncer.go index 018196d..1ddef9b 100644 --- a/syncer.go +++ b/syncer.go @@ -7,14 +7,12 @@ package asynq import ( "sync" "time" - - "github.com/hibiken/asynq/internal/log" ) // syncer is responsible for queuing up failed requests to redis and retry // those requests to sync state between the background process and redis. type syncer struct { - logger *log.Logger + logger Logger requestsCh <-chan *syncRequest @@ -30,7 +28,7 @@ type syncRequest struct { errMsg string // error message } -func newSyncer(l *log.Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer { +func newSyncer(l Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer { return &syncer{ logger: l, requestsCh: requestsCh,