2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-24 23:02:18 +08:00

Allow custom logger to be used in Background

This commit is contained in:
Ken Hibino 2020-03-12 07:31:10 -07:00
parent d664d68fa4
commit 0bc6eba021
8 changed files with 90 additions and 19 deletions

View File

@ -40,7 +40,7 @@ type Background struct {
// wait group to wait for all goroutines to finish. // wait group to wait for all goroutines to finish.
wg sync.WaitGroup wg sync.WaitGroup
logger *log.Logger logger Logger
rdb *rdb.RDB rdb *rdb.RDB
scheduler *scheduler scheduler *scheduler
@ -107,6 +107,11 @@ type Config struct {
// //
// ErrorHandler: asynq.ErrorHandlerFunc(reportError) // ErrorHandler: asynq.ErrorHandlerFunc(reportError)
ErrorHandler ErrorHandler ErrorHandler ErrorHandler
// Logger specifies the logger used by the background instance.
//
// If unset, default logger is used.
Logger Logger
} }
// An ErrorHandler handles errors returned by the task handler. // An ErrorHandler handles errors returned by the task handler.
@ -123,6 +128,25 @@ func (fn ErrorHandlerFunc) HandleError(task *Task, err error, retried, maxRetry
fn(task, err, retried, maxRetry) fn(task, err, retried, maxRetry)
} }
// Logger implements logging with various log levels.
type Logger interface {
// Debug logs a message at Debug level.
Debug(format string, args ...interface{})
// Info logs a message at Info level.
Info(format string, args ...interface{})
// Warn logs a message at Warning level.
Warn(format string, args ...interface{})
// Error logs a message at Error level.
Error(format string, args ...interface{})
// Fatal logs a message at Fatal level
// and process will exit with status set to 1.
Fatal(format string, args ...interface{})
}
// Formula taken from https://github.com/mperham/sidekiq. // Formula taken from https://github.com/mperham/sidekiq.
func defaultDelayFunc(n int, e error, t *Task) time.Duration { func defaultDelayFunc(n int, e error, t *Task) time.Duration {
r := rand.New(rand.NewSource(time.Now().UnixNano())) r := rand.New(rand.NewSource(time.Now().UnixNano()))
@ -154,6 +178,10 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
if len(queues) == 0 { if len(queues) == 0 {
queues = defaultQueueConfig queues = defaultQueueConfig
} }
logger := cfg.Logger
if logger == nil {
logger = log.NewLogger(os.Stderr)
}
host, err := os.Hostname() host, err := os.Hostname()
if err != nil { if err != nil {
@ -161,7 +189,6 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
} }
pid := os.Getpid() pid := os.Getpid()
logger := log.NewLogger(os.Stderr)
rdb := rdb.NewRDB(createRedisClient(r)) rdb := rdb.NewRDB(createRedisClient(r))
ps := base.NewProcessState(host, pid, n, queues, cfg.StrictPriority) ps := base.NewProcessState(host, pid, n, queues, cfg.StrictPriority)
syncCh := make(chan *syncRequest) syncCh := make(chan *syncRequest)
@ -210,7 +237,13 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
// a signal, it gracefully shuts down all pending workers and other // a signal, it gracefully shuts down all pending workers and other
// goroutines to process the tasks. // goroutines to process the tasks.
func (bg *Background) Run(handler Handler) { func (bg *Background) Run(handler Handler) {
bg.logger.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid())) type prefixLogger interface {
SetPrefix(prefix string)
}
// If logger supports setting prefix, then set prefix for log output.
if l, ok := bg.logger.(prefixLogger); ok {
l.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid()))
}
bg.logger.Info("Starting processing") bg.logger.Info("Starting processing")
bg.start(handler) bg.start(handler)

View File

