mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-25 23:06:12 +08:00 
			
		
		
		
	Add syncer to retry failed redis commands
This commit is contained in:
		| @@ -37,6 +37,7 @@ type Background struct { | |||||||
| 	rdb       *rdb.RDB | 	rdb       *rdb.RDB | ||||||
| 	scheduler *scheduler | 	scheduler *scheduler | ||||||
| 	processor *processor | 	processor *processor | ||||||
|  | 	syncer    *syncer | ||||||
| } | } | ||||||
|  |  | ||||||
| // Config specifies the background-task processing behavior. | // Config specifies the background-task processing behavior. | ||||||
| @@ -109,13 +110,18 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { | |||||||
| 	} | 	} | ||||||
| 	qcfg := normalizeQueueCfg(queues) | 	qcfg := normalizeQueueCfg(queues) | ||||||
|  |  | ||||||
|  | 	syncRequestCh := make(chan *syncRequest) | ||||||
|  |  | ||||||
|  | 	syncer := newSyncer(syncRequestCh, 5*time.Second) | ||||||
|  |  | ||||||
| 	rdb := rdb.NewRDB(createRedisClient(r)) | 	rdb := rdb.NewRDB(createRedisClient(r)) | ||||||
| 	scheduler := newScheduler(rdb, 5*time.Second, qcfg) | 	scheduler := newScheduler(rdb, 5*time.Second, qcfg) | ||||||
| 	processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc) | 	processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc, syncRequestCh) | ||||||
| 	return &Background{ | 	return &Background{ | ||||||
| 		rdb:       rdb, | 		rdb:       rdb, | ||||||
| 		scheduler: scheduler, | 		scheduler: scheduler, | ||||||
| 		processor: processor, | 		processor: processor, | ||||||
|  | 		syncer:    syncer, | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -175,6 +181,7 @@ func (bg *Background) start(handler Handler) { | |||||||
| 	bg.running = true | 	bg.running = true | ||||||
| 	bg.processor.handler = handler | 	bg.processor.handler = handler | ||||||
|  |  | ||||||
|  | 	bg.syncer.start() | ||||||
| 	bg.scheduler.start() | 	bg.scheduler.start() | ||||||
| 	bg.processor.start() | 	bg.processor.start() | ||||||
| } | } | ||||||
| @@ -189,6 +196,7 @@ func (bg *Background) stop() { | |||||||
|  |  | ||||||
| 	bg.scheduler.terminate() | 	bg.scheduler.terminate() | ||||||
| 	bg.processor.terminate() | 	bg.processor.terminate() | ||||||
|  | 	bg.syncer.terminate() | ||||||
|  |  | ||||||
| 	bg.rdb.Close() | 	bg.rdb.Close() | ||||||
| 	bg.processor.handler = nil | 	bg.processor.handler = nil | ||||||
|   | |||||||
							
								
								
									
										33
									
								
								processor.go
									
									
									
									
									
								
							
							
						
						
									
										33
									
								
								processor.go
									
									
									
									
									
								
							| @@ -28,6 +28,9 @@ type processor struct { | |||||||
|  |  | ||||||
| 	retryDelayFunc retryDelayFunc | 	retryDelayFunc retryDelayFunc | ||||||
|  |  | ||||||
|  | 	// channel via which to send sync requests to syncer. | ||||||
|  | 	syncRequestCh chan<- *syncRequest | ||||||
|  |  | ||||||
| 	// sema is a counting semaphore to ensure the number of active workers | 	// sema is a counting semaphore to ensure the number of active workers | ||||||
| 	// does not exceed the limit. | 	// does not exceed the limit. | ||||||
| 	sema chan struct{} | 	sema chan struct{} | ||||||
| @@ -53,7 +56,7 @@ type retryDelayFunc func(n int, err error, task *Task) time.Duration | |||||||
| // qfcg is a mapping of queue names to associated priority level. | // qfcg is a mapping of queue names to associated priority level. | ||||||
| // strict specifies whether queue priority should be treated strictly. | // strict specifies whether queue priority should be treated strictly. | ||||||
| // fn is a function to compute retry delay. | // fn is a function to compute retry delay. | ||||||
| func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retryDelayFunc) *processor { | func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retryDelayFunc, syncRequestCh chan<- *syncRequest) *processor { | ||||||
| 	orderedQueues := []string(nil) | 	orderedQueues := []string(nil) | ||||||
| 	if strict { | 	if strict { | ||||||
| 		orderedQueues = sortByPriority(qcfg) | 		orderedQueues = sortByPriority(qcfg) | ||||||
| @@ -63,6 +66,7 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retry | |||||||
| 		queueConfig:    qcfg, | 		queueConfig:    qcfg, | ||||||
| 		orderedQueues:  orderedQueues, | 		orderedQueues:  orderedQueues, | ||||||
| 		retryDelayFunc: fn, | 		retryDelayFunc: fn, | ||||||
|  | 		syncRequestCh:  syncRequestCh, | ||||||
| 		sema:           make(chan struct{}, n), | 		sema:           make(chan struct{}, n), | ||||||
| 		done:           make(chan struct{}), | 		done:           make(chan struct{}), | ||||||
| 		abort:          make(chan struct{}), | 		abort:          make(chan struct{}), | ||||||
| @@ -198,7 +202,14 @@ func (p *processor) requeue(msg *base.TaskMessage) { | |||||||
| func (p *processor) markAsDone(msg *base.TaskMessage) { | func (p *processor) markAsDone(msg *base.TaskMessage) { | ||||||
| 	err := p.rdb.Done(msg) | 	err := p.rdb.Done(msg) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Printf("[ERROR] Could not remove task from InProgress queue: %v\n", err) | 		errMsg := fmt.Sprintf("could not remove task %+v from %q", msg, base.InProgressQueue) | ||||||
|  | 		log.Printf("[WARN] %s; will retry\n", errMsg) | ||||||
|  | 		p.syncRequestCh <- &syncRequest{ | ||||||
|  | 			fn: func() error { | ||||||
|  | 				return p.rdb.Done(msg) | ||||||
|  | 			}, | ||||||
|  | 			errMsg: errMsg, | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -207,7 +218,14 @@ func (p *processor) retry(msg *base.TaskMessage, e error) { | |||||||
| 	retryAt := time.Now().Add(d) | 	retryAt := time.Now().Add(d) | ||||||
| 	err := p.rdb.Retry(msg, retryAt, e.Error()) | 	err := p.rdb.Retry(msg, retryAt, e.Error()) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Printf("[ERROR] Could not send task %+v to Retry queue: %v\n", msg, err) | 		errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.RetryQueue) | ||||||
|  | 		log.Printf("[WARN] %s; will retry\n", errMsg) | ||||||
|  | 		p.syncRequestCh <- &syncRequest{ | ||||||
|  | 			fn: func() error { | ||||||
|  | 				return p.rdb.Retry(msg, retryAt, e.Error()) | ||||||
|  | 			}, | ||||||
|  | 			errMsg: errMsg, | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -215,7 +233,14 @@ func (p *processor) kill(msg *base.TaskMessage, e error) { | |||||||
| 	log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID) | 	log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID) | ||||||
| 	err := p.rdb.Kill(msg, e.Error()) | 	err := p.rdb.Kill(msg, e.Error()) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Printf("[ERROR] Could not send task %+v to Dead queue: %v\n", msg, err) | 		errMsg := fmt.Sprintf("could not move task %+v from %q to %q", msg, base.InProgressQueue, base.DeadQueue) | ||||||
|  | 		log.Printf("[WARN] %s; will retry\n", errMsg) | ||||||
|  | 		p.syncRequestCh <- &syncRequest{ | ||||||
|  | 			fn: func() error { | ||||||
|  | 				return p.rdb.Kill(msg, e.Error()) | ||||||
|  | 			}, | ||||||
|  | 			errMsg: errMsg, | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -65,7 +65,7 @@ func TestProcessorSuccess(t *testing.T) { | |||||||
| 			processed = append(processed, task) | 			processed = append(processed, task) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		p := newProcessor(rdbClient, 10, defaultQueueConfig, false, defaultDelayFunc) | 		p := newProcessor(rdbClient, 10, defaultQueueConfig, false, defaultDelayFunc, nil) | ||||||
| 		p.handler = HandlerFunc(handler) | 		p.handler = HandlerFunc(handler) | ||||||
|  |  | ||||||
| 		p.start() | 		p.start() | ||||||
| @@ -148,7 +148,7 @@ func TestProcessorRetry(t *testing.T) { | |||||||
| 		handler := func(task *Task) error { | 		handler := func(task *Task) error { | ||||||
| 			return fmt.Errorf(errMsg) | 			return fmt.Errorf(errMsg) | ||||||
| 		} | 		} | ||||||
| 		p := newProcessor(rdbClient, 10, defaultQueueConfig, false, delayFunc) | 		p := newProcessor(rdbClient, 10, defaultQueueConfig, false, delayFunc, nil) | ||||||
| 		p.handler = HandlerFunc(handler) | 		p.handler = HandlerFunc(handler) | ||||||
|  |  | ||||||
| 		p.start() | 		p.start() | ||||||
| @@ -207,7 +207,7 @@ func TestProcessorQueues(t *testing.T) { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for _, tc := range tests { | 	for _, tc := range tests { | ||||||
| 		p := newProcessor(nil, 10, tc.queueCfg, false, defaultDelayFunc) | 		p := newProcessor(nil, 10, tc.queueCfg, false, defaultDelayFunc, nil) | ||||||
| 		got := p.queues() | 		got := p.queues() | ||||||
| 		if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { | 		if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { | ||||||
| 			t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s", | 			t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s", | ||||||
| @@ -273,7 +273,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { | |||||||
| 			"low":                 1, | 			"low":                 1, | ||||||
| 		} | 		} | ||||||
| 		// Note: Set concurrency to 1 to make sure tasks are processed one at a time. | 		// Note: Set concurrency to 1 to make sure tasks are processed one at a time. | ||||||
| 		p := newProcessor(rdbClient, 1 /*concurrency */, queueCfg, true /* strict */, defaultDelayFunc) | 		p := newProcessor(rdbClient, 1 /*concurrency */, queueCfg, true /* strict */, defaultDelayFunc, nil) | ||||||
| 		p.handler = HandlerFunc(handler) | 		p.handler = HandlerFunc(handler) | ||||||
|  |  | ||||||
| 		p.start() | 		p.start() | ||||||
|   | |||||||
							
								
								
									
										69
									
								
								syncer.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										69
									
								
								syncer.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,69 @@ | |||||||
|  | // Copyright 2020 Kentaro Hibino. All rights reserved. | ||||||
|  | // Use of this source code is governed by a MIT license | ||||||
|  | // that can be found in the LICENSE file. | ||||||
|  |  | ||||||
|  | package asynq | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"log" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // syncer is responsible for queuing up failed requests to redis and retrying | ||||||
|  | // those requests to sync state between the background process and redis. | ||||||
|  | type syncer struct { | ||||||
|  | 	requestsCh <-chan *syncRequest | ||||||
|  |  | ||||||
|  | 	// channel to communicate back to the long running "syncer" goroutine. | ||||||
|  | 	done chan struct{} | ||||||
|  |  | ||||||
|  | 	// interval between sync operations. | ||||||
|  | 	interval time.Duration | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type syncRequest struct { | ||||||
|  | 	fn     func() error // sync operation | ||||||
|  | 	errMsg string       // error message | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // newSyncer returns a syncer that receives failed redis operations on | ||||||
|  | // requestsCh and waits interval between sync operations. | ||||||
|  | // | ||||||
|  | // interval must be stored on the syncer: if the field is left at its zero | ||||||
|  | // value, time.After(s.interval) in the syncer goroutine fires immediately | ||||||
|  | // and the retry loop spins without pausing. | ||||||
|  | func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer { | ||||||
|  | 	return &syncer{ | ||||||
|  | 		requestsCh: requestsCh, | ||||||
|  | 		done:       make(chan struct{}), | ||||||
|  | 		interval:   interval, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *syncer) terminate() { | ||||||
|  | 	log.Println("[INFO] Syncer shutting down...") | ||||||
|  | 	// Signal the syncer goroutine to stop. | ||||||
|  | 	s.done <- struct{}{} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *syncer) start() { | ||||||
|  | 	go func() { | ||||||
|  | 		var requests []*syncRequest | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case <-s.done: | ||||||
|  | 				// Try sync one last time before shutting down. | ||||||
|  | 				for _, req := range requests { | ||||||
|  | 					if err := req.fn(); err != nil { | ||||||
|  | 						log.Printf("[ERROR] %s\n", req.errMsg) | ||||||
|  | 					} | ||||||
|  | 				} | ||||||
|  | 				log.Println("[INFO] Syncer done.") | ||||||
|  | 				return | ||||||
|  | 			case req := <-s.requestsCh: | ||||||
|  | 				requests = append(requests, req) | ||||||
|  | 			case <-time.After(s.interval): | ||||||
|  | 				var temp []*syncRequest | ||||||
|  | 				for _, req := range requests { | ||||||
|  | 					if err := req.fn(); err != nil { | ||||||
|  | 						temp = append(temp, req) | ||||||
|  | 					} | ||||||
|  | 				} | ||||||
|  | 				requests = temp | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | } | ||||||
							
								
								
									
										99
									
								
								syncer_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										99
									
								
								syncer_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,99 @@ | |||||||
|  | // Copyright 2020 Kentaro Hibino. All rights reserved. | ||||||
|  | // Use of this source code is governed by a MIT license | ||||||
|  | // that can be found in the LICENSE file. | ||||||
|  |  | ||||||
|  | package asynq | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"testing" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/go-redis/redis/v7" | ||||||
|  | 	h "github.com/hibiken/asynq/internal/asynqtest" | ||||||
|  | 	"github.com/hibiken/asynq/internal/base" | ||||||
|  | 	"github.com/hibiken/asynq/internal/rdb" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func TestSyncer(t *testing.T) { | ||||||
|  | 	inProgress := []*base.TaskMessage{ | ||||||
|  | 		h.NewTaskMessage("send_email", nil), | ||||||
|  | 		h.NewTaskMessage("reindex", nil), | ||||||
|  | 		h.NewTaskMessage("gen_thumbnail", nil), | ||||||
|  | 	} | ||||||
|  | 	r := setup(t) | ||||||
|  | 	rdbClient := rdb.NewRDB(r) | ||||||
|  | 	h.SeedInProgressQueue(t, r, inProgress) | ||||||
|  |  | ||||||
|  | 	const interval = time.Second | ||||||
|  | 	syncRequestCh := make(chan *syncRequest) | ||||||
|  | 	syncer := newSyncer(syncRequestCh, interval) | ||||||
|  | 	syncer.start() | ||||||
|  | 	defer syncer.terminate() | ||||||
|  |  | ||||||
|  | 	for _, msg := range inProgress { | ||||||
|  | 		m := msg | ||||||
|  | 		syncRequestCh <- &syncRequest{ | ||||||
|  | 			fn: func() error { | ||||||
|  | 				return rdbClient.Done(m) | ||||||
|  | 			}, | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	time.Sleep(interval) // ensure that syncer runs at least once | ||||||
|  |  | ||||||
|  | 	gotInProgress := h.GetInProgressMessages(t, r) | ||||||
|  | 	if l := len(gotInProgress); l != 0 { | ||||||
|  | 		t.Errorf("%q has length %d; want 0", base.InProgressQueue, l) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func TestSyncerRetry(t *testing.T) { | ||||||
|  | 	inProgress := []*base.TaskMessage{ | ||||||
|  | 		h.NewTaskMessage("send_email", nil), | ||||||
|  | 		h.NewTaskMessage("reindex", nil), | ||||||
|  | 		h.NewTaskMessage("gen_thumbnail", nil), | ||||||
|  | 	} | ||||||
|  | 	goodClient := setup(t) | ||||||
|  | 	h.SeedInProgressQueue(t, goodClient, inProgress) | ||||||
|  |  | ||||||
|  | 	// Simulate the situation where redis server is down | ||||||
|  | 	// by connecting to a wrong port. | ||||||
|  | 	badClient := redis.NewClient(&redis.Options{ | ||||||
|  | 		Addr: "localhost:6390", | ||||||
|  | 	}) | ||||||
|  | 	rdbClient := rdb.NewRDB(badClient) | ||||||
|  |  | ||||||
|  | 	const interval = time.Second | ||||||
|  | 	syncRequestCh := make(chan *syncRequest) | ||||||
|  | 	syncer := newSyncer(syncRequestCh, interval) | ||||||
|  | 	syncer.start() | ||||||
|  | 	defer syncer.terminate() | ||||||
|  |  | ||||||
|  | 	for _, msg := range inProgress { | ||||||
|  | 		m := msg | ||||||
|  | 		syncRequestCh <- &syncRequest{ | ||||||
|  | 			fn: func() error { | ||||||
|  | 				return rdbClient.Done(m) | ||||||
|  | 			}, | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	time.Sleep(interval) // ensure that syncer runs at least once | ||||||
|  |  | ||||||
|  | 	// Sanity check to ensure that message was not successfully deleted | ||||||
|  | 	// from in-progress list. | ||||||
|  | 	gotInProgress := h.GetInProgressMessages(t, goodClient) | ||||||
|  | 	if l := len(gotInProgress); l != len(inProgress) { | ||||||
|  | 		t.Errorf("%q has length %d; want %d", base.InProgressQueue, l, len(inProgress)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// simulate failover. | ||||||
|  | 	rdbClient = rdb.NewRDB(goodClient) | ||||||
|  |  | ||||||
|  | 	time.Sleep(interval) // ensure that syncer runs at least once | ||||||
|  |  | ||||||
|  | 	gotInProgress = h.GetInProgressMessages(t, goodClient) | ||||||
|  | 	if l := len(gotInProgress); l != 0 { | ||||||
|  | 		t.Errorf("%q has length %d; want 0", base.InProgressQueue, l) | ||||||
|  | 	} | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user