2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-24 23:02:18 +08:00

Simplify Logger interface

This commit is contained in:
Ken Hibino 2020-05-05 22:10:11 -07:00
parent 26b78136ba
commit b63476ddc8
9 changed files with 277 additions and 96 deletions

View File

@ -5,7 +5,6 @@
package asynq
import (
"os"
"sort"
"testing"
@ -24,7 +23,7 @@ const (
redisDB = 14
)
var testLogger = log.NewLogger(os.Stderr)
var testLogger = log.NewLogger(nil)
func setup(tb testing.TB) *redis.Client {
tb.Helper()

View File

@ -9,12 +9,13 @@ import (
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
)
// heartbeater is responsible for writing process info to redis periodically to
// indicate that the background worker process is up.
type heartbeater struct {
logger Logger
logger *log.Logger
broker base.Broker
ss *base.ServerState
@ -26,7 +27,7 @@ type heartbeater struct {
interval time.Duration
}
func newHeartbeater(l Logger, b base.Broker, ss *base.ServerState, interval time.Duration) *heartbeater {
func newHeartbeater(l *log.Logger, b base.Broker, ss *base.ServerState, interval time.Duration) *heartbeater {
return &heartbeater{
logger: l,
broker: b,
@ -67,6 +68,6 @@ func (h *heartbeater) beat() {
// and short enough to expire quickly once the process is shut down or killed.
err := h.broker.WriteServerState(h.ss, h.interval*2)
if err != nil {
h.logger.Error("could not write heartbeat data: %v", err)
h.logger.Errorf("could not write heartbeat data: %v", err)
}
}

View File

@ -6,52 +6,106 @@
package log
import (
"fmt"
"io"
stdlog "log"
"os"
)
// NewLogger creates and returns a new instance of Logger.
func NewLogger(out io.Writer) *Logger {
return &Logger{
stdlog.New(out, "", stdlog.Ldate|stdlog.Ltime|stdlog.Lmicroseconds|stdlog.LUTC),
}
// Base supports logging with various log levels.
type Base interface {
// Debug logs a message at Debug level.
Debug(args ...interface{})
// Info logs a message at Info level.
Info(args ...interface{})
// Warn logs a message at Warning level.
Warn(args ...interface{})
// Error logs a message at Error level.
Error(args ...interface{})
// Fatal logs a message at Fatal level
// and process will exit with status set to 1.
Fatal(args ...interface{})
}
// Logger is a wrapper object around log.Logger from the standard library.
// baseLogger is a wrapper object around log.Logger from the standard library.
// It supports logging at various log levels.
type Logger struct {
type baseLogger struct {
*stdlog.Logger
}
// Debug logs a message at Debug level.
func (l *Logger) Debug(format string, args ...interface{}) {
format = "DEBUG: " + format
l.Printf(format, args...)
func (l *baseLogger) Debug(args ...interface{}) {
l.prefixPrint("DEBUG: ", args...)
}
// Info logs a message at Info level.
func (l *Logger) Info(format string, args ...interface{}) {
format = "INFO: " + format
l.Printf(format, args...)
func (l *baseLogger) Info(args ...interface{}) {
l.prefixPrint("INFO: ", args...)
}
// Warn logs a message at Warning level.
func (l *Logger) Warn(format string, args ...interface{}) {
format = "WARN: " + format
l.Printf(format, args...)
func (l *baseLogger) Warn(args ...interface{}) {
l.prefixPrint("WARN: ", args...)
}
// Error logs a message at Error level.
func (l *Logger) Error(format string, args ...interface{}) {
format = "ERROR: " + format
l.Printf(format, args...)
func (l *baseLogger) Error(args ...interface{}) {
l.prefixPrint("ERROR: ", args...)
}
// Fatal logs a message at Fatal level
// and process will exit with status set to 1.
func (l *Logger) Fatal(format string, args ...interface{}) {
format = "FATAL: " + format
l.Printf(format, args...)
func (l *baseLogger) Fatal(args ...interface{}) {
l.prefixPrint("FATAL: ", args...)
os.Exit(1)
}
func (l *baseLogger) prefixPrint(prefix string, args ...interface{}) {
args = append([]interface{}{prefix}, args...)
l.Print(args...)
}
// newBase creates and returns a new instance of baseLogger.
func newBase(out io.Writer) *baseLogger {
prefix := fmt.Sprintf("asynq: pid=%d ", os.Getpid())
return &baseLogger{
stdlog.New(out, prefix, stdlog.Ldate|stdlog.Ltime|stdlog.Lmicroseconds|stdlog.LUTC),
}
}
// NewLogger creates and returns a new instance of Logger.
func NewLogger(base Base) *Logger {
if base == nil {
base = newBase(os.Stderr)
}
return &Logger{base}
}
// Logger logs message to io.Writer with various log levels.
type Logger struct {
Base
}
func (l *Logger) Debugf(format string, args ...interface{}) {
l.Debug(fmt.Sprintf(format, args...))
}
func (l *Logger) Infof(format string, args ...interface{}) {
l.Info(fmt.Sprintf(format, args...))
}
func (l *Logger) Warnf(format string, args ...interface{}) {
l.Warn(fmt.Sprintf(format, args...))
}
func (l *Logger) Errorf(format string, args ...interface{}) {
l.Error(fmt.Sprintf(format, args...))
}
func (l *Logger) Fatalf(format string, args ...interface{}) {
l.Fatal(fmt.Sprintf(format, args...))
}

View File

@ -13,6 +13,7 @@ import (
// regexp for timestamps
const (
rgxPID = `[0-9]+`
rgxdate = `[0-9][0-9][0-9][0-9]/[0-9][0-9]/[0-9][0-9]`
rgxtime = `[0-9][0-9]:[0-9][0-9]:[0-9][0-9]`
rgxmicroseconds = `\.[0-9][0-9][0-9][0-9][0-9][0-9]`
@ -29,18 +30,20 @@ func TestLoggerDebug(t *testing.T) {
{
desc: "without trailing newline, logger adds newline",
message: "hello, world!",
wantPattern: fmt.Sprintf("^%s %s%s DEBUG: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s DEBUG: hello, world!\n$",
rgxPID, rgxdate, rgxtime, rgxmicroseconds),
},
{
desc: "with trailing newline, logger preserves newline",
message: "hello, world!\n",
wantPattern: fmt.Sprintf("^%s %s%s DEBUG: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s DEBUG: hello, world!\n$",
rgxPID, rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := NewLogger(&buf)
logger := NewLogger(newBase(&buf))
logger.Debug(tc.message)
@ -50,7 +53,7 @@ func TestLoggerDebug(t *testing.T) {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
t.Errorf("logger.Debug(%q) outputted %q, should match pattern %q",
tc.message, got, tc.wantPattern)
}
}
@ -61,18 +64,20 @@ func TestLoggerInfo(t *testing.T) {
{
desc: "without trailing newline, logger adds newline",
message: "hello, world!",
wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s INFO: hello, world!\n$",
rgxPID, rgxdate, rgxtime, rgxmicroseconds),
},
{
desc: "with trailing newline, logger preserves newline",
message: "hello, world!\n",
wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s INFO: hello, world!\n$",
rgxPID, rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := NewLogger(&buf)
logger := NewLogger(newBase(&buf))
logger.Info(tc.message)
@ -82,7 +87,7 @@ func TestLoggerInfo(t *testing.T) {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
t.Errorf("logger.Info(%q) outputted %q, should match pattern %q",
tc.message, got, tc.wantPattern)
}
}
@ -93,18 +98,20 @@ func TestLoggerWarn(t *testing.T) {
{
desc: "without trailing newline, logger adds newline",
message: "hello, world!",
wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s WARN: hello, world!\n$",
rgxPID, rgxdate, rgxtime, rgxmicroseconds),
},
{
desc: "with trailing newline, logger preserves newline",
message: "hello, world!\n",
wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s WARN: hello, world!\n$",
rgxPID, rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := NewLogger(&buf)
logger := NewLogger(newBase(&buf))
logger.Warn(tc.message)
@ -114,7 +121,7 @@ func TestLoggerWarn(t *testing.T) {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
t.Errorf("logger.Warn(%q) outputted %q, should match pattern %q",
tc.message, got, tc.wantPattern)
}
}
@ -125,18 +132,20 @@ func TestLoggerError(t *testing.T) {
{
desc: "without trailing newline, logger adds newline",
message: "hello, world!",
wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s ERROR: hello, world!\n$",
rgxPID, rgxdate, rgxtime, rgxmicroseconds),
},
{
desc: "with trailing newline, logger preserves newline",
message: "hello, world!\n",
wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s ERROR: hello, world!\n$",
rgxPID, rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := NewLogger(&buf)
logger := NewLogger(newBase(&buf))
logger.Error(tc.message)
@ -146,8 +155,131 @@ func TestLoggerError(t *testing.T) {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
t.Errorf("logger.Error(%q) outputted %q, should match pattern %q",
tc.message, got, tc.wantPattern)
}
}
}
type formatTester struct {
desc string
format string
args []interface{}
wantPattern string // regexp that log output must match
}
func TestLoggerDebugf(t *testing.T) {
tests := []formatTester{
{
desc: "Formats message with DEBUG prefix",
format: "hello, %s!",
args: []interface{}{"Gopher"},
wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s DEBUG: hello, Gopher!\n$",
rgxPID, rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := NewLogger(newBase(&buf))
logger.Debugf(tc.format, tc.args...)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)
if err != nil {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.Debugf(%q, %v) outputted %q, should match pattern %q",
tc.format, tc.args, got, tc.wantPattern)
}
}
}
func TestLoggerInfof(t *testing.T) {
tests := []formatTester{
{
desc: "Formats message with INFO prefix",
format: "%d,%d,%d",
args: []interface{}{1, 2, 3},
wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s INFO: 1,2,3\n$",
rgxPID, rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := NewLogger(newBase(&buf))
logger.Infof(tc.format, tc.args...)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)
if err != nil {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.Infof(%q, %v) outputted %q, should match pattern %q",
tc.format, tc.args, got, tc.wantPattern)
}
}
}
func TestLoggerWarnf(t *testing.T) {
tests := []formatTester{
{
desc: "Formats message with WARN prefix",
format: "hello, %s",
args: []interface{}{"Gophers"},
wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s WARN: hello, Gophers\n$",
rgxPID, rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := NewLogger(newBase(&buf))
logger.Warnf(tc.format, tc.args...)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)
if err != nil {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.Warnf(%q, %v) outputted %q, should match pattern %q",
tc.format, tc.args, got, tc.wantPattern)
}
}
}
func TestLoggerErrorf(t *testing.T) {
tests := []formatTester{
{
desc: "Formats message with ERROR prefix",
format: "hello, %s",
args: []interface{}{"Gophers"},
wantPattern: fmt.Sprintf("^asynq: pid=%s %s %s%s ERROR: hello, Gophers\n$",
rgxPID, rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := NewLogger(newBase(&buf))
logger.Errorf(tc.format, tc.args...)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)
if err != nil {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.Errorf(%q, %v) outputted %q, should match pattern %q",
tc.format, tc.args, got, tc.wantPattern)
}
}
}

View File

@ -13,12 +13,13 @@ import (
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
"golang.org/x/time/rate"
)
type processor struct {
logger Logger
logger *log.Logger
broker base.Broker
ss *base.ServerState
@ -64,7 +65,7 @@ type processor struct {
type retryDelayFunc func(n int, err error, task *Task) time.Duration
type newProcessorParams struct {
logger Logger
logger *log.Logger
broker base.Broker
ss *base.ServerState
retryDelayFunc retryDelayFunc
@ -170,7 +171,7 @@ func (p *processor) exec() {
}
if err != nil {
if p.errLogLimiter.Allow() {
p.logger.Error("Dequeue error: %v", err)
p.logger.Errorf("Dequeue error: %v", err)
}
return
}
@ -202,7 +203,7 @@ func (p *processor) exec() {
select {
case <-p.quit:
// time is up, quit this worker goroutine.
p.logger.Warn("Quitting worker. task id=%s", msg.ID)
p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
return
case resErr := <-resCh:
// Note: One of three things should happen.
@ -231,17 +232,17 @@ func (p *processor) exec() {
func (p *processor) restore() {
n, err := p.broker.RequeueAll()
if err != nil {
p.logger.Error("Could not restore unfinished tasks: %v", err)
p.logger.Errorf("Could not restore unfinished tasks: %v", err)
}
if n > 0 {
p.logger.Info("Restored %d unfinished tasks back to queue", n)
p.logger.Infof("Restored %d unfinished tasks back to queue", n)
}
}
func (p *processor) requeue(msg *base.TaskMessage) {
err := p.broker.Requeue(msg)
if err != nil {
p.logger.Error("Could not push task id=%s back to queue: %v", msg.ID, err)
p.logger.Errorf("Could not push task id=%s back to queue: %v", msg.ID, err)
}
}
@ -249,7 +250,7 @@ func (p *processor) markAsDone(msg *base.TaskMessage) {
err := p.broker.Done(msg)
if err != nil {
errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue)
p.logger.Warn("%s; Will retry syncing", errMsg)
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.broker.Done(msg)
@ -265,7 +266,7 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
err := p.broker.Retry(msg, retryAt, e.Error())
if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue)
p.logger.Warn("%s; Will retry syncing", errMsg)
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.broker.Retry(msg, retryAt, e.Error())
@ -276,11 +277,11 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
}
func (p *processor) kill(msg *base.TaskMessage, e error) {
p.logger.Warn("Retry exhausted for task id=%s", msg.ID)
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
err := p.broker.Kill(msg, e.Error())
if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
p.logger.Warn("%s; Will retry syncing", errMsg)
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.broker.Kill(msg, e.Error())

View File

@ -9,10 +9,11 @@ import (
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
)
type scheduler struct {
logger Logger
logger *log.Logger
broker base.Broker
// channel to communicate back to the long running "scheduler" goroutine.
@ -25,7 +26,7 @@ type scheduler struct {
qnames []string
}
func newScheduler(l Logger, b base.Broker, avgInterval time.Duration, qcfg map[string]int) *scheduler {
func newScheduler(l *log.Logger, b base.Broker, avgInterval time.Duration, qcfg map[string]int) *scheduler {
var qnames []string
for q := range qcfg {
qnames = append(qnames, q)
@ -64,6 +65,6 @@ func (s *scheduler) start(wg *sync.WaitGroup) {
func (s *scheduler) exec() {
if err := s.broker.CheckAndEnqueue(s.qnames...); err != nil {
s.logger.Error("Could not enqueue scheduled tasks: %v", err)
s.logger.Errorf("Could not enqueue scheduled tasks: %v", err)
}
}

View File

@ -35,7 +35,7 @@ import (
type Server struct {
ss *base.ServerState
logger Logger
logger *log.Logger
broker base.Broker
@ -133,23 +133,23 @@ func (fn ErrorHandlerFunc) HandleError(task *Task, err error, retried, maxRetry
fn(task, err, retried, maxRetry)
}
// Logger implements logging with various log levels.
// Logger supports logging with various log levels.
type Logger interface {
// Debug logs a message at Debug level.
Debug(format string, args ...interface{})
Debug(args ...interface{})
// Info logs a message at Info level.
Info(format string, args ...interface{})
Info(args ...interface{})
// Warn logs a message at Warning level.
Warn(format string, args ...interface{})
Warn(args ...interface{})
// Error logs a message at Error level.
Error(format string, args ...interface{})
Error(args ...interface{})
// Fatal logs a message at Fatal level
// and process will exit with status set to 1.
Fatal(format string, args ...interface{})
Fatal(args ...interface{})
}
// Formula taken from https://github.com/mperham/sidekiq.
@ -185,10 +185,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
if len(queues) == 0 {
queues = defaultQueueConfig
}
logger := cfg.Logger
if logger == nil {
logger = log.NewLogger(os.Stderr)
}
shutdownTimeout := cfg.ShutdownTimeout
if shutdownTimeout == 0 {
shutdownTimeout = defaultShutdownTimeout
@ -201,6 +197,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
pid := os.Getpid()
rdb := rdb.NewRDB(createRedisClient(r))
logger := log.NewLogger(cfg.Logger)
ss := base.NewServerState(host, pid, n, queues, cfg.StrictPriority)
syncCh := make(chan *syncRequest)
cancels := base.NewCancelations()
@ -291,13 +288,6 @@ func (srv *Server) Start(handler Handler) error {
srv.ss.SetStatus(base.StatusRunning)
srv.processor.handler = handler
type prefixLogger interface {
SetPrefix(prefix string)
}
// If logger supports setting prefix, then set prefix for log output.
if l, ok := srv.logger.(prefixLogger); ok {
l.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid()))
}
srv.logger.Info("Starting processing")
srv.heartbeater.start(&srv.wg)

View File

@ -10,10 +10,11 @@ import (
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
)
type subscriber struct {
logger Logger
logger *log.Logger
broker base.Broker
// channel to communicate back to the long running "subscriber" goroutine.
@ -26,7 +27,7 @@ type subscriber struct {
retryTimeout time.Duration
}
func newSubscriber(l Logger, b base.Broker, cancelations *base.Cancelations) *subscriber {
func newSubscriber(l *log.Logger, b base.Broker, cancelations *base.Cancelations) *subscriber {
return &subscriber{
logger: l,
broker: b,
@ -54,7 +55,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) {
for {
pubsub, err = s.broker.CancelationPubSub()
if err != nil {
s.logger.Error("cannot subscribe to cancelation channel: %v", err)
s.logger.Errorf("cannot subscribe to cancelation channel: %v", err)
select {
case <-time.After(s.retryTimeout):
continue

View File

@ -7,12 +7,14 @@ package asynq
import (
"sync"
"time"
"github.com/hibiken/asynq/internal/log"
)
// syncer is responsible for queuing up failed requests to redis and retry
// those requests to sync state between the background process and redis.
type syncer struct {
logger Logger
logger *log.Logger
requestsCh <-chan *syncRequest
@ -28,7 +30,7 @@ type syncRequest struct {
errMsg string // error message
}
func newSyncer(l Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
func newSyncer(l *log.Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
return &syncer{
logger: l,
requestsCh: requestsCh,