@ -9,14 +9,13 @@ import (
"time" "time"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
) )
// heartbeater is responsible for writing process info to redis periodically to // heartbeater is responsible for writing process info to redis periodically to
// indicate that the background worker process is up. // indicate that the background worker process is up.
type heartbeater struct { type heartbeater struct {
logger *log.Logger logger Logger
rdb *rdb.RDB rdb *rdb.RDB
ps *base.ProcessState ps *base.ProcessState
@ -28,7 +27,7 @@ type heartbeater struct {
interval time.Duration interval time.Duration
} }
func newHeartbeater(l *log.Logger, rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater { func newHeartbeater(l Logger, rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater {
return &heartbeater{ return &heartbeater{
logger: l, logger: l,
rdb: rdb, rdb: rdb,

View File

@ -8,6 +8,7 @@ package log
import ( import (
"io" "io"
stdlog "log" stdlog "log"
"os"
) )
func NewLogger(out io.Writer) *Logger { func NewLogger(out io.Writer) *Logger {
@ -20,6 +21,11 @@ type Logger struct {
*stdlog.Logger *stdlog.Logger
} }
func (l *Logger) Debug(format string, args ...interface{}) {
format = "DEBUG: " + format
l.Printf(format, args...)
}
func (l *Logger) Info(format string, args ...interface{}) { func (l *Logger) Info(format string, args ...interface{}) {
format = "INFO: " + format format = "INFO: " + format
l.Printf(format, args...) l.Printf(format, args...)
@ -34,3 +40,9 @@ func (l *Logger) Error(format string, args ...interface{}) {
format = "ERROR: " + format format = "ERROR: " + format
l.Printf(format, args...) l.Printf(format, args...)
} }
func (l *Logger) Fatal(format string, args ...interface{}) {
format = "FATAL: " + format
l.Printf(format, args...)
os.Exit(1)
}

View File

@ -24,6 +24,38 @@ type tester struct {
wantPattern string // regexp that log output must match wantPattern string // regexp that log output must match
} }
func TestLoggerDebug(t *testing.T) {
tests := []tester{
{
desc: "without trailing newline, logger adds newline",
message: "hello, world!",
wantPattern: fmt.Sprintf("^%s %s%s DEBUG: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
{
desc: "with trailing newline, logger preserves newline",
message: "hello, world!\n",
wantPattern: fmt.Sprintf("^%s %s%s DEBUG: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := NewLogger(&buf)
logger.Debug(tc.message)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)
if err != nil {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
tc.message, got, tc.wantPattern)
}
}
}
func TestLoggerInfo(t *testing.T) { func TestLoggerInfo(t *testing.T) {
tests := []tester{ tests := []tester{
{ {

View File

@ -13,13 +13,12 @@ import (
"time" "time"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"golang.org/x/time/rate" "golang.org/x/time/rate"
) )
type processor struct { type processor struct {
logger *log.Logger logger Logger
rdb *rdb.RDB rdb *rdb.RDB
ps *base.ProcessState ps *base.ProcessState
@ -63,7 +62,7 @@ type processor struct {
type retryDelayFunc func(n int, err error, task *Task) time.Duration type retryDelayFunc func(n int, err error, task *Task) time.Duration
// newProcessor constructs a new processor. // newProcessor constructs a new processor.
func newProcessor(l *log.Logger, r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, func newProcessor(l Logger, r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc,
syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor { syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor {
info := ps.Get() info := ps.Get()
qcfg := normalizeQueueCfg(info.Queues) qcfg := normalizeQueueCfg(info.Queues)

View File

@ -8,12 +8,11 @@ import (
"sync" "sync"
"time" "time"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
) )
type scheduler struct { type scheduler struct {
logger *log.Logger logger Logger
rdb *rdb.RDB rdb *rdb.RDB
// channel to communicate back to the long running "scheduler" goroutine. // channel to communicate back to the long running "scheduler" goroutine.
@ -26,7 +25,7 @@ type scheduler struct {
qnames []string qnames []string
} }
func newScheduler(l *log.Logger, r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler { func newScheduler(l Logger, r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler {
var qnames []string var qnames []string
for q := range qcfg { for q := range qcfg {
qnames = append(qnames, q) qnames = append(qnames, q)

View File

@ -8,12 +8,11 @@ import (
"sync" "sync"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
) )
type subscriber struct { type subscriber struct {
logger *log.Logger logger Logger
rdb *rdb.RDB rdb *rdb.RDB
// channel to communicate back to the long running "subscriber" goroutine. // channel to communicate back to the long running "subscriber" goroutine.
@ -23,7 +22,7 @@ type subscriber struct {
cancelations *base.Cancelations cancelations *base.Cancelations
} }
func newSubscriber(l *log.Logger, rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber { func newSubscriber(l Logger, rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber {
return &subscriber{ return &subscriber{
logger: l, logger: l,
rdb: rdb, rdb: rdb,

View File

@ -7,14 +7,12 @@ package asynq
import ( import (
"sync" "sync"
"time" "time"
"github.com/hibiken/asynq/internal/log"
) )
// syncer is responsible for queuing up failed requests to redis and retry // syncer is responsible for queuing up failed requests to redis and retry
// those requests to sync state between the background process and redis. // those requests to sync state between the background process and redis.
type syncer struct { type syncer struct {
logger *log.Logger logger Logger
requestsCh <-chan *syncRequest requestsCh <-chan *syncRequest
@ -30,7 +28,7 @@ type syncRequest struct {
errMsg string // error message errMsg string // error message
} }
func newSyncer(l *log.Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer { func newSyncer(l Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
return &syncer{ return &syncer{
logger: l, logger: l,
requestsCh: requestsCh, requestsCh: requestsCh,