mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-24 23:02:18 +08:00
Extract out log package
This commit is contained in:
parent
a425f54d23
commit
d664d68fa4
@ -5,12 +5,14 @@
|
|||||||
package asynq
|
package asynq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/go-redis/redis/v7"
|
"github.com/go-redis/redis/v7"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
h "github.com/hibiken/asynq/internal/asynqtest"
|
h "github.com/hibiken/asynq/internal/asynqtest"
|
||||||
|
"github.com/hibiken/asynq/internal/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// This file defines test helper functions used by
|
// This file defines test helper functions used by
|
||||||
@ -22,6 +24,8 @@ const (
|
|||||||
redisDB = 14
|
redisDB = 14
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var testLogger = log.NewLogger(os.Stderr)
|
||||||
|
|
||||||
func setup(tb testing.TB) *redis.Client {
|
func setup(tb testing.TB) *redis.Client {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
r := redis.NewClient(&redis.Options{
|
r := redis.NewClient(&redis.Options{
|
||||||
|
@ -16,6 +16,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
|
"github.com/hibiken/asynq/internal/log"
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -39,6 +40,8 @@ type Background struct {
|
|||||||
// wait group to wait for all goroutines to finish.
|
// wait group to wait for all goroutines to finish.
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
logger *log.Logger
|
||||||
|
|
||||||
rdb *rdb.RDB
|
rdb *rdb.RDB
|
||||||
scheduler *scheduler
|
scheduler *scheduler
|
||||||
processor *processor
|
processor *processor
|
||||||
@ -158,16 +161,18 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
|
|||||||
}
|
}
|
||||||
pid := os.Getpid()
|
pid := os.Getpid()
|
||||||
|
|
||||||
|
logger := log.NewLogger(os.Stderr)
|
||||||
rdb := rdb.NewRDB(createRedisClient(r))
|
rdb := rdb.NewRDB(createRedisClient(r))
|
||||||
ps := base.NewProcessState(host, pid, n, queues, cfg.StrictPriority)
|
ps := base.NewProcessState(host, pid, n, queues, cfg.StrictPriority)
|
||||||
syncCh := make(chan *syncRequest)
|
syncCh := make(chan *syncRequest)
|
||||||
cancels := base.NewCancelations()
|
cancels := base.NewCancelations()
|
||||||
syncer := newSyncer(syncCh, 5*time.Second)
|
syncer := newSyncer(logger, syncCh, 5*time.Second)
|
||||||
heartbeater := newHeartbeater(rdb, ps, 5*time.Second)
|
heartbeater := newHeartbeater(logger, rdb, ps, 5*time.Second)
|
||||||
scheduler := newScheduler(rdb, 5*time.Second, queues)
|
scheduler := newScheduler(logger, rdb, 5*time.Second, queues)
|
||||||
processor := newProcessor(rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler)
|
processor := newProcessor(logger, rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler)
|
||||||
subscriber := newSubscriber(rdb, cancels)
|
subscriber := newSubscriber(logger, rdb, cancels)
|
||||||
return &Background{
|
return &Background{
|
||||||
|
logger: logger,
|
||||||
rdb: rdb,
|
rdb: rdb,
|
||||||
ps: ps,
|
ps: ps,
|
||||||
scheduler: scheduler,
|
scheduler: scheduler,
|
||||||
@ -205,14 +210,14 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
|
|||||||
// a signal, it gracefully shuts down all pending workers and other
|
// a signal, it gracefully shuts down all pending workers and other
|
||||||
// goroutines to process the tasks.
|
// goroutines to process the tasks.
|
||||||
func (bg *Background) Run(handler Handler) {
|
func (bg *Background) Run(handler Handler) {
|
||||||
logger.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid()))
|
bg.logger.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid()))
|
||||||
logger.info("Starting processing")
|
bg.logger.Info("Starting processing")
|
||||||
|
|
||||||
bg.start(handler)
|
bg.start(handler)
|
||||||
defer bg.stop()
|
defer bg.stop()
|
||||||
|
|
||||||
logger.info("Send signal TSTP to stop processing new tasks")
|
bg.logger.Info("Send signal TSTP to stop processing new tasks")
|
||||||
logger.info("Send signal TERM or INT to terminate the process")
|
bg.logger.Info("Send signal TERM or INT to terminate the process")
|
||||||
|
|
||||||
// Wait for a signal to terminate.
|
// Wait for a signal to terminate.
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
@ -227,7 +232,7 @@ func (bg *Background) Run(handler Handler) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
logger.info("Starting graceful shutdown")
|
bg.logger.Info("Starting graceful shutdown")
|
||||||
}
|
}
|
||||||
|
|
||||||
// starts the background-task processing.
|
// starts the background-task processing.
|
||||||
@ -271,5 +276,5 @@ func (bg *Background) stop() {
|
|||||||
bg.rdb.Close()
|
bg.rdb.Close()
|
||||||
bg.running = false
|
bg.running = false
|
||||||
|
|
||||||
logger.info("Bye!")
|
bg.logger.Info("Bye!")
|
||||||
}
|
}
|
||||||
|
13
heartbeat.go
13
heartbeat.go
@ -9,13 +9,15 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
|
"github.com/hibiken/asynq/internal/log"
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// heartbeater is responsible for writing process info to redis periodically to
|
// heartbeater is responsible for writing process info to redis periodically to
|
||||||
// indicate that the background worker process is up.
|
// indicate that the background worker process is up.
|
||||||
type heartbeater struct {
|
type heartbeater struct {
|
||||||
rdb *rdb.RDB
|
logger *log.Logger
|
||||||
|
rdb *rdb.RDB
|
||||||
|
|
||||||
ps *base.ProcessState
|
ps *base.ProcessState
|
||||||
|
|
||||||
@ -26,8 +28,9 @@ type heartbeater struct {
|
|||||||
interval time.Duration
|
interval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func newHeartbeater(rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater {
|
func newHeartbeater(l *log.Logger, rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater {
|
||||||
return &heartbeater{
|
return &heartbeater{
|
||||||
|
logger: l,
|
||||||
rdb: rdb,
|
rdb: rdb,
|
||||||
ps: ps,
|
ps: ps,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
@ -36,7 +39,7 @@ func newHeartbeater(rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *heartbeater) terminate() {
|
func (h *heartbeater) terminate() {
|
||||||
logger.info("Heartbeater shutting down...")
|
h.logger.Info("Heartbeater shutting down...")
|
||||||
// Signal the heartbeater goroutine to stop.
|
// Signal the heartbeater goroutine to stop.
|
||||||
h.done <- struct{}{}
|
h.done <- struct{}{}
|
||||||
}
|
}
|
||||||
@ -52,7 +55,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
|
|||||||
select {
|
select {
|
||||||
case <-h.done:
|
case <-h.done:
|
||||||
h.rdb.ClearProcessState(h.ps)
|
h.rdb.ClearProcessState(h.ps)
|
||||||
logger.info("Heartbeater done")
|
h.logger.Info("Heartbeater done")
|
||||||
return
|
return
|
||||||
case <-time.After(h.interval):
|
case <-time.After(h.interval):
|
||||||
h.beat()
|
h.beat()
|
||||||
@ -66,6 +69,6 @@ func (h *heartbeater) beat() {
|
|||||||
// and short enough to expire quickly once the process is shut down or killed.
|
// and short enough to expire quickly once the process is shut down or killed.
|
||||||
err := h.rdb.WriteProcessState(h.ps, h.interval*2)
|
err := h.rdb.WriteProcessState(h.ps, h.interval*2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.error("could not write heartbeat data: %v", err)
|
h.logger.Error("could not write heartbeat data: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -36,7 +36,7 @@ func TestHeartbeater(t *testing.T) {
|
|||||||
h.FlushDB(t, r)
|
h.FlushDB(t, r)
|
||||||
|
|
||||||
state := base.NewProcessState(tc.host, tc.pid, tc.concurrency, tc.queues, false)
|
state := base.NewProcessState(tc.host, tc.pid, tc.concurrency, tc.queues, false)
|
||||||
hb := newHeartbeater(rdbClient, state, tc.interval)
|
hb := newHeartbeater(testLogger, rdbClient, state, tc.interval)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
hb.start(&wg)
|
hb.start(&wg)
|
||||||
|
36
internal/log/log.go
Normal file
36
internal/log/log.go
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT license
|
||||||
|
// that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
// Package log exports logging related types and functions.
|
||||||
|
package log
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
stdlog "log"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewLogger(out io.Writer) *Logger {
|
||||||
|
return &Logger{
|
||||||
|
stdlog.New(out, "", stdlog.Ldate|stdlog.Ltime|stdlog.Lmicroseconds|stdlog.LUTC),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Logger struct {
|
||||||
|
*stdlog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Logger) Info(format string, args ...interface{}) {
|
||||||
|
format = "INFO: " + format
|
||||||
|
l.Printf(format, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Logger) Warn(format string, args ...interface{}) {
|
||||||
|
format = "WARN: " + format
|
||||||
|
l.Printf(format, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Logger) Error(format string, args ...interface{}) {
|
||||||
|
format = "ERROR: " + format
|
||||||
|
l.Printf(format, args...)
|
||||||
|
}
|
@ -1,4 +1,8 @@
|
|||||||
package asynq
|
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT license
|
||||||
|
// that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package log
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
@ -36,9 +40,9 @@ func TestLoggerInfo(t *testing.T) {
|
|||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
logger := newLogger(&buf)
|
logger := NewLogger(&buf)
|
||||||
|
|
||||||
logger.info(tc.message)
|
logger.Info(tc.message)
|
||||||
|
|
||||||
got := buf.String()
|
got := buf.String()
|
||||||
matched, err := regexp.MatchString(tc.wantPattern, got)
|
matched, err := regexp.MatchString(tc.wantPattern, got)
|
||||||
@ -68,9 +72,9 @@ func TestLoggerWarn(t *testing.T) {
|
|||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
logger := newLogger(&buf)
|
logger := NewLogger(&buf)
|
||||||
|
|
||||||
logger.warn(tc.message)
|
logger.Warn(tc.message)
|
||||||
|
|
||||||
got := buf.String()
|
got := buf.String()
|
||||||
matched, err := regexp.MatchString(tc.wantPattern, got)
|
matched, err := regexp.MatchString(tc.wantPattern, got)
|
||||||
@ -100,9 +104,9 @@ func TestLoggerError(t *testing.T) {
|
|||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
logger := newLogger(&buf)
|
logger := NewLogger(&buf)
|
||||||
|
|
||||||
logger.error(tc.message)
|
logger.Error(tc.message)
|
||||||
|
|
||||||
got := buf.String()
|
got := buf.String()
|
||||||
matched, err := regexp.MatchString(tc.wantPattern, got)
|
matched, err := regexp.MatchString(tc.wantPattern, got)
|
35
logger.go
35
logger.go
@ -1,35 +0,0 @@
|
|||||||
package asynq
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
)
|
|
||||||
|
|
||||||
// global logger used in asynq package.
|
|
||||||
var logger = newLogger(os.Stderr)
|
|
||||||
|
|
||||||
func newLogger(out io.Writer) *asynqLogger {
|
|
||||||
return &asynqLogger{
|
|
||||||
log.New(out, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.LUTC),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type asynqLogger struct {
|
|
||||||
*log.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *asynqLogger) info(format string, args ...interface{}) {
|
|
||||||
format = "INFO: " + format
|
|
||||||
l.Printf(format, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *asynqLogger) warn(format string, args ...interface{}) {
|
|
||||||
format = "WARN: " + format
|
|
||||||
l.Printf(format, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *asynqLogger) error(format string, args ...interface{}) {
|
|
||||||
format = "ERROR: " + format
|
|
||||||
l.Printf(format, args...)
|
|
||||||
}
|
|
33
processor.go
33
processor.go
@ -13,12 +13,14 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
|
"github.com/hibiken/asynq/internal/log"
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
type processor struct {
|
type processor struct {
|
||||||
rdb *rdb.RDB
|
logger *log.Logger
|
||||||
|
rdb *rdb.RDB
|
||||||
|
|
||||||
ps *base.ProcessState
|
ps *base.ProcessState
|
||||||
|
|
||||||
@ -61,7 +63,7 @@ type processor struct {
|
|||||||
type retryDelayFunc func(n int, err error, task *Task) time.Duration
|
type retryDelayFunc func(n int, err error, task *Task) time.Duration
|
||||||
|
|
||||||
// newProcessor constructs a new processor.
|
// newProcessor constructs a new processor.
|
||||||
func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc,
|
func newProcessor(l *log.Logger, r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc,
|
||||||
syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor {
|
syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor {
|
||||||
info := ps.Get()
|
info := ps.Get()
|
||||||
qcfg := normalizeQueueCfg(info.Queues)
|
qcfg := normalizeQueueCfg(info.Queues)
|
||||||
@ -70,6 +72,7 @@ func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc,
|
|||||||
orderedQueues = sortByPriority(qcfg)
|
orderedQueues = sortByPriority(qcfg)
|
||||||
}
|
}
|
||||||
return &processor{
|
return &processor{
|
||||||
|
logger: l,
|
||||||
rdb: r,
|
rdb: r,
|
||||||
ps: ps,
|
ps: ps,
|
||||||
queueConfig: qcfg,
|
queueConfig: qcfg,
|
||||||
@ -91,7 +94,7 @@ func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc,
|
|||||||
// It's safe to call this method multiple times.
|
// It's safe to call this method multiple times.
|
||||||
func (p *processor) stop() {
|
func (p *processor) stop() {
|
||||||
p.once.Do(func() {
|
p.once.Do(func() {
|
||||||
logger.info("Processor shutting down...")
|
p.logger.Info("Processor shutting down...")
|
||||||
// Unblock if processor is waiting for sema token.
|
// Unblock if processor is waiting for sema token.
|
||||||
close(p.abort)
|
close(p.abort)
|
||||||
// Signal the processor goroutine to stop processing tasks
|
// Signal the processor goroutine to stop processing tasks
|
||||||
@ -107,7 +110,7 @@ func (p *processor) terminate() {
|
|||||||
// IDEA: Allow user to customize this timeout value.
|
// IDEA: Allow user to customize this timeout value.
|
||||||
const timeout = 8 * time.Second
|
const timeout = 8 * time.Second
|
||||||
time.AfterFunc(timeout, func() { close(p.quit) })
|
time.AfterFunc(timeout, func() { close(p.quit) })
|
||||||
logger.info("Waiting for all workers to finish...")
|
p.logger.Info("Waiting for all workers to finish...")
|
||||||
|
|
||||||
// send cancellation signal to all in-progress task handlers
|
// send cancellation signal to all in-progress task handlers
|
||||||
for _, cancel := range p.cancelations.GetAll() {
|
for _, cancel := range p.cancelations.GetAll() {
|
||||||
@ -118,7 +121,7 @@ func (p *processor) terminate() {
|
|||||||
for i := 0; i < cap(p.sema); i++ {
|
for i := 0; i < cap(p.sema); i++ {
|
||||||
p.sema <- struct{}{}
|
p.sema <- struct{}{}
|
||||||
}
|
}
|
||||||
logger.info("All workers have finished")
|
p.logger.Info("All workers have finished")
|
||||||
p.restore() // move any unfinished tasks back to the queue.
|
p.restore() // move any unfinished tasks back to the queue.
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -132,7 +135,7 @@ func (p *processor) start(wg *sync.WaitGroup) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-p.done:
|
case <-p.done:
|
||||||
logger.info("Processor done")
|
p.logger.Info("Processor done")
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
p.exec()
|
p.exec()
|
||||||
@ -158,7 +161,7 @@ func (p *processor) exec() {
|
|||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if p.errLogLimiter.Allow() {
|
if p.errLogLimiter.Allow() {
|
||||||
logger.error("Dequeue error: %v", err)
|
p.logger.Error("Dequeue error: %v", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -188,7 +191,7 @@ func (p *processor) exec() {
|
|||||||
select {
|
select {
|
||||||
case <-p.quit:
|
case <-p.quit:
|
||||||
// time is up, quit this worker goroutine.
|
// time is up, quit this worker goroutine.
|
||||||
logger.warn("Quitting worker. task id=%s", msg.ID)
|
p.logger.Warn("Quitting worker. task id=%s", msg.ID)
|
||||||
return
|
return
|
||||||
case resErr := <-resCh:
|
case resErr := <-resCh:
|
||||||
// Note: One of three things should happen.
|
// Note: One of three things should happen.
|
||||||
@ -217,17 +220,17 @@ func (p *processor) exec() {
|
|||||||
func (p *processor) restore() {
|
func (p *processor) restore() {
|
||||||
n, err := p.rdb.RequeueAll()
|
n, err := p.rdb.RequeueAll()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.error("Could not restore unfinished tasks: %v", err)
|
p.logger.Error("Could not restore unfinished tasks: %v", err)
|
||||||
}
|
}
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
logger.info("Restored %d unfinished tasks back to queue", n)
|
p.logger.Info("Restored %d unfinished tasks back to queue", n)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *processor) requeue(msg *base.TaskMessage) {
|
func (p *processor) requeue(msg *base.TaskMessage) {
|
||||||
err := p.rdb.Requeue(msg)
|
err := p.rdb.Requeue(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.error("Could not push task id=%s back to queue: %v", msg.ID, err)
|
p.logger.Error("Could not push task id=%s back to queue: %v", msg.ID, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -235,7 +238,7 @@ func (p *processor) markAsDone(msg *base.TaskMessage) {
|
|||||||
err := p.rdb.Done(msg)
|
err := p.rdb.Done(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue)
|
errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue)
|
||||||
logger.warn("%s; Will retry syncing", errMsg)
|
p.logger.Warn("%s; Will retry syncing", errMsg)
|
||||||
p.syncRequestCh <- &syncRequest{
|
p.syncRequestCh <- &syncRequest{
|
||||||
fn: func() error {
|
fn: func() error {
|
||||||
return p.rdb.Done(msg)
|
return p.rdb.Done(msg)
|
||||||
@ -251,7 +254,7 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
|
|||||||
err := p.rdb.Retry(msg, retryAt, e.Error())
|
err := p.rdb.Retry(msg, retryAt, e.Error())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue)
|
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue)
|
||||||
logger.warn("%s; Will retry syncing", errMsg)
|
p.logger.Warn("%s; Will retry syncing", errMsg)
|
||||||
p.syncRequestCh <- &syncRequest{
|
p.syncRequestCh <- &syncRequest{
|
||||||
fn: func() error {
|
fn: func() error {
|
||||||
return p.rdb.Retry(msg, retryAt, e.Error())
|
return p.rdb.Retry(msg, retryAt, e.Error())
|
||||||
@ -262,11 +265,11 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *processor) kill(msg *base.TaskMessage, e error) {
|
func (p *processor) kill(msg *base.TaskMessage, e error) {
|
||||||
logger.warn("Retry exhausted for task id=%s", msg.ID)
|
p.logger.Warn("Retry exhausted for task id=%s", msg.ID)
|
||||||
err := p.rdb.Kill(msg, e.Error())
|
err := p.rdb.Kill(msg, e.Error())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
|
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
|
||||||
logger.warn("%s; Will retry syncing", errMsg)
|
p.logger.Warn("%s; Will retry syncing", errMsg)
|
||||||
p.syncRequestCh <- &syncRequest{
|
p.syncRequestCh <- &syncRequest{
|
||||||
fn: func() error {
|
fn: func() error {
|
||||||
return p.rdb.Kill(msg, e.Error())
|
return p.rdb.Kill(msg, e.Error())
|
||||||
|
@ -69,7 +69,7 @@ func TestProcessorSuccess(t *testing.T) {
|
|||||||
}
|
}
|
||||||
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
|
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
|
||||||
cancelations := base.NewCancelations()
|
cancelations := base.NewCancelations()
|
||||||
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
|
p := newProcessor(testLogger, rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
|
||||||
p.handler = HandlerFunc(handler)
|
p.handler = HandlerFunc(handler)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@ -167,7 +167,7 @@ func TestProcessorRetry(t *testing.T) {
|
|||||||
}
|
}
|
||||||
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
|
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
|
||||||
cancelations := base.NewCancelations()
|
cancelations := base.NewCancelations()
|
||||||
p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler))
|
p := newProcessor(testLogger, rdbClient, ps, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler))
|
||||||
p.handler = tc.handler
|
p.handler = tc.handler
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@ -233,7 +233,7 @@ func TestProcessorQueues(t *testing.T) {
|
|||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
cancelations := base.NewCancelations()
|
cancelations := base.NewCancelations()
|
||||||
ps := base.NewProcessState("localhost", 1234, 10, tc.queueCfg, false)
|
ps := base.NewProcessState("localhost", 1234, 10, tc.queueCfg, false)
|
||||||
p := newProcessor(nil, ps, defaultDelayFunc, nil, cancelations, nil)
|
p := newProcessor(testLogger, nil, ps, defaultDelayFunc, nil, cancelations, nil)
|
||||||
got := p.queues()
|
got := p.queues()
|
||||||
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
|
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
|
||||||
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
|
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
|
||||||
@ -301,7 +301,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
|
|||||||
// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
|
// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
|
||||||
cancelations := base.NewCancelations()
|
cancelations := base.NewCancelations()
|
||||||
ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
|
ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
|
||||||
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
|
p := newProcessor(testLogger, rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
|
||||||
p.handler = HandlerFunc(handler)
|
p.handler = HandlerFunc(handler)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
13
scheduler.go
13
scheduler.go
@ -8,11 +8,13 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hibiken/asynq/internal/log"
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type scheduler struct {
|
type scheduler struct {
|
||||||
rdb *rdb.RDB
|
logger *log.Logger
|
||||||
|
rdb *rdb.RDB
|
||||||
|
|
||||||
// channel to communicate back to the long running "scheduler" goroutine.
|
// channel to communicate back to the long running "scheduler" goroutine.
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
@ -24,12 +26,13 @@ type scheduler struct {
|
|||||||
qnames []string
|
qnames []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler {
|
func newScheduler(l *log.Logger, r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler {
|
||||||
var qnames []string
|
var qnames []string
|
||||||
for q := range qcfg {
|
for q := range qcfg {
|
||||||
qnames = append(qnames, q)
|
qnames = append(qnames, q)
|
||||||
}
|
}
|
||||||
return &scheduler{
|
return &scheduler{
|
||||||
|
logger: l,
|
||||||
rdb: r,
|
rdb: r,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
avgInterval: avgInterval,
|
avgInterval: avgInterval,
|
||||||
@ -38,7 +41,7 @@ func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *s
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *scheduler) terminate() {
|
func (s *scheduler) terminate() {
|
||||||
logger.info("Scheduler shutting down...")
|
s.logger.Info("Scheduler shutting down...")
|
||||||
// Signal the scheduler goroutine to stop polling.
|
// Signal the scheduler goroutine to stop polling.
|
||||||
s.done <- struct{}{}
|
s.done <- struct{}{}
|
||||||
}
|
}
|
||||||
@ -51,7 +54,7 @@ func (s *scheduler) start(wg *sync.WaitGroup) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-s.done:
|
case <-s.done:
|
||||||
logger.info("Scheduler done")
|
s.logger.Info("Scheduler done")
|
||||||
return
|
return
|
||||||
case <-time.After(s.avgInterval):
|
case <-time.After(s.avgInterval):
|
||||||
s.exec()
|
s.exec()
|
||||||
@ -62,6 +65,6 @@ func (s *scheduler) start(wg *sync.WaitGroup) {
|
|||||||
|
|
||||||
func (s *scheduler) exec() {
|
func (s *scheduler) exec() {
|
||||||
if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil {
|
if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil {
|
||||||
logger.error("Could not enqueue scheduled tasks: %v", err)
|
s.logger.Error("Could not enqueue scheduled tasks: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,7 +19,7 @@ func TestScheduler(t *testing.T) {
|
|||||||
r := setup(t)
|
r := setup(t)
|
||||||
rdbClient := rdb.NewRDB(r)
|
rdbClient := rdb.NewRDB(r)
|
||||||
const pollInterval = time.Second
|
const pollInterval = time.Second
|
||||||
s := newScheduler(rdbClient, pollInterval, defaultQueueConfig)
|
s := newScheduler(testLogger, rdbClient, pollInterval, defaultQueueConfig)
|
||||||
t1 := h.NewTaskMessage("gen_thumbnail", nil)
|
t1 := h.NewTaskMessage("gen_thumbnail", nil)
|
||||||
t2 := h.NewTaskMessage("send_email", nil)
|
t2 := h.NewTaskMessage("send_email", nil)
|
||||||
t3 := h.NewTaskMessage("reindex", nil)
|
t3 := h.NewTaskMessage("reindex", nil)
|
||||||
|
@ -8,11 +8,13 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
|
"github.com/hibiken/asynq/internal/log"
|
||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type subscriber struct {
|
type subscriber struct {
|
||||||
rdb *rdb.RDB
|
logger *log.Logger
|
||||||
|
rdb *rdb.RDB
|
||||||
|
|
||||||
// channel to communicate back to the long running "subscriber" goroutine.
|
// channel to communicate back to the long running "subscriber" goroutine.
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
@ -21,8 +23,9 @@ type subscriber struct {
|
|||||||
cancelations *base.Cancelations
|
cancelations *base.Cancelations
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSubscriber(rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber {
|
func newSubscriber(l *log.Logger, rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber {
|
||||||
return &subscriber{
|
return &subscriber{
|
||||||
|
logger: l,
|
||||||
rdb: rdb,
|
rdb: rdb,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
cancelations: cancelations,
|
cancelations: cancelations,
|
||||||
@ -30,7 +33,7 @@ func newSubscriber(rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *subscriber) terminate() {
|
func (s *subscriber) terminate() {
|
||||||
logger.info("Subscriber shutting down...")
|
s.logger.Info("Subscriber shutting down...")
|
||||||
// Signal the subscriber goroutine to stop.
|
// Signal the subscriber goroutine to stop.
|
||||||
s.done <- struct{}{}
|
s.done <- struct{}{}
|
||||||
}
|
}
|
||||||
@ -39,7 +42,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) {
|
|||||||
pubsub, err := s.rdb.CancelationPubSub()
|
pubsub, err := s.rdb.CancelationPubSub()
|
||||||
cancelCh := pubsub.Channel()
|
cancelCh := pubsub.Channel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.error("cannot subscribe to cancelation channel: %v", err)
|
s.logger.Error("cannot subscribe to cancelation channel: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@ -49,7 +52,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) {
|
|||||||
select {
|
select {
|
||||||
case <-s.done:
|
case <-s.done:
|
||||||
pubsub.Close()
|
pubsub.Close()
|
||||||
logger.info("Subscriber done")
|
s.logger.Info("Subscriber done")
|
||||||
return
|
return
|
||||||
case msg := <-cancelCh:
|
case msg := <-cancelCh:
|
||||||
cancel, ok := s.cancelations.Get(msg.Payload)
|
cancel, ok := s.cancelations.Get(msg.Payload)
|
||||||
|
@ -37,7 +37,7 @@ func TestSubscriber(t *testing.T) {
|
|||||||
cancelations := base.NewCancelations()
|
cancelations := base.NewCancelations()
|
||||||
cancelations.Add(tc.registeredID, fakeCancelFunc)
|
cancelations.Add(tc.registeredID, fakeCancelFunc)
|
||||||
|
|
||||||
subscriber := newSubscriber(rdbClient, cancelations)
|
subscriber := newSubscriber(testLogger, rdbClient, cancelations)
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
subscriber.start(&wg)
|
subscriber.start(&wg)
|
||||||
|
|
||||||
|
13
syncer.go
13
syncer.go
@ -7,11 +7,15 @@ package asynq
|
|||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hibiken/asynq/internal/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// syncer is responsible for queuing up failed requests to redis and retry
|
// syncer is responsible for queuing up failed requests to redis and retry
|
||||||
// those requests to sync state between the background process and redis.
|
// those requests to sync state between the background process and redis.
|
||||||
type syncer struct {
|
type syncer struct {
|
||||||
|
logger *log.Logger
|
||||||
|
|
||||||
requestsCh <-chan *syncRequest
|
requestsCh <-chan *syncRequest
|
||||||
|
|
||||||
// channel to communicate back to the long running "syncer" goroutine.
|
// channel to communicate back to the long running "syncer" goroutine.
|
||||||
@ -26,8 +30,9 @@ type syncRequest struct {
|
|||||||
errMsg string // error message
|
errMsg string // error message
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
|
func newSyncer(l *log.Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
|
||||||
return &syncer{
|
return &syncer{
|
||||||
|
logger: l,
|
||||||
requestsCh: requestsCh,
|
requestsCh: requestsCh,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
interval: interval,
|
interval: interval,
|
||||||
@ -35,7 +40,7 @@ func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncer) terminate() {
|
func (s *syncer) terminate() {
|
||||||
logger.info("Syncer shutting down...")
|
s.logger.Info("Syncer shutting down...")
|
||||||
// Signal the syncer goroutine to stop.
|
// Signal the syncer goroutine to stop.
|
||||||
s.done <- struct{}{}
|
s.done <- struct{}{}
|
||||||
}
|
}
|
||||||
@ -51,10 +56,10 @@ func (s *syncer) start(wg *sync.WaitGroup) {
|
|||||||
// Try sync one last time before shutting down.
|
// Try sync one last time before shutting down.
|
||||||
for _, req := range requests {
|
for _, req := range requests {
|
||||||
if err := req.fn(); err != nil {
|
if err := req.fn(); err != nil {
|
||||||
logger.error(req.errMsg)
|
s.logger.Error(req.errMsg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.info("Syncer done")
|
s.logger.Info("Syncer done")
|
||||||
return
|
return
|
||||||
case req := <-s.requestsCh:
|
case req := <-s.requestsCh:
|
||||||
requests = append(requests, req)
|
requests = append(requests, req)
|
||||||
|
@ -27,7 +27,7 @@ func TestSyncer(t *testing.T) {
|
|||||||
|
|
||||||
const interval = time.Second
|
const interval = time.Second
|
||||||
syncRequestCh := make(chan *syncRequest)
|
syncRequestCh := make(chan *syncRequest)
|
||||||
syncer := newSyncer(syncRequestCh, interval)
|
syncer := newSyncer(testLogger, syncRequestCh, interval)
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
syncer.start(&wg)
|
syncer.start(&wg)
|
||||||
defer syncer.terminate()
|
defer syncer.terminate()
|
||||||
@ -52,7 +52,7 @@ func TestSyncer(t *testing.T) {
|
|||||||
func TestSyncerRetry(t *testing.T) {
|
func TestSyncerRetry(t *testing.T) {
|
||||||
const interval = time.Second
|
const interval = time.Second
|
||||||
syncRequestCh := make(chan *syncRequest)
|
syncRequestCh := make(chan *syncRequest)
|
||||||
syncer := newSyncer(syncRequestCh, interval)
|
syncer := newSyncer(testLogger, syncRequestCh, interval)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
syncer.start(&wg)
|
syncer.start(&wg)
|
||||||
|
Loading…
Reference in New Issue
Block a user