diff --git a/background.go b/background.go index 249c655..2a01f96 100644 --- a/background.go +++ b/background.go @@ -6,7 +6,6 @@ package asynq import ( "fmt" - "log" "math" "math/rand" "os" @@ -150,9 +149,15 @@ func (fn HandlerFunc) ProcessTask(task *Task) error { // a signal, it gracefully shuts down all pending workers and other // goroutines to process the tasks. func (bg *Background) Run(handler Handler) { + logger.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid())) + logger.info("Starting processing") + bg.start(handler) defer bg.stop() + logger.info("Send signal TSTP to stop processing new tasks") + logger.info("Send signal TERM or INT to terminate the process") + // Wait for a signal to terminate. sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT, syscall.SIGTSTP) @@ -165,7 +170,7 @@ func (bg *Background) Run(handler Handler) { break } fmt.Println() - log.Println("[INFO] Starting graceful shutdown...") + logger.info("Starting graceful shutdown") } // starts the background-task processing. @@ -201,6 +206,8 @@ func (bg *Background) stop() { bg.rdb.Close() bg.processor.handler = nil bg.running = false + + logger.info("Bye!") } // normalizeQueueCfg divides priority numbers by their diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..803e3a4 --- /dev/null +++ b/logger.go @@ -0,0 +1,35 @@ +package asynq + +import ( + "io" + "log" + "os" +) + +// global logger used in asynq package. +var logger = newLogger(os.Stderr) + +func newLogger(out io.Writer) *asynqLogger { + return &asynqLogger{ + log.New(out, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.LUTC), + } +} + +type asynqLogger struct { + *log.Logger +} + +func (l *asynqLogger) info(format string, args ...interface{}) { + format = "INFO: " + format + l.Printf(format, args...) +} + +func (l *asynqLogger) warn(format string, args ...interface{}) { + format = "WARN: " + format + l.Printf(format, args...) +} + +func (l *asynqLogger) error(format string, args ...interface{}) { + format = "ERROR: " + format + l.Printf(format, args...) +} diff --git a/logger_test.go b/logger_test.go new file mode 100644 index 0000000..0d9b926 --- /dev/null +++ b/logger_test.go @@ -0,0 +1,117 @@ +package asynq + +import ( + "bytes" + "fmt" + "regexp" + "testing" +) + +// regexp for timestamps +const ( + rgxdate = `[0-9][0-9][0-9][0-9]/[0-9][0-9]/[0-9][0-9]` + rgxtime = `[0-9][0-9]:[0-9][0-9]:[0-9][0-9]` + rgxmicroseconds = `\.[0-9][0-9][0-9][0-9][0-9][0-9]` +) + +type tester struct { + desc string + message string + wantPattern string // regexp that log output must match +} + +func TestLoggerInfo(t *testing.T) { + tests := []tester{ + { + desc: "without trailing newline, logger adds newline", + message: "hello, world!", + wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + }, + { + desc: "with trailing newline, logger preserves newline", + message: "hello, world!\n", + wantPattern: fmt.Sprintf("^%s %s%s INFO: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + }, + } + + for _, tc := range tests { + var buf bytes.Buffer + logger := newLogger(&buf) + + logger.info(tc.message) + + got := buf.String() + matched, err := regexp.MatchString(tc.wantPattern, got) + if err != nil { + t.Fatal("pattern did not compile:", err) + } + if !matched { + t.Errorf("logger.info(%q) outputted %q, should match pattern %q", + tc.message, got, tc.wantPattern) + } + } +} + +func TestLoggerWarn(t *testing.T) { + tests := []tester{ + { + desc: "without trailing newline, logger adds newline", + message: "hello, world!", + wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + }, + { + desc: "with trailing newline, logger preserves newline", + message: "hello, world!\n", + wantPattern: fmt.Sprintf("^%s %s%s WARN: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + }, + } + + for _, tc := range tests { + var buf bytes.Buffer + logger := newLogger(&buf) + + logger.warn(tc.message) + + got := buf.String() + matched, err := regexp.MatchString(tc.wantPattern, got) + if err != nil { + t.Fatal("pattern did not compile:", err) + } + if !matched { + t.Errorf("logger.info(%q) outputted %q, should match pattern %q", + tc.message, got, tc.wantPattern) + } + } +} + +func TestLoggerError(t *testing.T) { + tests := []tester{ + { + desc: "without trailing newline, logger adds newline", + message: "hello, world!", + wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + }, + { + desc: "with trailing newline, logger preserves newline", + message: "hello, world!\n", + wantPattern: fmt.Sprintf("^%s %s%s ERROR: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds), + }, + } + + for _, tc := range tests { + var buf bytes.Buffer + logger := newLogger(&buf) + + logger.error(tc.message) + + got := buf.String() + matched, err := regexp.MatchString(tc.wantPattern, got) + if err != nil { + t.Fatal("pattern did not compile:", err) + } + if !matched { + t.Errorf("logger.info(%q) outputted %q, should match pattern %q", + tc.message, got, tc.wantPattern) + } + } +} diff --git a/processor.go b/processor.go index a8921b5..8212e44 100644 --- a/processor.go +++ b/processor.go @@ -6,7 +6,6 @@ package asynq import ( "fmt" - "log" "math/rand" "sort" "sync" @@ -79,7 +78,7 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retry // It's safe to call this method multiple times. func (p *processor) stop() { p.once.Do(func() { - log.Println("[INFO] Processor shutting down...") + logger.info("Processor shutting down...") // Unblock if processor is waiting for sema token. close(p.abort) // Signal the processor goroutine to stop processing tasks @@ -95,12 +94,12 @@ func (p *processor) terminate() { // IDEA: Allow user to customize this timeout value. const timeout = 8 * time.Second time.AfterFunc(timeout, func() { close(p.quit) }) - log.Println("[INFO] Waiting for all workers to finish...") + logger.info("Waiting for all workers to finish...") // block until all workers have released the token for i := 0; i < cap(p.sema); i++ { p.sema <- struct{}{} } - log.Println("[INFO] All workers have finished.") + logger.info("All workers have finished") p.restore() // move any unfinished tasks back to the queue. } @@ -112,7 +111,7 @@ func (p *processor) start() { for { select { case <-p.done: - log.Println("[INFO] Processor done.") + logger.info("Processor done") return default: p.exec() @@ -137,7 +136,7 @@ func (p *processor) exec() { return } if err != nil { - log.Printf("[ERROR] unexpected error while pulling a task out of queue: %v\n", err) + logger.error("Dequeue error: %v", err) return } @@ -159,7 +158,7 @@ func (p *processor) exec() { select { case <-p.quit: // time is up, quit this worker goroutine. - log.Printf("[WARN] Terminating in-progress task %+v\n", msg) + logger.warn("Quitting worker to process task id=%s", msg.ID) return case resErr := <-resCh: // Note: One of three things should happen. @@ -185,25 +184,25 @@ func (p *processor) exec() { func (p *processor) restore() { n, err := p.rdb.RestoreUnfinished() if err != nil { - log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err) + logger.error("Could not restore unfinished tasks: %v", err) } if n > 0 { - log.Printf("[INFO] Restored %d unfinished tasks back to queue.\n", n) + logger.info("Restored %d unfinished tasks back to queue", n) } } func (p *processor) requeue(msg *base.TaskMessage) { err := p.rdb.Requeue(msg) if err != nil { - log.Printf("[ERROR] Could not move task from InProgress back to queue: %v\n", err) + logger.error("Could not push task id=%s back to queue: %v", msg.ID, err) } } func (p *processor) markAsDone(msg *base.TaskMessage) { err := p.rdb.Done(msg) if err != nil { - errMsg := fmt.Sprintf("could not remove task %+v from %q", msg, base.InProgressQueue) - log.Printf("[WARN] %s; will retry\n", errMsg) + errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue) + logger.warn("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.rdb.Done(msg) @@ -218,8 +217,8 @@ func (p *processor) retry(msg *base.TaskMessage, e error) { retryAt := time.Now().Add(d) err := p.rdb.Retry(msg, retryAt, e.Error()) if err != nil { - errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.RetryQueue) - log.Printf("[WARN] %s; will retry\n", errMsg) + errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue) + logger.warn("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.rdb.Retry(msg, retryAt, e.Error()) @@ -230,11 +229,11 @@ func (p *processor) retry(msg *base.TaskMessage, e error) { } func (p *processor) kill(msg *base.TaskMessage, e error) { - log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID) + logger.warn("Retry exhausted for task id=%s", msg.ID) err := p.rdb.Kill(msg, e.Error()) if err != nil { - errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.DeadQueue) - log.Printf("[WARN] %s; will retry\n", errMsg) + errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue) + logger.warn("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { return p.rdb.Kill(msg, e.Error()) diff --git a/scheduler.go b/scheduler.go index d452637..8c59ebe 100644 --- a/scheduler.go +++ b/scheduler.go @@ -5,7 +5,6 @@ package asynq import ( - "log" "time" "github.com/hibiken/asynq/internal/rdb" @@ -38,7 +37,7 @@ func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]uint) * } func (s *scheduler) terminate() { - log.Println("[INFO] Scheduler shutting down...") + logger.info("Scheduler shutting down...") // Signal the scheduler goroutine to stop polling. s.done <- struct{}{} } @@ -49,7 +48,7 @@ func (s *scheduler) start() { for { select { case <-s.done: - log.Println("[INFO] Scheduler done.") + logger.info("Scheduler done") return case <-time.After(s.avgInterval): s.exec() @@ -60,6 +59,6 @@ func (s *scheduler) start() { func (s *scheduler) exec() { if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil { - log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err) + logger.error("Could not enqueue scheduled tasks: %v", err) } } diff --git a/syncer.go b/syncer.go index 1ea49f4..2d6c2b0 100644 --- a/syncer.go +++ b/syncer.go @@ -5,7 +5,6 @@ package asynq import ( - "log" "time" ) @@ -35,7 +34,7 @@ func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer { } func (s *syncer) terminate() { - log.Println("[INFO] Syncer shutting down...") + logger.info("Syncer shutting down...") // Signal the syncer goroutine to stop. s.done <- struct{}{} } @@ -49,10 +48,10 @@ func (s *syncer) start() { // Try sync one last time before shutting down. for _, req := range requests { if err := req.fn(); err != nil { - log.Printf("[ERROR] %s\n", req.errMsg) + logger.error(req.errMsg) } } - log.Println("[INFO] Syncer done.") + logger.info("Syncer done") return case req := <-s.requestsCh: requests = append(requests, req)