
Add syncer to retry failed redis commands

Author: Ken Hibino
Date:   2020-01-18 07:32:06 -08:00
Parent: 5c806676de
Commit: c29200b1fc
5 changed files with 210 additions and 9 deletions
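At a high level, the change wires a retry path between the processor and Redis: when a state-transition command (Done, Retry, Kill) fails, the processor wraps the call in a closure and sends it to a long-running syncer goroutine, which replays pending commands on a fixed interval until they succeed. The snippet below is a minimal, standalone sketch of that channel-plus-interval pattern, not asynq code; the names op, pending, and stillFailing are made up for this illustration.

package main

import (
	"errors"
	"fmt"
	"time"
)

// op is a failed operation packaged for retry; the real syncRequest in this
// commit also carries an errMsg string for logging.
type op struct {
	fn func() error
}

func main() {
	requests := make(chan op)
	done := make(chan struct{})

	// Syncer side: collect failed operations and replay them on an interval,
	// keeping only the ones that still fail.
	go func() {
		var pending []op
		for {
			select {
			case <-done:
				return
			case r := <-requests:
				pending = append(pending, r)
			case <-time.After(100 * time.Millisecond):
				var stillFailing []op
				for _, r := range pending {
					if err := r.fn(); err != nil {
						stillFailing = append(stillFailing, r)
					}
				}
				pending = stillFailing
			}
		}
	}()

	// Processor side: a Redis command fails once, so it is queued for retry.
	attempts := 0
	requests <- op{fn: func() error {
		attempts++
		if attempts < 2 {
			return errors.New("redis unavailable")
		}
		fmt.Println("synced on attempt", attempts)
		return nil
	}}

	time.Sleep(350 * time.Millisecond) // give the retry loop time to succeed
	close(done)
}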

background.go

@@ -37,6 +37,7 @@ type Background struct {
 	rdb       *rdb.RDB
 	scheduler *scheduler
 	processor *processor
+	syncer    *syncer
 }

 // Config specifies the background-task processing behavior.

@@ -109,13 +110,18 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
 	}
 	qcfg := normalizeQueueCfg(queues)

+	syncRequestCh := make(chan *syncRequest)
+	syncer := newSyncer(syncRequestCh, 5*time.Second)
 	rdb := rdb.NewRDB(createRedisClient(r))
 	scheduler := newScheduler(rdb, 5*time.Second, qcfg)
-	processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc)
+	processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc, syncRequestCh)
 	return &Background{
 		rdb:       rdb,
 		scheduler: scheduler,
 		processor: processor,
+		syncer:    syncer,
 	}
 }

@@ -175,6 +181,7 @@ func (bg *Background) start(handler Handler) {
 	bg.running = true
 	bg.processor.handler = handler

+	bg.syncer.start()
 	bg.scheduler.start()
 	bg.processor.start()
 }

@@ -189,6 +196,7 @@ func (bg *Background) stop() {
 	bg.scheduler.terminate()
 	bg.processor.terminate()
+	bg.syncer.terminate()

 	bg.rdb.Close()
 	bg.processor.handler = nil

processor.go

@@ -28,6 +28,9 @@ type processor struct {
 	retryDelayFunc retryDelayFunc

+	// channel via which to send sync requests to syncer.
+	syncRequestCh chan<- *syncRequest
+
 	// sema is a counting semaphore to ensure the number of active workers
 	// does not exceed the limit.
 	sema chan struct{}

@@ -53,7 +56,7 @@ type retryDelayFunc func(n int, err error, task *Task) time.Duration
 // qfcg is a mapping of queue names to associated priority level.
 // strict specifies whether queue priority should be treated strictly.
 // fn is a function to compute retry delay.
-func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retryDelayFunc) *processor {
+func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retryDelayFunc, syncRequestCh chan<- *syncRequest) *processor {
 	orderedQueues := []string(nil)
 	if strict {
 		orderedQueues = sortByPriority(qcfg)

@@ -63,6 +66,7 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retry
 		queueConfig:    qcfg,
 		orderedQueues:  orderedQueues,
 		retryDelayFunc: fn,
+		syncRequestCh:  syncRequestCh,
 		sema:           make(chan struct{}, n),
 		done:           make(chan struct{}),
 		abort:          make(chan struct{}),

@@ -198,7 +202,14 @@ func (p *processor) requeue(msg *base.TaskMessage) {
 func (p *processor) markAsDone(msg *base.TaskMessage) {
 	err := p.rdb.Done(msg)
 	if err != nil {
-		log.Printf("[ERROR] Could not remove task from InProgress queue: %v\n", err)
+		errMsg := fmt.Sprintf("could not remove task %+v from %q", msg, base.InProgressQueue)
+		log.Printf("[WARN] %s; will retry\n", errMsg)
+		p.syncRequestCh <- &syncRequest{
+			fn: func() error {
+				return p.rdb.Done(msg)
+			},
+			errMsg: errMsg,
+		}
 	}
 }

@@ -207,7 +218,14 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
 	retryAt := time.Now().Add(d)
 	err := p.rdb.Retry(msg, retryAt, e.Error())
 	if err != nil {
-		log.Printf("[ERROR] Could not send task %+v to Retry queue: %v\n", msg, err)
+		errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.RetryQueue)
+		log.Printf("[WARN] %s; will retry\n", errMsg)
+		p.syncRequestCh <- &syncRequest{
+			fn: func() error {
+				return p.rdb.Retry(msg, retryAt, e.Error())
+			},
+			errMsg: errMsg,
+		}
 	}
 }

@@ -215,7 +233,14 @@ func (p *processor) kill(msg *base.TaskMessage, e error) {
 	log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID)
 	err := p.rdb.Kill(msg, e.Error())
 	if err != nil {
-		log.Printf("[ERROR] Could not send task %+v to Dead queue: %v\n", msg, err)
+		errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.DeadQueue)
+		log.Printf("[WARN] %s; will retry\n", errMsg)
+		p.syncRequestCh <- &syncRequest{
+			fn: func() error {
+				return p.rdb.Kill(msg, e.Error())
+			},
+			errMsg: errMsg,
+		}
 	}
 }
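The three hunks above share one shape: instead of only logging when Redis is unreachable, the processor captures the exact failed call, arguments included, in a func() error and hands it to the syncer, and the log level drops from ERROR to WARN because the command will be retried. The following is a small standalone sketch of that closure-capture idea; request, queue, task, and retryAt here are illustrative names, not asynq identifiers.

package main

import (
	"fmt"
	"time"
)

// request mirrors the role of syncRequest: a failed command packaged as a
// closure so another goroutine can replay it later with the original arguments.
type request struct {
	fn func() error
}

func main() {
	queue := make(chan request, 1)

	// Pretend a Retry(msg, retryAt, ...) call just failed; capture its arguments.
	task := "send_email:abc123"
	retryAt := time.Now().Add(30 * time.Second)
	queue <- request{fn: func() error {
		fmt.Printf("replaying Retry(%s, %s)\n", task, retryAt.Format(time.RFC3339))
		return nil
	}}

	// Later, the syncer side calls fn() without knowing the arguments.
	req := <-queue
	if err := req.fn(); err != nil {
		fmt.Println("still failing, keep for next round:", err)
	}
}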

processor_test.go

@@ -65,7 +65,7 @@ func TestProcessorSuccess(t *testing.T) {
 			processed = append(processed, task)
 			return nil
 		}
-		p := newProcessor(rdbClient, 10, defaultQueueConfig, false, defaultDelayFunc)
+		p := newProcessor(rdbClient, 10, defaultQueueConfig, false, defaultDelayFunc, nil)
 		p.handler = HandlerFunc(handler)

 		p.start()

@@ -148,7 +148,7 @@ func TestProcessorRetry(t *testing.T) {
 		handler := func(task *Task) error {
 			return fmt.Errorf(errMsg)
 		}
-		p := newProcessor(rdbClient, 10, defaultQueueConfig, false, delayFunc)
+		p := newProcessor(rdbClient, 10, defaultQueueConfig, false, delayFunc, nil)
 		p.handler = HandlerFunc(handler)

 		p.start()

@@ -207,7 +207,7 @@ func TestProcessorQueues(t *testing.T) {
 	}

 	for _, tc := range tests {
-		p := newProcessor(nil, 10, tc.queueCfg, false, defaultDelayFunc)
+		p := newProcessor(nil, 10, tc.queueCfg, false, defaultDelayFunc, nil)
 		got := p.queues()
 		if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
 			t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",

@@ -273,7 +273,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
 		"low":      1,
 	}

 	// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
-	p := newProcessor(rdbClient, 1 /*concurrency */, queueCfg, true /* strict */, defaultDelayFunc)
+	p := newProcessor(rdbClient, 1 /*concurrency */, queueCfg, true /* strict */, defaultDelayFunc, nil)
 	p.handler = HandlerFunc(handler)

 	p.start()

syncer.go (new file, 69 lines)

@@ -0,0 +1,69 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
	"log"
	"time"
)

// syncer is responsible for queuing up failed requests to redis and retry
// those requests to sync state between the background process and redis.
type syncer struct {
	requestsCh <-chan *syncRequest

	// channel to communicate back to the long running "syncer" goroutine.
	done chan struct{}

	// interval between sync operations.
	interval time.Duration
}

type syncRequest struct {
	fn     func() error // sync operation
	errMsg string       // error message
}

func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
	return &syncer{
		requestsCh: requestsCh,
		done:       make(chan struct{}),
		interval:   interval,
	}
}

func (s *syncer) terminate() {
	log.Println("[INFO] Syncer shutting down...")
	// Signal the syncer goroutine to stop.
	s.done <- struct{}{}
}

func (s *syncer) start() {
	go func() {
		var requests []*syncRequest
		for {
			select {
			case <-s.done:
				// Try sync one last time before shutting down.
				for _, req := range requests {
					if err := req.fn(); err != nil {
						log.Printf("[ERROR] %s\n", req.errMsg)
					}
				}
				log.Println("[INFO] Syncer done.")
				return
			case req := <-s.requestsCh:
				requests = append(requests, req)
			case <-time.After(s.interval):
				var temp []*syncRequest
				for _, req := range requests {
					if err := req.fn(); err != nil {
						temp = append(temp, req)
					}
				}
				requests = temp
			}
		}
	}()
}
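For reference, the wiring NewBackground performs earlier in this commit: create the request channel, hand it to both newSyncer and newProcessor, start the syncer, and push a syncRequest whenever a Redis command fails. Below is a hedged usage sketch against the types introduced in this file; exampleWiring and doWork are hypothetical stand-ins (an in-package caller and a fallible rdb call such as rdb.Done), not part of the commit.

// Hypothetical in-package caller showing the intended wiring.
func exampleWiring(doWork func() error) {
	syncRequestCh := make(chan *syncRequest)
	syncer := newSyncer(syncRequestCh, 5*time.Second)
	syncer.start()
	defer syncer.terminate()

	if err := doWork(); err != nil {
		// Hand the failed command to the syncer; it retries every interval
		// until the call succeeds or the syncer is terminated.
		syncRequestCh <- &syncRequest{
			fn:     doWork,
			errMsg: "could not complete redis command",
		}
	}
}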

syncer_test.go (new file, 99 lines)

@@ -0,0 +1,99 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
	"testing"
	"time"

	"github.com/go-redis/redis/v7"
	h "github.com/hibiken/asynq/internal/asynqtest"
	"github.com/hibiken/asynq/internal/base"
	"github.com/hibiken/asynq/internal/rdb"
)

func TestSyncer(t *testing.T) {
	inProgress := []*base.TaskMessage{
		h.NewTaskMessage("send_email", nil),
		h.NewTaskMessage("reindex", nil),
		h.NewTaskMessage("gen_thumbnail", nil),
	}
	r := setup(t)
	rdbClient := rdb.NewRDB(r)
	h.SeedInProgressQueue(t, r, inProgress)

	const interval = time.Second
	syncRequestCh := make(chan *syncRequest)
	syncer := newSyncer(syncRequestCh, interval)
	syncer.start()
	defer syncer.terminate()

	for _, msg := range inProgress {
		m := msg
		syncRequestCh <- &syncRequest{
			fn: func() error {
				return rdbClient.Done(m)
			},
		}
	}

	time.Sleep(interval) // ensure that syncer runs at least once

	gotInProgress := h.GetInProgressMessages(t, r)
	if l := len(gotInProgress); l != 0 {
		t.Errorf("%q has length %d; want 0", base.InProgressQueue, l)
	}
}

func TestSyncerRetry(t *testing.T) {
	inProgress := []*base.TaskMessage{
		h.NewTaskMessage("send_email", nil),
		h.NewTaskMessage("reindex", nil),
		h.NewTaskMessage("gen_thumbnail", nil),
	}
	goodClient := setup(t)
	h.SeedInProgressQueue(t, goodClient, inProgress)

	// Simulate the situation where redis server is down
	// by connecting to a wrong port.
	badClient := redis.NewClient(&redis.Options{
		Addr: "localhost:6390",
	})
	rdbClient := rdb.NewRDB(badClient)

	const interval = time.Second
	syncRequestCh := make(chan *syncRequest)
	syncer := newSyncer(syncRequestCh, interval)
	syncer.start()
	defer syncer.terminate()

	for _, msg := range inProgress {
		m := msg
		syncRequestCh <- &syncRequest{
			fn: func() error {
				return rdbClient.Done(m)
			},
		}
	}

	time.Sleep(interval) // ensure that syncer runs at least once

	// Sanity check to ensure that messages were not successfully deleted
	// from the in-progress list.
	gotInProgress := h.GetInProgressMessages(t, goodClient)
	if l := len(gotInProgress); l != len(inProgress) {
		t.Errorf("%q has length %d; want %d", base.InProgressQueue, l, len(inProgress))
	}

	// Simulate failover.
	rdbClient = rdb.NewRDB(goodClient)

	time.Sleep(interval) // ensure that syncer runs at least once

	gotInProgress = h.GetInProgressMessages(t, goodClient)
	if l := len(gotInProgress); l != 0 {
		t.Errorf("%q has length %d; want 0", base.InProgressQueue, l)
	}
}