2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00

Add custom logger

This commit is contained in:
Ken Hibino 2020-01-21 07:17:41 -08:00
parent b02e4e6b09
commit db8e9d05c3
6 changed files with 183 additions and 27 deletions

View File

@ -6,7 +6,6 @@ package asynq
import ( import (
"fmt" "fmt"
"log"
"math" "math"
"math/rand" "math/rand"
"os" "os"
@ -150,9 +149,15 @@ func (fn HandlerFunc) ProcessTask(task *Task) error {
// a signal, it gracefully shuts down all pending workers and other // a signal, it gracefully shuts down all pending workers and other
// goroutines to process the tasks. // goroutines to process the tasks.
func (bg *Background) Run(handler Handler) { func (bg *Background) Run(handler Handler) {
logger.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid()))
logger.info("Starting processing")
bg.start(handler) bg.start(handler)
defer bg.stop() defer bg.stop()
logger.info("Send signal TSTP to stop processing new tasks")
logger.info("Send signal TERM or INT to terminate the process")
// Wait for a signal to terminate. // Wait for a signal to terminate.
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGTSTP) signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGTSTP)
@ -165,7 +170,7 @@ func (bg *Background) Run(handler Handler) {
break break
} }
fmt.Println() fmt.Println()
log.Println("[INFO] Starting graceful shutdown...") logger.info("Starting graceful shutdown")
} }
// starts the background-task processing. // starts the background-task processing.
@ -201,6 +206,8 @@ func (bg *Background) stop() {
bg.rdb.Close() bg.rdb.Close()
bg.processor.handler = nil bg.processor.handler = nil
bg.running = false bg.running = false
logger.info("Bye!")
} }
// normalizeQueueCfg divides priority numbers by their // normalizeQueueCfg divides priority numbers by their

35
logger.go Normal file
View File

@ -0,0 +1,35 @@
package asynq
import (
"io"
"log"
"os"
)
// global logger used in asynq package.
var logger = newLogger(os.Stderr)
func newLogger(out io.Writer) *asynqLogger {
return &asynqLogger{
log.New(out, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.LUTC),
}
}
type asynqLogger struct {
*log.Logger
}
func (l *asynqLogger) info(format string, args ...interface{}) {
format = "INFO: " + format
l.Printf(format, args...)
}
func (l *asynqLogger) warn(format string, args ...interface{}) {
format = "WARN: " + format
l.Printf(format, args...)
}
func (l *asynqLogger) error(format string, args ...interface{}) {
format = "ERROR: " + format
l.Printf(format, args...)
}

117
logger_test.go Normal file
View File

@ -0,0 +1,117 @@
package asynq
import (
"bytes"
"fmt"
"regexp"
"testing"
)
// regexp for timestamps
const (
rgxdate = `[0-9][0-9][0-9][0-9]/[0-9][0-9]/[0-9][0-9]`
rgxtime = `[0-9][0-9]:[0-9][0-9]:[0-9][0-9]`
rgxmicroseconds = `\.[0-9][0-9][0-9][0-9][0-9][0-9]`
)
type tester struct {
desc string
message string
wantPattern string // regexp that log output must match
}
func TestLoggerInfo(t *testing.T) {
tests := []tester{
{
desc: "without trailing newline, logger adds newline",
message: "hello, world!",
wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
{
desc: "with trailing newline, logger preserves newline",
message: "hello, world!\n",
wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := newLogger(&buf)
logger.info(tc.message)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)
if err != nil {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
tc.message, got, tc.wantPattern)
}
}
}
func TestLoggerWarn(t *testing.T) {
tests := []tester{
{
desc: "without trailing newline, logger adds newline",
message: "hello, world!",
wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
{
desc: "with trailing newline, logger preserves newline",
message: "hello, world!\n",
wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := newLogger(&buf)
logger.warn(tc.message)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)
if err != nil {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
tc.message, got, tc.wantPattern)
}
}
}
func TestLoggerError(t *testing.T) {
tests := []tester{
{
desc: "without trailing newline, logger adds newline",
message: "hello, world!",
wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
{
desc: "with trailing newline, logger preserves newline",
message: "hello, world!\n",
wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := newLogger(&buf)
logger.error(tc.message)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)
if err != nil {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
tc.message, got, tc.wantPattern)
}
}
}

View File

