diff --git a/background.go b/background.go index d87044f..39fddcd 100644 --- a/background.go +++ b/background.go @@ -37,6 +37,7 @@ type Background struct { rdb *rdb.RDB scheduler *scheduler processor *processor + syncer *syncer } // Config specifies the background-task processing behavior. @@ -109,13 +110,18 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { } qcfg := normalizeQueueCfg(queues) + syncRequestCh := make(chan *syncRequest) + + syncer := newSyncer(syncRequestCh, 5*time.Second) + rdb := rdb.NewRDB(createRedisClient(r)) scheduler := newScheduler(rdb, 5*time.Second, qcfg) - processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc) + processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc, syncRequestCh) return &Background{ rdb: rdb, scheduler: scheduler, processor: processor, + syncer: syncer, } } @@ -175,6 +181,7 @@ func (bg *Background) start(handler Handler) { bg.running = true bg.processor.handler = handler + bg.syncer.start() bg.scheduler.start() bg.processor.start() } @@ -189,6 +196,7 @@ func (bg *Background) stop() { bg.scheduler.terminate() bg.processor.terminate() + bg.syncer.terminate() bg.rdb.Close() bg.processor.handler = nil diff --git a/processor.go b/processor.go index b9626ee..a8921b5 100644 --- a/processor.go +++ b/processor.go @@ -28,6 +28,9 @@ type processor struct { retryDelayFunc retryDelayFunc + // channel via which to send sync requests to syncer. + syncRequestCh chan<- *syncRequest + // sema is a counting semaphore to ensure the number of active workers // does not exceed the limit. sema chan struct{} @@ -53,7 +56,7 @@ type retryDelayFunc func(n int, err error, task *Task) time.Duration // qfcg is a mapping of queue names to associated priority level. // strict specifies whether queue priority should be treated strictly. // fn is a function to compute retry delay. 
-func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retryDelayFunc) *processor { +func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retryDelayFunc, syncRequestCh chan<- *syncRequest) *processor { orderedQueues := []string(nil) if strict { orderedQueues = sortByPriority(qcfg) @@ -63,6 +66,7 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retry queueConfig: qcfg, orderedQueues: orderedQueues, retryDelayFunc: fn, + syncRequestCh: syncRequestCh, sema: make(chan struct{}, n), done: make(chan struct{}), abort: make(chan struct{}), @@ -198,7 +202,14 @@ func (p *processor) requeue(msg *base.TaskMessage) { func (p *processor) markAsDone(msg *base.TaskMessage) { err := p.rdb.Done(msg) if err != nil { - log.Printf("[ERROR] Could not remove task from InProgress queue: %v\n", err) + errMsg := fmt.Sprintf("could not remove task %+v from %q", msg, base.InProgressQueue) + log.Printf("[WARN] %s; will retry\n", errMsg) + p.syncRequestCh <- &syncRequest{ + fn: func() error { + return p.rdb.Done(msg) + }, + errMsg: errMsg, + } } } @@ -207,7 +218,14 @@ func (p *processor) retry(msg *base.TaskMessage, e error) { retryAt := time.Now().Add(d) err := p.rdb.Retry(msg, retryAt, e.Error()) if err != nil { - log.Printf("[ERROR] Could not send task %+v to Retry queue: %v\n", msg, err) + errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.RetryQueue) + log.Printf("[WARN] %s; will retry\n", errMsg) + p.syncRequestCh <- &syncRequest{ + fn: func() error { + return p.rdb.Retry(msg, retryAt, e.Error()) + }, + errMsg: errMsg, + } } } @@ -215,7 +233,14 @@ func (p *processor) kill(msg *base.TaskMessage, e error) { log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID) err := p.rdb.Kill(msg, e.Error()) if err != nil { - log.Printf("[ERROR] Could not send task %+v to Dead queue: %v\n", msg, err) + errMsg := fmt.Sprintf("could not move task %+v from %q to 
%q", msg, base.InProgressQueue, base.DeadQueue) + log.Printf("[WARN] %s; will retry\n", errMsg) + p.syncRequestCh <- &syncRequest{ + fn: func() error { + return p.rdb.Kill(msg, e.Error()) + }, + errMsg: errMsg, + } } } diff --git a/processor_test.go b/processor_test.go index d45d3c9..07b39e8 100644 --- a/processor_test.go +++ b/processor_test.go @@ -65,7 +65,7 @@ func TestProcessorSuccess(t *testing.T) { processed = append(processed, task) return nil } - p := newProcessor(rdbClient, 10, defaultQueueConfig, false, defaultDelayFunc) + p := newProcessor(rdbClient, 10, defaultQueueConfig, false, defaultDelayFunc, nil) p.handler = HandlerFunc(handler) p.start() @@ -148,7 +148,7 @@ func TestProcessorRetry(t *testing.T) { handler := func(task *Task) error { return fmt.Errorf(errMsg) } - p := newProcessor(rdbClient, 10, defaultQueueConfig, false, delayFunc) + p := newProcessor(rdbClient, 10, defaultQueueConfig, false, delayFunc, nil) p.handler = HandlerFunc(handler) p.start() @@ -207,7 +207,7 @@ func TestProcessorQueues(t *testing.T) { } for _, tc := range tests { - p := newProcessor(nil, 10, tc.queueCfg, false, defaultDelayFunc) + p := newProcessor(nil, 10, tc.queueCfg, false, defaultDelayFunc, nil) got := p.queues() if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s", @@ -273,7 +273,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { "low": 1, } // Note: Set concurrency to 1 to make sure tasks are processed one at a time. - p := newProcessor(rdbClient, 1 /*concurrency */, queueCfg, true /* strict */, defaultDelayFunc) + p := newProcessor(rdbClient, 1 /*concurrency */, queueCfg, true /* strict */, defaultDelayFunc, nil) p.handler = HandlerFunc(handler) p.start() diff --git a/syncer.go b/syncer.go new file mode 100644 index 0000000..89cba7d --- /dev/null +++ b/syncer.go @@ -0,0 +1,78 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. 
+// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package asynq + +import ( + "log" + "time" +) + +// syncer is responsible for queuing up failed requests to redis and retry +// those requests to sync state between the background process and redis. +type syncer struct { + requestsCh <-chan *syncRequest + + // channel to communicate back to the long running "syncer" goroutine. + done chan struct{} + + // interval between sync operations. + interval time.Duration +} + +// syncRequest describes an operation for the syncer to retry. +type syncRequest struct { + fn func() error // sync operation + errMsg string // error message +} + +// newSyncer returns a syncer that receives requests from requestsCh and re-runs +// pending ones each time interval elapses without a new request arriving. +func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer { + return &syncer{ + requestsCh: requestsCh, + done: make(chan struct{}), + interval: interval, + } +} + +// terminate tells the syncer goroutine to stop. The send on the unbuffered +// done channel blocks until that goroutine receives it. +func (s *syncer) terminate() { + log.Println("[INFO] Syncer shutting down...") + // Signal the syncer goroutine to stop. + s.done <- struct{}{} +} + +// start launches the syncer goroutine. It accumulates incoming requests and +// re-runs them, keeping only those that still fail; on shutdown it tries each +// pending request once more and logs the ones that fail. +func (s *syncer) start() { + go func() { + var requests []*syncRequest + for { + select { + case <-s.done: + // Try sync one last time before shutting down. 
+ for _, req := range requests { + if err := req.fn(); err != nil { + log.Printf("[ERROR] %s\n", req.errMsg) + } + } + log.Println("[INFO] Syncer done.") + return + case req := <-s.requestsCh: + requests = append(requests, req) + case <-time.After(s.interval): + var temp []*syncRequest + for _, req := range requests { + if err := req.fn(); err != nil { + temp = append(temp, req) + } + } + requests = temp + } + } + }() +} diff --git a/syncer_test.go b/syncer_test.go new file mode 100644 index 0000000..8ce7240 --- /dev/null +++ b/syncer_test.go @@ -0,0 +1,99 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. 
+ +package asynq + +import ( + "testing" + "time" + + "github.com/go-redis/redis/v7" + h "github.com/hibiken/asynq/internal/asynqtest" + "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/rdb" +) + +func TestSyncer(t *testing.T) { + inProgress := []*base.TaskMessage{ + h.NewTaskMessage("send_email", nil), + h.NewTaskMessage("reindex", nil), + h.NewTaskMessage("gen_thumbnail", nil), + } + r := setup(t) + rdbClient := rdb.NewRDB(r) + h.SeedInProgressQueue(t, r, inProgress) + + const interval = time.Second + syncRequestCh := make(chan *syncRequest) + syncer := newSyncer(syncRequestCh, interval) + syncer.start() + defer syncer.terminate() + + for _, msg := range inProgress { + m := msg + syncRequestCh <- &syncRequest{ + fn: func() error { + return rdbClient.Done(m) + }, + } + } + + time.Sleep(interval) // ensure that syncer runs at least once + + gotInProgress := h.GetInProgressMessages(t, r) + if l := len(gotInProgress); l != 0 { + t.Errorf("%q has length %d; want 0", base.InProgressQueue, l) + } +} + +func TestSyncerRetry(t *testing.T) { + inProgress := []*base.TaskMessage{ + h.NewTaskMessage("send_email", nil), + h.NewTaskMessage("reindex", nil), + h.NewTaskMessage("gen_thumbnail", nil), + } + goodClient := setup(t) + h.SeedInProgressQueue(t, goodClient, inProgress) + + // Simulate the situation where redis server is down + // by connecting to a wrong port. + badClient := redis.NewClient(&redis.Options{ + Addr: "localhost:6390", + }) + rdbClient := rdb.NewRDB(badClient) + + const interval = time.Second + syncRequestCh := make(chan *syncRequest) + syncer := newSyncer(syncRequestCh, interval) + syncer.start() + defer syncer.terminate() + + for _, msg := range inProgress { + m := msg + syncRequestCh <- &syncRequest{ + fn: func() error { + return rdbClient.Done(m) + }, + } + } + + time.Sleep(interval) // ensure that syncer runs at least once + + // Sanity check to ensure that message was not successfully deleted + // from in-progress list. 
+ gotInProgress := h.GetInProgressMessages(t, goodClient) + if l := len(gotInProgress); l != len(inProgress) { + t.Errorf("%q has length %d; want %d", base.InProgressQueue, l, len(inProgress)) + } + + // simulate failover. + rdbClient = rdb.NewRDB(goodClient) + + time.Sleep(interval) // ensure that syncer runs at least once + + gotInProgress = h.GetInProgressMessages(t, goodClient) + if l := len(gotInProgress); l != 0 { + t.Errorf("%q has length %d; want 0", base.InProgressQueue, l) + } +}