From a866369866878f0c7794fabec3b28293ffeaafbb Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 5 May 2020 22:10:11 -0700 Subject: [PATCH 1/9] Simplify Logger interface --- asynq_test.go | 3 +- heartbeat.go | 7 +- internal/log/log.go | 98 +++++++++++++++----- internal/log/log_test.go | 196 ++++++++++++++++++++++++++++++++------- processor.go | 23 ++--- scheduler.go | 7 +- server.go | 26 ++---- subscriber.go | 7 +- syncer.go | 6 +- 9 files changed, 277 insertions(+), 96 deletions(-) diff --git a/asynq_test.go b/asynq_test.go index 3f6675c..3aff019 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -5,7 +5,6 @@ package asynq import ( - "os" "sort" "testing" @@ -24,7 +23,7 @@ const ( redisDB = 14 ) -var testLogger = log.NewLogger(os.Stderr) +var testLogger = log.NewLogger(nil) func setup(tb testing.TB) *redis.Client { tb.Helper() diff --git a/heartbeat.go b/heartbeat.go index b89845c..61f7908 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -9,12 +9,13 @@ import ( "time" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" ) // heartbeater is responsible for writing process info to redis periodically to // indicate that the background worker process is up. type heartbeater struct { - logger Logger + logger *log.Logger broker base.Broker ss *base.ServerState @@ -26,7 +27,7 @@ type heartbeater struct { interval time.Duration } -func newHeartbeater(l Logger, b base.Broker, ss *base.ServerState, interval time.Duration) *heartbeater { +func newHeartbeater(l *log.Logger, b base.Broker, ss *base.ServerState, interval time.Duration) *heartbeater { return &heartbeater{ logger: l, broker: b, @@ -67,6 +68,6 @@ func (h *heartbeater) beat() { // and short enough to expire quickly once the process is shut down or killed. err := h.broker.WriteServerState(h.ss, h.interval*2) if err != nil { - h.logger.Error("could not write heartbeat data: %v", err) + h.logger.Errorf("could not write heartbeat data: %v", err) } } diff --git a/internal/log/log.go b/internal/log/log.go index ba3b368..7fdd5bf 100644 --- a/internal/log/log.go +++ b/internal/log/log.go @@ -6,52 +6,106 @@ package log import ( + "fmt" "io" stdlog "log" "os" ) -// NewLogger creates and returns a new instance of Logger. -func NewLogger(out io.Writer) *Logger { - return &Logger{ - stdlog.New(out, "", stdlog.Ldate|stdlog.Ltime|stdlog.Lmicroseconds|stdlog.LUTC), - } +// Base supports logging with various log levels. +type Base interface { + // Debug logs a message at Debug level. + Debug(args ...interface{}) + + // Info logs a message at Info level. + Info(args ...interface{}) + + // Warn logs a message at Warning level. + Warn(args ...interface{}) + + // Error logs a message at Error level. + Error(args ...interface{}) + + // Fatal logs a message at Fatal level + // and process will exit with status set to 1. + Fatal(args ...interface{}) } -// Logger is a wrapper object around log.Logger from the standard library. +// baseLogger is a wrapper object around log.Logger from the standard library. // It supports logging at various log levels. -type Logger struct { +type baseLogger struct { *stdlog.Logger } // Debug logs a message at Debug level. -func (l *Logger) Debug(format string, args ...interface{}) { - format = "DEBUG: " + format - l.Printf(format, args...) +func (l *baseLogger) Debug(args ...interface{}) { + l.prefixPrint("DEBUG: ", args...) } // Info logs a message at Info level. -func (l *Logger) Info(format string, args ...interface{}) { - format = "INFO: " + format - l.Printf(format, args...) +func (l *baseLogger) Info(args ...interface{}) { + l.prefixPrint("INFO: ", args...) } // Warn logs a message at Warning level. -func (l *Logger) Warn(format string, args ...interface{}) { - format = "WARN: " + format - l.Printf(format, args...) +func (l *baseLogger) Warn(args ...interface{}) { + l.prefixPrint("WARN: ", args...) } // Error logs a message at Error level. -func (l *Logger) Error(format string, args ...interface{}) { - format = "ERROR: " + format - l.Printf(format, args...) +func (l *baseLogger) Error(args ...interface{}) { + l.prefixPrint("ERROR: ", args...) } // Fatal logs a message at Fatal level // and process will exit with status set to 1. -func (l *Logger) Fatal(format string, args ...interface{}) { - format = "FATAL: " + format - l.Printf(format, args...) +func (l *baseLogger) Fatal(args ...interface{}) { + l.prefixPrint("FATAL: ", args...) os.Exit(1) } + +func (l *baseLogger) prefixPrint(prefix string, args ...interface{}) { + args = append([]interface{}{prefix}, args...) + l.Print(args...) +} + +// newBase creates and returns a new instance of baseLogger. +func newBase(out io.Writer) *baseLogger { + prefix := fmt.Sprintf("asynq: pid=%d ", os.Getpid()) + return &baseLogger{ + stdlog.New(out, prefix, stdlog.Ldate|stdlog.Ltime|stdlog.Lmicroseconds|stdlog.LUTC), + } +} + +// NewLogger creates and returns a new instance of Logger. +func NewLogger(base Base) *Logger { + if base == nil { + base = newBase(os.Stderr) + } + return &Logger{base} +} + +// Logger logs message to io.Writer with various log levels. +type Logger struct { + Base +} + +func (l *Logger) Debugf(format string, args ...interface{}) { + l.Debug(fmt.Sprintf(format, args...)) +} + +func (l *Logger) Infof(format string, args ...interface{}) { + l.Info(fmt.Sprintf(format, args...)) +} + +func (l *Logger) Warnf(format string, args ...interface{}) { + l.Warn(fmt.Sprintf(format, args...)) +} + +func (l *Logger) Errorf(format string, args ...interface{}) { + l.Error(fmt.Sprintf(format, args...)) +} + +func (l *Logger) Fatalf(format string, args ...interface{}) { + l.Fatal(fmt.Sprintf(format, args...)) +} diff --git a/internal/log/log_test.go b/internal/log/log_test.go index 3c96418..2285ae3 100644 --- a/internal/log/log_test.go +++ b/internal/log/log_test.go @@ -13,6 +13,7 @@ import ( // regexp for timestamps const ( + rgxPID = `[0-9]+` rgxdate = `[0-9][0-9][0-9][0-9]/[0-9][0-9]/[0-9][0-9]` rgxtime = `[0-9][0-9]:[0-9][0-9]:[0-9][0-9]` rgxmicroseconds = `\.[0-9][0-9][0-9][0-9][0-9][0-9]` @@ -27,20 +28,22 @@ type tester struct { func TestLoggerDebug(t *testing.T) { tests := []tester{ { - desc: "without trailing newline, logger adds newline", - message: "hello, world!", - wantPattern: fmt.Sprintf("^%s %s%s DEBUG: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "without trailing newline, logger adds newline", + message: "hello, world!", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s DEBUG: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, { - desc: "with trailing newline, logger preserves newline", - message: "hello, world!\n", - wantPattern: fmt.Sprintf("^%s %s%s DEBUG: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "with trailing newline, logger preserves newline", + message: "hello, world!\n", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s DEBUG: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, } for _, tc := range tests { var buf bytes.Buffer - logger := NewLogger(&buf) + logger := NewLogger(newBase(&buf)) logger.Debug(tc.message) @@ -50,7 +53,7 @@ func TestLoggerDebug(t *testing.T) { t.Fatal("pattern did not compile:", err) } if !matched { - t.Errorf("logger.info(%q) outputted %q, should match pattern %q", + t.Errorf("logger.Debug(%q) outputted %q, should match pattern %q", tc.message, got, tc.wantPattern) } } @@ -59,20 +62,22 @@ func TestLoggerDebug(t *testing.T) { func TestLoggerInfo(t *testing.T) { tests := []tester{ { - desc: "without trailing newline, logger adds newline", - message: "hello, world!", - wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "without trailing newline, logger adds newline", + message: "hello, world!", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s INFO: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, { - desc: "with trailing newline, logger preserves newline", - message: "hello, world!\n", - wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "with trailing newline, logger preserves newline", + message: "hello, world!\n", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s INFO: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, } for _, tc := range tests { var buf bytes.Buffer - logger := NewLogger(&buf) + logger := NewLogger(newBase(&buf)) logger.Info(tc.message) @@ -82,7 +87,7 @@ func TestLoggerInfo(t *testing.T) { t.Fatal("pattern did not compile:", err) } if !matched { - t.Errorf("logger.info(%q) outputted %q, should match pattern %q", + t.Errorf("logger.Info(%q) outputted %q, should match pattern %q", tc.message, got, tc.wantPattern) } } @@ -91,20 +96,22 @@ func TestLoggerInfo(t *testing.T) { func TestLoggerWarn(t *testing.T) { tests := []tester{ { - desc: "without trailing newline, logger adds newline", - message: "hello, world!", - wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "without trailing newline, logger adds newline", + message: "hello, world!", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s WARN: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, { - desc: "with trailing newline, logger preserves newline", - message: "hello, world!\n", - wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "with trailing newline, logger preserves newline", + message: "hello, world!\n", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s WARN: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, } for _, tc := range tests { var buf bytes.Buffer - logger := NewLogger(&buf) + logger := NewLogger(newBase(&buf)) logger.Warn(tc.message) @@ -114,7 +121,7 @@ func TestLoggerWarn(t *testing.T) { t.Fatal("pattern did not compile:", err) } if !matched { - t.Errorf("logger.info(%q) outputted %q, should match pattern %q", + t.Errorf("logger.Warn(%q) outputted %q, should match pattern %q", tc.message, got, tc.wantPattern) } } @@ -123,20 +130,22 @@ func TestLoggerWarn(t *testing.T) { func TestLoggerError(t *testing.T) { tests := []tester{ { - desc: "without trailing newline, logger adds newline", - message: "hello, world!", - wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "without trailing newline, logger adds newline", + message: "hello, world!", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s ERROR: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, { - desc: "with trailing newline, logger preserves newline", - message: "hello, world!\n", - wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + desc: "with trailing newline, logger preserves newline", + message: "hello, world!\n", + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s ERROR: hello, world!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), }, } for _, tc := range tests { var buf bytes.Buffer - logger := NewLogger(&buf) + logger := NewLogger(newBase(&buf)) logger.Error(tc.message) @@ -146,8 +155,131 @@ func TestLoggerError(t *testing.T) { t.Fatal("pattern did not compile:", err) } if !matched { - t.Errorf("logger.info(%q) outputted %q, should match pattern %q", + t.Errorf("logger.Error(%q) outputted %q, should match pattern %q", tc.message, got, tc.wantPattern) } } } + +type formatTester struct { + desc string + format string + args []interface{} + wantPattern string // regexp that log output must match +} + +func TestLoggerDebugf(t *testing.T) { + tests := []formatTester{ + { + desc: "Formats message with DEBUG prefix", + format: "hello, %s!", + args: []interface{}{"Gopher"}, + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s DEBUG: hello, Gopher!\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), + }, + } + + for _, tc := range tests { + var buf bytes.Buffer + logger := NewLogger(newBase(&buf)) + + logger.Debugf(tc.format, tc.args...) + + got := buf.String() + matched, err := regexp.MatchString(tc.wantPattern, got) + if err != nil { + t.Fatal("pattern did not compile:", err) + } + if !matched { + t.Errorf("logger.Debugf(%q, %v) outputted %q, should match pattern %q", + tc.format, tc.args, got, tc.wantPattern) + } + } +} + +func TestLoggerInfof(t *testing.T) { + tests := []formatTester{ + { + desc: "Formats message with INFO prefix", + format: "%d,%d,%d", + args: []interface{}{1, 2, 3}, + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s INFO: 1,2,3\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), + }, + } + + for _, tc := range tests { + var buf bytes.Buffer + logger := NewLogger(newBase(&buf)) + + logger.Infof(tc.format, tc.args...) + + got := buf.String() + matched, err := regexp.MatchString(tc.wantPattern, got) + if err != nil { + t.Fatal("pattern did not compile:", err) + } + if !matched { + t.Errorf("logger.Infof(%q, %v) outputted %q, should match pattern %q", + tc.format, tc.args, got, tc.wantPattern) + } + } +} + +func TestLoggerWarnf(t *testing.T) { + tests := []formatTester{ + { + desc: "Formats message with WARN prefix", + format: "hello, %s", + args: []interface{}{"Gophers"}, + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s WARN: hello, Gophers\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), + }, + } + + for _, tc := range tests { + var buf bytes.Buffer + logger := NewLogger(newBase(&buf)) + + logger.Warnf(tc.format, tc.args...) + + got := buf.String() + matched, err := regexp.MatchString(tc.wantPattern, got) + if err != nil { + t.Fatal("pattern did not compile:", err) + } + if !matched { + t.Errorf("logger.Warnf(%q, %v) outputted %q, should match pattern %q", + tc.format, tc.args, got, tc.wantPattern) + } + } +} + +func TestLoggerErrorf(t *testing.T) { + tests := []formatTester{ + { + desc: "Formats message with ERROR prefix", + format: "hello, %s", + args: []interface{}{"Gophers"}, + wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s ERROR: hello, Gophers\n$", + rgxPID, rgxdate, rgxtime, rgxmicroseconds), + }, + } + + for _, tc := range tests { + var buf bytes.Buffer + logger := NewLogger(newBase(&buf)) + + logger.Errorf(tc.format, tc.args...) + + got := buf.String() + matched, err := regexp.MatchString(tc.wantPattern, got) + if err != nil { + t.Fatal("pattern did not compile:", err) + } + if !matched { + t.Errorf("logger.Errorf(%q, %v) outputted %q, should match pattern %q", + tc.format, tc.args, got, tc.wantPattern) + } + } +} diff --git a/processor.go b/processor.go index f0bd799..092fd37 100644 --- a/processor.go +++ b/processor.go @@ -13,12 +13,13 @@ import ( "time" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" "golang.org/x/time/rate" ) type processor struct { - logger Logger + logger *log.Logger broker base.Broker ss *base.ServerState @@ -64,7 +65,7 @@ type processor struct { type retryDelayFunc func(n int, err error, task *Task) time.Duration type newProcessorParams struct { - logger Logger + logger *log.Logger broker base.Broker ss *base.ServerState retryDelayFunc retryDelayFunc @@ -170,7 +171,7 @@ func (p *processor) exec() { } if err != nil { if p.errLogLimiter.Allow() { - p.logger.Error("Dequeue error: %v", err) + p.logger.Errorf("Dequeue error: %v", err) } return } @@ -202,7 +203,7 @@ func (p *processor) exec() { select { case <-p.quit: // time is up, quit this worker goroutine. - p.logger.Warn("Quitting worker. task id=%s", msg.ID) + p.logger.Warnf("Quitting worker. task id=%s", msg.ID) return case resErr := <-resCh: // Note: One of three things should happen. @@ -231,17 +232,17 @@ func (p *processor) exec() { func (p *processor) restore() { n, err := p.broker.RequeueAll() if err != nil { - p.logger.Error("Could not restore unfinished tasks: %v", err) + p.logger.Errorf("Could not restore unfinished tasks: %v", err) } if n > 0 { - p.logger.Info("Restored %d unfinished tasks back to queue", n) + p.logger.Infof("Restored %d unfinished tasks back to queue", n) } } func (p *processor) requeue(msg *base.TaskMessage) { err := p.broker.Requeue(msg) if err != nil { - p.logger.Error("Could not push task id=%s back to queue: %v", msg.ID, err) + p.logger.Errorf("Could not push task id=%s back to queue: %v", msg.ID, err) } } @@ -249,7 +250,7 @@ func (p *processor) markAsDone(msg *base.TaskMessage) { err := p.broker.Done(msg) if err != nil { errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue) - p.logger.Warn("%s; Will retry syncing", errMsg) + p.logger.Warnf("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.broker.Done(msg) @@ -265,7 +266,7 @@ func (p *processor) retry(msg *base.TaskMessage, e error) { err := p.broker.Retry(msg, retryAt, e.Error()) if err != nil { errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue) - p.logger.Warn("%s; Will retry syncing", errMsg) + p.logger.Warnf("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.broker.Retry(msg, retryAt, e.Error()) @@ -276,11 +277,11 @@ func (p *processor) retry(msg *base.TaskMessage, e error) { } func (p *processor) kill(msg *base.TaskMessage, e error) { - p.logger.Warn("Retry exhausted for task id=%s", msg.ID) + p.logger.Warnf("Retry exhausted for task id=%s", msg.ID) err := p.broker.Kill(msg, e.Error()) if err != nil { errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue) - p.logger.Warn("%s; Will retry syncing", errMsg) + p.logger.Warnf("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.broker.Kill(msg, e.Error()) diff --git a/scheduler.go b/scheduler.go index 8f4f860..aa0a36d 100644 --- a/scheduler.go +++ b/scheduler.go @@ -9,10 +9,11 @@ import ( "time" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" ) type scheduler struct { - logger Logger + logger *log.Logger broker base.Broker // channel to communicate back to the long running "scheduler" goroutine. @@ -25,7 +26,7 @@ type scheduler struct { qnames []string } -func newScheduler(l Logger, b base.Broker, avgInterval time.Duration, qcfg map[string]int) *scheduler { +func newScheduler(l *log.Logger, b base.Broker, avgInterval time.Duration, qcfg map[string]int) *scheduler { var qnames []string for q := range qcfg { qnames = append(qnames, q) @@ -64,6 +65,6 @@ func (s *scheduler) start(wg *sync.WaitGroup) { func (s *scheduler) exec() { if err := s.broker.CheckAndEnqueue(s.qnames...); err != nil { - s.logger.Error("Could not enqueue scheduled tasks: %v", err) + s.logger.Errorf("Could not enqueue scheduled tasks: %v", err) } } diff --git a/server.go b/server.go index 50fc27a..86ff04f 100644 --- a/server.go +++ b/server.go @@ -35,7 +35,7 @@ import ( type Server struct { ss *base.ServerState - logger Logger + logger *log.Logger broker base.Broker @@ -133,23 +133,23 @@ func (fn ErrorHandlerFunc) HandleError(task *Task, err error, retried, maxRetry fn(task, err, retried, maxRetry) } -// Logger implements logging with various log levels. +// Logger supports logging with various log levels. type Logger interface { // Debug logs a message at Debug level. - Debug(format string, args ...interface{}) + Debug(args ...interface{}) // Info logs a message at Info level. - Info(format string, args ...interface{}) + Info(args ...interface{}) // Warn logs a message at Warning level. - Warn(format string, args ...interface{}) + Warn(args ...interface{}) // Error logs a message at Error level. - Error(format string, args ...interface{}) + Error(args ...interface{}) // Fatal logs a message at Fatal level // and process will exit with status set to 1. - Fatal(format string, args ...interface{}) + Fatal(args ...interface{}) } // Formula taken from https://github.com/mperham/sidekiq. @@ -185,10 +185,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { if len(queues) == 0 { queues = defaultQueueConfig } - logger := cfg.Logger - if logger == nil { - logger = log.NewLogger(os.Stderr) - } shutdownTimeout := cfg.ShutdownTimeout if shutdownTimeout == 0 { shutdownTimeout = defaultShutdownTimeout @@ -201,6 +197,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { pid := os.Getpid() rdb := rdb.NewRDB(createRedisClient(r)) + logger := log.NewLogger(cfg.Logger) ss := base.NewServerState(host, pid, n, queues, cfg.StrictPriority) syncCh := make(chan *syncRequest) cancels := base.NewCancelations() @@ -291,13 +288,6 @@ func (srv *Server) Start(handler Handler) error { srv.ss.SetStatus(base.StatusRunning) srv.processor.handler = handler - type prefixLogger interface { - SetPrefix(prefix string) - } - // If logger supports setting prefix, then set prefix for log output. - if l, ok := srv.logger.(prefixLogger); ok { - l.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid())) - } srv.logger.Info("Starting processing") srv.heartbeater.start(&srv.wg) diff --git a/subscriber.go b/subscriber.go index c72ac89..be7a010 100644 --- a/subscriber.go +++ b/subscriber.go @@ -10,10 +10,11 @@ import ( "github.com/go-redis/redis/v7" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" ) type subscriber struct { - logger Logger + logger *log.Logger broker base.Broker // channel to communicate back to the long running "subscriber" goroutine. @@ -26,7 +27,7 @@ type subscriber struct { retryTimeout time.Duration } -func newSubscriber(l Logger, b base.Broker, cancelations *base.Cancelations) *subscriber { +func newSubscriber(l *log.Logger, b base.Broker, cancelations *base.Cancelations) *subscriber { return &subscriber{ logger: l, broker: b, @@ -54,7 +55,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) { for { pubsub, err = s.broker.CancelationPubSub() if err != nil { - s.logger.Error("cannot subscribe to cancelation channel: %v", err) + s.logger.Errorf("cannot subscribe to cancelation channel: %v", err) select { case <-time.After(s.retryTimeout): continue diff --git a/syncer.go b/syncer.go index 1ddef9b..018196d 100644 --- a/syncer.go +++ b/syncer.go @@ -7,12 +7,14 @@ package asynq import ( "sync" "time" + + "github.com/hibiken/asynq/internal/log" ) // syncer is responsible for queuing up failed requests to redis and retry // those requests to sync state between the background process and redis. type syncer struct { - logger Logger + logger *log.Logger requestsCh <-chan *syncRequest @@ -28,7 +30,7 @@ type syncRequest struct { errMsg string // error message } -func newSyncer(l Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer { +func newSyncer(l *log.Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer { return &syncer{ logger: l, requestsCh: requestsCh, From 00b82904c600dbdeb7d0ed0b1461b8b4c873b0b6 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 7 May 2020 21:28:06 -0700 Subject: [PATCH 2/9] Allow setting minimum log level for logger --- internal/log/log.go | 105 ++++++++++++++++++++++++++++++++++++++- internal/log/log_test.go | 51 +++++++++++++++++++ server.go | 32 +++++++++++- 3 files changed, 185 insertions(+), 3 deletions(-) diff --git a/internal/log/log.go b/internal/log/log.go index 7fdd5bf..d191e6b 100644 --- a/internal/log/log.go +++ b/internal/log/log.go @@ -10,6 +10,7 @@ import ( "io" stdlog "log" "os" + "sync" ) // Base supports logging with various log levels. @@ -78,16 +79,105 @@ func newBase(out io.Writer) *baseLogger { } // NewLogger creates and returns a new instance of Logger. +// Log level is set to DebugLevel by default. func NewLogger(base Base) *Logger { if base == nil { base = newBase(os.Stderr) } - return &Logger{base} + return &Logger{base: base, level: DebugLevel} } // Logger logs message to io.Writer with various log levels. type Logger struct { - Base + base Base + + mu sync.Mutex + // Minimum log level for this logger. + // Message with level lower than this level won't be outputted. + level Level +} + +// Level represents a log level. +type Level int32 + +const ( + // DebugLevel is the lowest level of logging. + // Debug logs are intended for debugging and development purposes. + DebugLevel Level = iota + + // InfoLevel is used for general informational log messages. + InfoLevel + + // WarnLevel is used for undesired but relatively expected events, + // which may indicate a problem. + WarnLevel + + // ErrorLevel is used for undesired and unexpected events that + // the program can recover from. + ErrorLevel + + // FatalLevel is used for undesired and unexpected events that + // the program cannot recover from. + FatalLevel +) + +func (l Level) String() string { + switch l { + case DebugLevel: + return "debug" + case InfoLevel: + return "info" + case WarnLevel: + return "warning" + case ErrorLevel: + return "error" + case FatalLevel: + return "fatal" + default: + return "unknown" + } +} + +// canLogAt reports whether logger can log at level v. +func (l *Logger) canLogAt(v Level) bool { + l.mu.Lock() + defer l.mu.Unlock() + return v >= l.level +} + +func (l *Logger) Debug(args ...interface{}) { + if !l.canLogAt(DebugLevel) { + return + } + l.base.Debug(args...) +} + +func (l *Logger) Info(args ...interface{}) { + if !l.canLogAt(InfoLevel) { + return + } + l.base.Info(args...) +} + +func (l *Logger) Warn(args ...interface{}) { + if !l.canLogAt(WarnLevel) { + return + } + l.base.Warn(args...) +} + +func (l *Logger) Error(args ...interface{}) { + if !l.canLogAt(WarnLevel) { + return + } + l.base.Error(args...) +} + +func (l *Logger) Fatal(args ...interface{}) { + if !l.canLogAt(WarnLevel) { + return + } + l.base.Fatal(args...) } func (l *Logger) Debugf(format string, args ...interface{}) { @@ -109,3 +199,14 @@ func (l *Logger) Errorf(format string, args ...interface{}) { func (l *Logger) Fatalf(format string, args ...interface{}) { l.Fatal(fmt.Sprintf(format, args...)) } + +// SetLevel sets the logger level. +// It panics if v is less than DebugLevel or greater than FatalLevel. +func (l *Logger) SetLevel(v Level) { + l.mu.Lock() + defer l.mu.Unlock() + if v < DebugLevel || v > FatalLevel { + panic("log: invalid log level") + } + l.level = v +} diff --git a/internal/log/log_test.go b/internal/log/log_test.go index 2285ae3..925cfce 100644 --- a/internal/log/log_test.go +++ b/internal/log/log_test.go @@ -283,3 +283,54 @@ func TestLoggerErrorf(t *testing.T) { } } } + +func TestLoggerWithMinLevels(t *testing.T) { + tests := []struct { + level Level + op string + }{ + // with level one above + {InfoLevel, "Debug"}, + {InfoLevel, "Debugf"}, + {WarnLevel, "Info"}, + {WarnLevel, "Infof"}, + {ErrorLevel, "Warn"}, + {ErrorLevel, "Warnf"}, + {FatalLevel, "Error"}, + {FatalLevel, "Errorf"}, + // with skip level + {WarnLevel, "Debug"}, + {ErrorLevel, "Infof"}, + } + + for _, tc := range tests { + var buf bytes.Buffer + logger := NewLogger(newBase(&buf)) + logger.SetLevel(tc.level) + + switch tc.op { + case "Debug": + logger.Debug("hello") + case "Debugf": + logger.Debugf("hello, %s", "world") + case "Info": + logger.Info("hello") + case "Infof": + logger.Infof("hello, %s", "world") + case "Warn": + logger.Warn("hello") + case "Warnf": + logger.Warnf("hello, %s", "world") + case "Error": + logger.Error("hello") + case "Errorf": + logger.Errorf("hello, %s", "world") + default: + t.Fatalf("unexpected op: %q", tc.op) + } + + if buf.String() != "" { + t.Errorf("logger.%s outputted log message when level is set to %v", tc.op, tc.level) + } + } +} diff --git a/server.go b/server.go index 86ff04f..5580606 100644 --- a/server.go +++ b/server.go @@ -112,6 +112,11 @@ type Config struct { // If unset, default logger is used. Logger Logger + // LogLevel specifies the minimum log level to enable. + // + // If unset, DebugLevel is used by default. + LogLevel LogLevel + // ShutdownTimeout specifies the duration to wait to let workers finish their tasks // before forcing them to abort when stopping the server. // @@ -152,6 +157,30 @@ type Logger interface { Fatal(args ...interface{}) } +// LogLevel represents logging level. +type LogLevel int32 + +const ( + // DebugLevel is the lowest level of logging. + // Debug logs are intended for debugging and development purposes. + DebugLevel LogLevel = iota + + // InfoLevel is used for general informational log messages. + InfoLevel + + // WarnLevel is used for undesired but relatively expected events, + // which may indicate a problem. + WarnLevel + + // ErrorLevel is used for undesired and unexpected events that + // the program can recover from. + ErrorLevel + + // FatalLevel is used for undesired and unexpected events that + // the program cannot recover from. + FatalLevel +) + // Formula taken from https://github.com/mperham/sidekiq. func defaultDelayFunc(n int, e error, t *Task) time.Duration { r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -189,6 +218,8 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { if shutdownTimeout == 0 { shutdownTimeout = defaultShutdownTimeout } + logger := log.NewLogger(cfg.Logger) + logger.SetLevel(log.Level(cfg.LogLevel)) host, err := os.Hostname() if err != nil { @@ -197,7 +228,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { pid := os.Getpid() rdb := rdb.NewRDB(createRedisClient(r)) - logger := log.NewLogger(cfg.Logger) ss := base.NewServerState(host, pid, n, queues, cfg.StrictPriority) syncCh := make(chan *syncRequest) cancels := base.NewCancelations() From 73d62844e60073678129a09180b5092087dde884 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 9 May 2020 10:59:50 -0700 Subject: [PATCH 3/9] Change LogLevel to satisfy flag.Value interface --- internal/log/log.go | 11 +++++--- internal/log/log_test.go | 54 +++++++++++++++++++++++++++++++++++++++- server.go | 41 +++++++++++++++++++++++++++++- server_test.go | 29 +++++++++++++++++++++ 4 files changed, 129 insertions(+), 6 deletions(-) diff --git a/internal/log/log.go b/internal/log/log.go index d191e6b..64a9962 100644 --- a/internal/log/log.go +++ b/internal/log/log.go @@ -13,7 +13,7 @@ import ( "sync" ) -// Base supports logging with various log levels. +// Base supports logging at various log levels. type Base interface { // Debug logs a message at Debug level. Debug(args ...interface{}) @@ -87,7 +87,7 @@ func NewLogger(base Base) *Logger { return &Logger{base: base, level: DebugLevel} } -// Logger logs message to io.Writer with various log levels. +// Logger logs message to io.Writer at various log levels. type Logger struct { base Base @@ -121,6 +121,9 @@ const ( FatalLevel ) +// String is part of the fmt.Stringer interface. +// +// Used for testing and debugging purposes. func (l Level) String() string { switch l { case DebugLevel: @@ -167,14 +170,14 @@ func (l *Logger) Warn(args ...interface{}) { } func (l *Logger) Error(args ...interface{}) { - if !l.canLogAt(WarnLevel) { + if !l.canLogAt(ErrorLevel) { return } l.base.Error(args...) } func (l *Logger) Fatal(args ...interface{}) { - if !l.canLogAt(WarnLevel) { + if !l.canLogAt(FatalLevel) { return } l.base.Fatal(args...) diff --git a/internal/log/log_test.go b/internal/log/log_test.go index 925cfce..ac9fdf1 100644 --- a/internal/log/log_test.go +++ b/internal/log/log_test.go @@ -284,7 +284,9 @@ func TestLoggerErrorf(t *testing.T) { } } -func TestLoggerWithMinLevels(t *testing.T) { +func TestLoggerWithLowerLevels(t *testing.T) { + // Logger should not log messages at a level + // lower than the specified level. tests := []struct { level Level op string @@ -334,3 +336,53 @@ func TestLoggerWithMinLevels(t *testing.T) { } } } + +func TestLoggerWithSameOrHigherLevels(t *testing.T) { + // Logger should log messages at a level + // same as or higher than the specified level. + tests := []struct { + level Level + op string + }{ + // same level + {DebugLevel, "Debug"}, + {InfoLevel, "Infof"}, + {WarnLevel, "Warn"}, + {ErrorLevel, "Errorf"}, + // higher level + {DebugLevel, "Info"}, + {InfoLevel, "Warnf"}, + {WarnLevel, "Error"}, + } + + for _, tc := range tests { + var buf bytes.Buffer + logger := NewLogger(newBase(&buf)) + logger.SetLevel(tc.level) + + switch tc.op { + case "Debug": + logger.Debug("hello") + case "Debugf": + logger.Debugf("hello, %s", "world") + case "Info": + logger.Info("hello") + case "Infof": + logger.Infof("hello, %s", "world") + case "Warn": + logger.Warn("hello") + case "Warnf": + logger.Warnf("hello, %s", "world") + case "Error": + logger.Error("hello") + case "Errorf": + logger.Errorf("hello, %s", "world") + default: + t.Fatalf("unexpected op: %q", tc.op) + } + + if buf.String() == "" { + t.Errorf("logger.%s did not output log message when level is set to %v", tc.op, tc.level) + } + } +} diff --git a/server.go b/server.go index 5580606..f8fc9f7 100644 --- a/server.go +++ b/server.go @@ -12,6 +12,7 @@ import ( "math/rand" "os" "runtime" + "strings" "sync" "time" @@ -138,7 +139,7 @@ func (fn ErrorHandlerFunc) HandleError(task *Task, err error, retried, maxRetry fn(task, err, retried, maxRetry) } -// Logger supports logging with various log levels. +// Logger supports logging at various log levels. type Logger interface { // Debug logs a message at Debug level. Debug(args ...interface{}) @@ -158,6 +159,8 @@ type Logger interface { } // LogLevel represents logging level. +// +// It satisfies flag.Value interface. type LogLevel int32 const ( @@ -181,6 +184,42 @@ const ( FatalLevel ) +// String is part of the flag.Value interface. +func (l *LogLevel) String() string { + switch *l { + case DebugLevel: + return "debug" + case InfoLevel: + return "info" + case WarnLevel: + return "warn" + case ErrorLevel: + return "error" + case FatalLevel: + return "fatal" + } + panic(fmt.Sprintf("asynq: unexpected log level: %v", *l)) +} + +// Set is part of the flag.Value interface. +func (l *LogLevel) Set(val string) error { + switch strings.ToLower(val) { + case "debug": + *l = DebugLevel + case "info": + *l = InfoLevel + case "warn", "warning": + *l = WarnLevel + case "error": + *l = ErrorLevel + case "fatal": + *l = FatalLevel + default: + return fmt.Errorf("asynq: unsupported log level %q", val) + } + return nil +} + // Formula taken from https://github.com/mperham/sidekiq. func defaultDelayFunc(n int, e error, t *Task) time.Duration { r := rand.New(rand.NewSource(time.Now().UnixNano())) diff --git a/server_test.go b/server_test.go index 31cccd9..6fb7f28 100644 --- a/server_test.go +++ b/server_test.go @@ -208,3 +208,32 @@ func TestServerWithFlakyBroker(t *testing.T) { srv.Stop() } + +func TestLogLevel(t *testing.T) { + tests := []struct { + flagVal string + want LogLevel + wantStr string + }{ + {"debug", DebugLevel, "debug"}, + {"Info", InfoLevel, "info"}, + {"WARN", WarnLevel, "warn"}, + {"warning", WarnLevel, "warn"}, + {"Error", ErrorLevel, "error"}, + {"fatal", FatalLevel, "fatal"}, + } + + for _, tc := range tests { + level := new(LogLevel) + if err := level.Set(tc.flagVal); err != nil { + t.Fatal(err) + } + if *level != tc.want { + t.Errorf("Set(%q): got %v, want %v", tc.flagVal, level, &tc.want) + continue + } + if got := level.String(); got != tc.wantStr { + t.Errorf("String() returned %q, want %q", got, tc.wantStr) + } + } +} From 711bfa371ff61ca3af0008c57d6f8733ce379afa Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 10 May 2020 07:14:05 -0700 Subject: [PATCH 4/9] Update changelog --- CHANGELOG.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0276616..a538d5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- `Logger` interface has changed. Please see the godoc for the new interface. + +### Added + +- `LogLevel` type is added. Server's log level can be specified through `LogLevel` field in `Config`. + ## [0.8.3] - 2020-05-08 ### Added From 0faf97f146d413073540bc3cbdda4c82a3a41d0a Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 10 May 2020 16:47:46 -0700 Subject: [PATCH 5/9] Define test flags for package testing Added test flags for - redis address (defaults to "localhost:6379") - redis db number (defaults to 14) - log level (defaults to FATAL) --- asynq_test.go | 27 ++++++++++++++++++++------- server_test.go | 13 +++++++------ 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/asynq_test.go b/asynq_test.go index 3aff019..b68256d 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -5,6 +5,7 @@ package asynq import ( + "flag" "sort" "testing" @@ -14,16 +15,28 @@ import ( "github.com/hibiken/asynq/internal/log" ) -// This file defines test helper functions used by -// other test files. +//============================================================================ +// This file defines helper functions and variables used in other test files. +//============================================================================ -// redis used for package testing. -const ( - redisAddr = "localhost:6379" - redisDB = 14 +// variables used for package testing. +var ( + redisAddr string + redisDB int + + testLogLevel = FatalLevel ) -var testLogger = log.NewLogger(nil) +var testLogger *log.Logger + +func init() { + flag.StringVar(&redisAddr, "redis_addr", "localhost:6379", "redis address to use in testing") + flag.IntVar(&redisDB, "redis_db", 14, "redis db number to use in testing") + flag.Var(&testLogLevel, "loglevel", "log level to use in testing") + + testLogger = log.NewLogger(nil) + testLogger.SetLevel(log.Level(testLogLevel)) +} func setup(tb testing.TB) *redis.Client { tb.Helper() diff --git a/server_test.go b/server_test.go index 6fb7f28..82d1c72 100644 --- a/server_test.go +++ b/server_test.go @@ -28,6 +28,7 @@ func TestServer(t *testing.T) { c := NewClient(r) srv := NewServer(r, Config{ Concurrency: 10, + LogLevel: testLogLevel, }) // no-op handler @@ -58,7 +59,7 @@ func TestServerRun(t *testing.T) { ignoreOpt := goleak.IgnoreTopFunction("github.com/go-redis/redis/v7/internal/pool.(*ConnPool).reaper") defer goleak.VerifyNoLeaks(t, ignoreOpt) - srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{}) + srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel}) done := make(chan struct{}) // Make sure server exits when receiving TERM signal. @@ -83,7 +84,7 @@ func TestServerRun(t *testing.T) { } func TestServerErrServerStopped(t *testing.T) { - srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{}) + srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel}) handler := NewServeMux() if err := srv.Start(handler); err != nil { t.Fatal(err) @@ -96,7 +97,7 @@ func TestServerErrServerStopped(t *testing.T) { } func TestServerErrNilHandler(t *testing.T) { - srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{}) + srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel}) err := srv.Start(nil) if err == nil { t.Error("Starting server with nil handler: (*Server).Start(nil) did not return error") @@ -105,7 +106,7 @@ func TestServerErrNilHandler(t *testing.T) { } func TestServerErrServerRunning(t *testing.T) { - srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{}) + srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel}) handler := NewServeMux() if err := srv.Start(handler); err != nil { t.Fatal(err) @@ -126,7 +127,7 @@ func TestServerWithRedisDown(t *testing.T) { }() r := rdb.NewRDB(setup(t)) testBroker := testbroker.NewTestBroker(r) - srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{}) + srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel}) srv.broker = testBroker srv.scheduler.broker = testBroker srv.heartbeater.broker = testBroker @@ -158,7 +159,7 @@ func TestServerWithFlakyBroker(t *testing.T) { }() r := rdb.NewRDB(setup(t)) testBroker := testbroker.NewTestBroker(r) - srv := NewServer(RedisClientOpt{Addr: redisAddr, DB: redisDB}, Config{}) + srv := NewServer(RedisClientOpt{Addr: redisAddr, DB: redisDB}, Config{LogLevel: testLogLevel}) srv.broker = testBroker srv.scheduler.broker = testBroker srv.heartbeater.broker = testBroker From ae942c93e5fd6a26b64d04f6294bbe8ddea84eb5 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 11 May 2020 06:55:04 -0700 Subject: [PATCH 6/9] Change default log level to info --- server.go | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/server.go b/server.go index f8fc9f7..2105c88 100644 --- a/server.go +++ b/server.go @@ -115,7 +115,7 @@ type Config struct { // LogLevel specifies the minimum log level to enable. // - // If unset, DebugLevel is used by default. + // If unset, InfoLevel is used by default. LogLevel LogLevel // ShutdownTimeout specifies the duration to wait to let workers finish their tasks @@ -164,9 +164,12 @@ type Logger interface { type LogLevel int32 const ( + // Note: reserving value zero to differentiate unspecified case. + level_unspecified LogLevel = iota + // DebugLevel is the lowest level of logging. // Debug logs are intended for debugging and development purposes. - DebugLevel LogLevel = iota + DebugLevel // InfoLevel is used for general informational log messages. InfoLevel @@ -220,6 +223,22 @@ func (l *LogLevel) Set(val string) error { return nil } +func toInternalLogLevel(l LogLevel) log.Level { + switch l { + case DebugLevel: + return log.DebugLevel + case InfoLevel: + return log.InfoLevel + case WarnLevel: + return log.WarnLevel + case ErrorLevel: + return log.ErrorLevel + case FatalLevel: + return log.FatalLevel + } + panic(fmt.Sprintf("asynq: unexpected log level: %v", l)) +} + // Formula taken from https://github.com/mperham/sidekiq. func defaultDelayFunc(n int, e error, t *Task) time.Duration { r := rand.New(rand.NewSource(time.Now().UnixNano())) @@ -258,7 +277,11 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { shutdownTimeout = defaultShutdownTimeout } logger := log.NewLogger(cfg.Logger) - logger.SetLevel(log.Level(cfg.LogLevel)) + loglevel := cfg.LogLevel + if loglevel == level_unspecified { + loglevel = InfoLevel + } + logger.SetLevel(toInternalLogLevel(loglevel)) host, err := os.Hostname() if err != nil { From 0289bc7a106d7cbaa19143434b38a65c3321fad1 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 11 May 2020 07:02:26 -0700 Subject: [PATCH 7/9] Clean up log messages Moved development purpose log messages to DEBUG level. --- asynq_test.go | 2 +- heartbeat.go | 4 ++-- processor.go | 4 ++-- scheduler.go | 4 ++-- server.go | 3 +-- subscriber.go | 6 +++--- syncer.go | 4 ++-- 7 files changed, 13 insertions(+), 14 deletions(-) diff --git a/asynq_test.go b/asynq_test.go index b68256d..2cbf774 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -35,7 +35,7 @@ func init() { flag.Var(&testLogLevel, "loglevel", "log level to use in testing") testLogger = log.NewLogger(nil) - testLogger.SetLevel(log.Level(testLogLevel)) + testLogger.SetLevel(toInternalLogLevel(testLogLevel)) } func setup(tb testing.TB) *redis.Client { diff --git a/heartbeat.go b/heartbeat.go index 61f7908..754fb31 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -38,7 +38,7 @@ func newHeartbeater(l *log.Logger, b base.Broker, ss *base.ServerState, interval } func (h *heartbeater) terminate() { - h.logger.Info("Heartbeater shutting down...") + h.logger.Debug("Heartbeater shutting down...") // Signal the heartbeater goroutine to stop. h.done <- struct{}{} } @@ -54,7 +54,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) { select { case <-h.done: h.broker.ClearServerState(h.ss) - h.logger.Info("Heartbeater done") + h.logger.Debug("Heartbeater done") return case <-time.After(h.interval): h.beat() diff --git a/processor.go b/processor.go index 092fd37..cc6d26b 100644 --- a/processor.go +++ b/processor.go @@ -106,7 +106,7 @@ func newProcessor(params newProcessorParams) *processor { // It's safe to call this method multiple times. func (p *processor) stop() { p.once.Do(func() { - p.logger.Info("Processor shutting down...") + p.logger.Debug("Processor shutting down...") // Unblock if processor is waiting for sema token. close(p.abort) // Signal the processor goroutine to stop processing tasks @@ -145,7 +145,7 @@ func (p *processor) start(wg *sync.WaitGroup) { for { select { case <-p.done: - p.logger.Info("Processor done") + p.logger.Debug("Processor done") return default: p.exec() diff --git a/scheduler.go b/scheduler.go index aa0a36d..8360135 100644 --- a/scheduler.go +++ b/scheduler.go @@ -41,7 +41,7 @@ func newScheduler(l *log.Logger, b base.Broker, avgInterval time.Duration, qcfg } func (s *scheduler) terminate() { - s.logger.Info("Scheduler shutting down...") + s.logger.Debug("Scheduler shutting down...") // Signal the scheduler goroutine to stop polling. s.done <- struct{}{} } @@ -54,7 +54,7 @@ func (s *scheduler) start(wg *sync.WaitGroup) { for { select { case <-s.done: - s.logger.Info("Scheduler done") + s.logger.Debug("Scheduler done") return case <-time.After(s.avgInterval): s.exec() diff --git a/server.go b/server.go index 2105c88..b694da8 100644 --- a/server.go +++ b/server.go @@ -401,7 +401,6 @@ func (srv *Server) Stop() { return } - fmt.Println() // print newline for prettier log. srv.logger.Info("Starting graceful shutdown") // Note: The order of termination is important. // Sender goroutines should be terminated before the receiver goroutines. @@ -417,7 +416,7 @@ func (srv *Server) Stop() { srv.broker.Close() srv.ss.SetStatus(base.StatusStopped) - srv.logger.Info("Bye!") + srv.logger.Info("Exiting") } // Quiet signals the server to stop pulling new tasks off queues. diff --git a/subscriber.go b/subscriber.go index be7a010..2804bfa 100644 --- a/subscriber.go +++ b/subscriber.go @@ -38,7 +38,7 @@ func newSubscriber(l *log.Logger, b base.Broker, cancelations *base.Cancelations } func (s *subscriber) terminate() { - s.logger.Info("Subscriber shutting down...") + s.logger.Debug("Subscriber shutting down...") // Signal the subscriber goroutine to stop. s.done <- struct{}{} } @@ -60,7 +60,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) { case <-time.After(s.retryTimeout): continue case <-s.done: - s.logger.Info("Subscriber done") + s.logger.Debug("Subscriber done") return } } @@ -71,7 +71,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) { select { case <-s.done: pubsub.Close() - s.logger.Info("Subscriber done") + s.logger.Debug("Subscriber done") return case msg := <-cancelCh: cancel, ok := s.cancelations.Get(msg.Payload) diff --git a/syncer.go b/syncer.go index 018196d..9ea24fc 100644 --- a/syncer.go +++ b/syncer.go @@ -40,7 +40,7 @@ func newSyncer(l *log.Logger, requestsCh <-chan *syncRequest, interval time.Dura } func (s *syncer) terminate() { - s.logger.Info("Syncer shutting down...") + s.logger.Debug("Syncer shutting down...") // Signal the syncer goroutine to stop. s.done <- struct{}{} } @@ -59,7 +59,7 @@ func (s *syncer) start(wg *sync.WaitGroup) { s.logger.Error(req.errMsg) } } - s.logger.Info("Syncer done") + s.logger.Debug("Syncer done") return case req := <-s.requestsCh: requests = append(requests, req) From 556b2103fe240defd99bd949caedbb85f8c9fd04 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 12 May 2020 21:30:51 -0700 Subject: [PATCH 8/9] Minor code cleanup --- processor.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/processor.go b/processor.go index cc6d26b..2840287 100644 --- a/processor.go +++ b/processor.go @@ -159,17 +159,18 @@ func (p *processor) start(wg *sync.WaitGroup) { func (p *processor) exec() { qnames := p.queues() msg, err := p.broker.Dequeue(qnames...) - if err == rdb.ErrNoProcessableTask { // TODO: Need to decouple this error from rdb to support other brokers + switch { + case err == rdb.ErrNoProcessableTask: // queues are empty, this is a normal behavior. - if len(p.queueConfig) > 1 { + if len(qnames) > 1 { // sleep to avoid slamming redis and let scheduler move tasks into queues. // Note: With multiple queues, we are not using blocking pop operation and // polling queues instead. This adds significant load to redis. time.Sleep(time.Second) } + p.logger.Debug("All queues are empty") return - } - if err != nil { + case err != nil: if p.errLogLimiter.Allow() { p.logger.Errorf("Dequeue error: %v", err) } @@ -186,7 +187,7 @@ func (p *processor) exec() { go func() { defer func() { p.ss.DeleteWorkerStats(msg) - <-p.sema /* release token */ + <-p.sema // release token }() ctx, cancel := createContext(msg) From 210b026b013e38535ed8ca7f4753f82e2c88b6e5 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 16 May 2020 07:12:08 -0700 Subject: [PATCH 9/9] Add log messages around Server.Quiet --- server.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server.go b/server.go index b694da8..5177458 100644 --- a/server.go +++ b/server.go @@ -422,6 +422,8 @@ func (srv *Server) Stop() { // Quiet signals the server to stop pulling new tasks off queues. // Quiet should be used before stopping the server. func (srv *Server) Quiet() { + srv.logger.Info("Stopping processor") srv.processor.stop() srv.ss.SetStatus(base.StatusQuiet) + srv.logger.Info("Processor stopped") }