@ -6,7 +6,6 @@ package asynq
import ( import (
"fmt" "fmt"
"log"
"math/rand" "math/rand"
"sort" "sort"
"sync" "sync"
@ -79,7 +78,7 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retry
// It's safe to call this method multiple times. // It's safe to call this method multiple times.
func (p *processor) stop() { func (p *processor) stop() {
p.once.Do(func() { p.once.Do(func() {
log.Println("[INFO] Processor shutting down...") logger.info("Processor shutting down...")
// Unblock if processor is waiting for sema token. // Unblock if processor is waiting for sema token.
close(p.abort) close(p.abort)
// Signal the processor goroutine to stop processing tasks // Signal the processor goroutine to stop processing tasks
@ -95,12 +94,12 @@ func (p *processor) terminate() {
// IDEA: Allow user to customize this timeout value. // IDEA: Allow user to customize this timeout value.
const timeout = 8 * time.Second const timeout = 8 * time.Second
time.AfterFunc(timeout, func() { close(p.quit) }) time.AfterFunc(timeout, func() { close(p.quit) })
log.Println("[INFO] Waiting for all workers to finish...") logger.info("Waiting for all workers to finish...")
// block until all workers have released the token // block until all workers have released the token
for i := 0; i < cap(p.sema); i++ { for i := 0; i < cap(p.sema); i++ {
p.sema <- struct{}{} p.sema <- struct{}{}
} }
log.Println("[INFO] All workers have finished.") logger.info("All workers have finished")
p.restore() // move any unfinished tasks back to the queue. p.restore() // move any unfinished tasks back to the queue.
} }
@ -112,7 +111,7 @@ func (p *processor) start() {
for { for {
select { select {
case <-p.done: case <-p.done:
log.Println("[INFO] Processor done.") logger.info("Processor done")
return return
default: default:
p.exec() p.exec()
@ -137,7 +136,7 @@ func (p *processor) exec() {
return return
} }
if err != nil { if err != nil {
log.Printf("[ERROR] unexpected error while pulling a task out of queue: %v\n", err) logger.error("Dequeue error: %v", err)
return return
} }
@ -159,7 +158,7 @@ func (p *processor) exec() {
select { select {
case <-p.quit: case <-p.quit:
// time is up, quit this worker goroutine. // time is up, quit this worker goroutine.
log.Printf("[WARN] Terminating in-progress task %+v\n", msg) logger.warn("Quitting worker to process task id=%s", msg.ID)
return return
case resErr := <-resCh: case resErr := <-resCh:
// Note: One of three things should happen. // Note: One of three things should happen.
@ -185,25 +184,25 @@ func (p *processor) exec() {
func (p *processor) restore() { func (p *processor) restore() {
n, err := p.rdb.RestoreUnfinished() n, err := p.rdb.RestoreUnfinished()
if err != nil { if err != nil {
log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err) logger.error("Could not restore unfinished tasks: %v", err)
} }
if n > 0 { if n > 0 {
log.Printf("[INFO] Restored %d unfinished tasks back to queue.\n", n) logger.info("Restored %d unfinished tasks back to queue", n)
} }
} }
func (p *processor) requeue(msg *base.TaskMessage) { func (p *processor) requeue(msg *base.TaskMessage) {
err := p.rdb.Requeue(msg) err := p.rdb.Requeue(msg)
if err != nil { if err != nil {
log.Printf("[ERROR] Could not move task from InProgress back to queue: %v\n", err) logger.error("Could not push task id=%s back to queue: %v", msg.ID, err)
} }
} }
func (p *processor) markAsDone(msg *base.TaskMessage) { func (p *processor) markAsDone(msg *base.TaskMessage) {
err := p.rdb.Done(msg) err := p.rdb.Done(msg)
if err != nil { if err != nil {
errMsg := fmt.Sprintf("could not remove task %+v from %q", msg, base.InProgressQueue) errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue)
log.Printf("[WARN] %s; will retry\n", errMsg) logger.warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.rdb.Done(msg) return p.rdb.Done(msg)
@ -218,8 +217,8 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
retryAt := time.Now().Add(d) retryAt := time.Now().Add(d)
err := p.rdb.Retry(msg, retryAt, e.Error()) err := p.rdb.Retry(msg, retryAt, e.Error())
if err != nil { if err != nil {
errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.RetryQueue) errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue)
log.Printf("[WARN] %s; will retry\n", errMsg) logger.warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.rdb.Retry(msg, retryAt, e.Error()) return p.rdb.Retry(msg, retryAt, e.Error())
@ -230,11 +229,11 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
} }
func (p *processor) kill(msg *base.TaskMessage, e error) { func (p *processor) kill(msg *base.TaskMessage, e error) {
log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID) logger.warn("Retry exhausted for task id=%s", msg.ID)
err := p.rdb.Kill(msg, e.Error()) err := p.rdb.Kill(msg, e.Error())
if err != nil { if err != nil {
errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.DeadQueue) errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
log.Printf("[WARN] %s; will retry\n", errMsg) logger.warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.rdb.Kill(msg, e.Error()) return p.rdb.Kill(msg, e.Error())

View File

@ -5,7 +5,6 @@
package asynq package asynq
import ( import (
"log"
"time" "time"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
@ -38,7 +37,7 @@ func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]uint) *
} }
func (s *scheduler) terminate() { func (s *scheduler) terminate() {
log.Println("[INFO] Scheduler shutting down...") logger.info("Scheduler shutting down...")
// Signal the scheduler goroutine to stop polling. // Signal the scheduler goroutine to stop polling.
s.done <- struct{}{} s.done <- struct{}{}
} }
@ -49,7 +48,7 @@ func (s *scheduler) start() {
for { for {
select { select {
case <-s.done: case <-s.done:
log.Println("[INFO] Scheduler done.") logger.info("Scheduler done")
return return
case <-time.After(s.avgInterval): case <-time.After(s.avgInterval):
s.exec() s.exec()
@ -60,6 +59,6 @@ func (s *scheduler) start() {
func (s *scheduler) exec() { func (s *scheduler) exec() {
if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil { if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil {
log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err) logger.error("Could not enqueue scheduled tasks: %v", err)
} }
} }

View File

@ -5,7 +5,6 @@
package asynq package asynq
import ( import (
"log"
"time" "time"
) )
@ -35,7 +34,7 @@ func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
} }
func (s *syncer) terminate() { func (s *syncer) terminate() {
log.Println("[INFO] Syncer shutting down...") logger.info("Syncer shutting down...")
// Signal the syncer goroutine to stop. // Signal the syncer goroutine to stop.
s.done <- struct{}{} s.done <- struct{}{}
} }
@ -49,10 +48,10 @@ func (s *syncer) start() {
// Try sync one last time before shutting down. // Try sync one last time before shutting down.
for _, req := range requests { for _, req := range requests {
if err := req.fn(); err != nil { if err := req.fn(); err != nil {
log.Printf("[ERROR] %s\n", req.errMsg) logger.error(req.errMsg)
} }
} }
log.Println("[INFO] Syncer done.") logger.info("Syncer done")
return return
case req := <-s.requestsCh: case req := <-s.requestsCh:
requests = append(requests, req) requests = append(requests, req)