mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-25 10:56:12 +08:00 
			
		
		
		
	Rename Background to Server
This commit is contained in:
		| @@ -24,7 +24,7 @@ func BenchmarkEndToEndSimple(b *testing.B) { | ||||
| 			DB:   redisDB, | ||||
| 		} | ||||
| 		client := NewClient(redis) | ||||
| 		bg := NewBackground(redis, &Config{ | ||||
| 		srv := NewServer(redis, Config{ | ||||
| 			Concurrency: 10, | ||||
| 			RetryDelayFunc: func(n int, err error, t *Task) time.Duration { | ||||
| 				return time.Second | ||||
| @@ -46,11 +46,11 @@ func BenchmarkEndToEndSimple(b *testing.B) { | ||||
| 		} | ||||
| 		b.StartTimer() // end setup | ||||
|  | ||||
| 		bg.start(HandlerFunc(handler)) | ||||
| 		srv.start(HandlerFunc(handler)) | ||||
| 		wg.Wait() | ||||
|  | ||||
| 		b.StopTimer() // begin teardown | ||||
| 		bg.stop() | ||||
| 		srv.stop() | ||||
| 		b.StartTimer() // end teardown | ||||
| 	} | ||||
| } | ||||
| @@ -67,7 +67,7 @@ func BenchmarkEndToEnd(b *testing.B) { | ||||
| 			DB:   redisDB, | ||||
| 		} | ||||
| 		client := NewClient(redis) | ||||
| 		bg := NewBackground(redis, &Config{ | ||||
| 		srv := NewServer(redis, Config{ | ||||
| 			Concurrency: 10, | ||||
| 			RetryDelayFunc: func(n int, err error, t *Task) time.Duration { | ||||
| 				return time.Second | ||||
| @@ -99,11 +99,11 @@ func BenchmarkEndToEnd(b *testing.B) { | ||||
| 		} | ||||
| 		b.StartTimer() // end setup | ||||
|  | ||||
| 		bg.start(HandlerFunc(handler)) | ||||
| 		srv.start(HandlerFunc(handler)) | ||||
| 		wg.Wait() | ||||
|  | ||||
| 		b.StopTimer() // begin teardown | ||||
| 		bg.stop() | ||||
| 		srv.stop() | ||||
| 		b.StartTimer() // end teardown | ||||
| 	} | ||||
| } | ||||
| @@ -124,7 +124,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { | ||||
| 			DB:   redisDB, | ||||
| 		} | ||||
| 		client := NewClient(redis) | ||||
| 		bg := NewBackground(redis, &Config{ | ||||
| 		srv := NewServer(redis, Config{ | ||||
| 			Concurrency: 10, | ||||
| 			Queues: map[string]int{ | ||||
| 				"high":    6, | ||||
| @@ -160,11 +160,11 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { | ||||
| 		} | ||||
| 		b.StartTimer() // end setup | ||||
|  | ||||
| 		bg.start(HandlerFunc(handler)) | ||||
| 		srv.start(HandlerFunc(handler)) | ||||
| 		wg.Wait() | ||||
|  | ||||
| 		b.StopTimer() // begin teardown | ||||
| 		bg.stop() | ||||
| 		srv.stop() | ||||
| 		b.StartTimer() // end teardown | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -18,10 +18,10 @@ import ( | ||||
| 	"github.com/hibiken/asynq/internal/rdb" | ||||
| ) | ||||
| 
 | ||||
| // Background is responsible for managing the background-task processing. | ||||
| // Server is responsible for managing the background-task processing. | ||||
| // | ||||
| // Background manages task queues to process tasks. | ||||
| // If the processing of a task is unsuccessful, background will | ||||
| // Server pulls tasks off queues and process them. | ||||
| // If the processing of a task is unsuccessful, server will | ||||
| // schedule it for a retry until either the task gets processed successfully | ||||
| // or it exhausts its max retry count. | ||||
| // | ||||
| @@ -29,7 +29,7 @@ import ( | ||||
| // will be kept in the queue for some time until a certain condition is met | ||||
| // (e.g., queue size reaches a certain limit, or the task has been in the | ||||
| // queue for a certain amount of time). | ||||
| type Background struct { | ||||
| type Server struct { | ||||
| 	mu      sync.Mutex | ||||
| 	running bool | ||||
| 
 | ||||
| @@ -48,11 +48,11 @@ type Background struct { | ||||
| 	subscriber  *subscriber | ||||
| } | ||||
| 
 | ||||
| // Config specifies the background-task processing behavior. | ||||
| // Config specifies the server's background-task processing behavior. | ||||
| type Config struct { | ||||
| 	// Maximum number of concurrent processing of tasks. | ||||
| 	// | ||||
| 	// If set to a zero or negative value, NewBackground will overwrite the value to one. | ||||
| 	// If set to a zero or negative value, NewServer will overwrite the value to one. | ||||
| 	Concurrency int | ||||
| 
 | ||||
| 	// Function to calculate retry delay for a failed task. | ||||
| @@ -67,7 +67,7 @@ type Config struct { | ||||
| 	// List of queues to process with given priority value. Keys are the names of the | ||||
| 	// queues and values are associated priority value. | ||||
| 	// | ||||
| 	// If set to nil or not specified, the background will process only the "default" queue. | ||||
| 	// If set to nil or not specified, the server will process only the "default" queue. | ||||
| 	// | ||||
| 	// Priority is treated as follows to avoid starving low priority queues. | ||||
| 	// | ||||
| @@ -106,7 +106,7 @@ type Config struct { | ||||
| 	// ErrorHandler: asynq.ErrorHandlerFunc(reportError) | ||||
| 	ErrorHandler ErrorHandler | ||||
| 
 | ||||
| 	// Logger specifies the logger used by the background instance. | ||||
| 	// Logger specifies the logger used by the server instance. | ||||
| 	// | ||||
| 	// If unset, default logger is used. | ||||
| 	Logger Logger | ||||
| @@ -156,9 +156,9 @@ var defaultQueueConfig = map[string]int{ | ||||
| 	base.DefaultQueueName: 1, | ||||
| } | ||||
| 
 | ||||
| // NewBackground returns a new Background given a redis connection option | ||||
| // NewServer returns a new Server given a redis connection option | ||||
| // and background processing configuration. | ||||
| func NewBackground(r RedisConnOpt, cfg *Config) *Background { | ||||
| func NewServer(r RedisConnOpt, cfg Config) *Server { | ||||
| 	n := cfg.Concurrency | ||||
| 	if n < 1 { | ||||
| 		n = 1 | ||||
| @@ -196,7 +196,7 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { | ||||
| 	scheduler := newScheduler(logger, rdb, 5*time.Second, queues) | ||||
| 	processor := newProcessor(logger, rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler) | ||||
| 	subscriber := newSubscriber(logger, rdb, cancels) | ||||
| 	return &Background{ | ||||
| 	return &Server{ | ||||
| 		logger:      logger, | ||||
| 		rdb:         rdb, | ||||
| 		ps:          ps, | ||||
| @@ -234,47 +234,47 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error { | ||||
| // an os signal to exit the program is received. Once it receives | ||||
| // a signal, it gracefully shuts down all pending workers and other | ||||
| // goroutines to process the tasks. | ||||
| func (bg *Background) Run(handler Handler) { | ||||
| func (srv *Server) Run(handler Handler) { | ||||
| 	type prefixLogger interface { | ||||
| 		SetPrefix(prefix string) | ||||
| 	} | ||||
| 	// If logger supports setting prefix, then set prefix for log output. | ||||
| 	if l, ok := bg.logger.(prefixLogger); ok { | ||||
| 	if l, ok := srv.logger.(prefixLogger); ok { | ||||
| 		l.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid())) | ||||
| 	} | ||||
| 	bg.logger.Info("Starting processing") | ||||
| 	srv.logger.Info("Starting processing") | ||||
| 
 | ||||
| 	bg.start(handler) | ||||
| 	defer bg.stop() | ||||
| 	srv.start(handler) | ||||
| 	defer srv.stop() | ||||
| 
 | ||||
| 	bg.waitForSignals() | ||||
| 	srv.waitForSignals() | ||||
| 	fmt.Println() | ||||
| 	bg.logger.Info("Starting graceful shutdown") | ||||
| 	srv.logger.Info("Starting graceful shutdown") | ||||
| } | ||||
| 
 | ||||
| // starts the background-task processing. | ||||
| func (bg *Background) start(handler Handler) { | ||||
| 	bg.mu.Lock() | ||||
| 	defer bg.mu.Unlock() | ||||
| 	if bg.running { | ||||
| func (srv *Server) start(handler Handler) { | ||||
| 	srv.mu.Lock() | ||||
| 	defer srv.mu.Unlock() | ||||
| 	if srv.running { | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	bg.running = true | ||||
| 	bg.processor.handler = handler | ||||
| 	srv.running = true | ||||
| 	srv.processor.handler = handler | ||||
| 
 | ||||
| 	bg.heartbeater.start(&bg.wg) | ||||
| 	bg.subscriber.start(&bg.wg) | ||||
| 	bg.syncer.start(&bg.wg) | ||||
| 	bg.scheduler.start(&bg.wg) | ||||
| 	bg.processor.start(&bg.wg) | ||||
| 	srv.heartbeater.start(&srv.wg) | ||||
| 	srv.subscriber.start(&srv.wg) | ||||
| 	srv.syncer.start(&srv.wg) | ||||
| 	srv.scheduler.start(&srv.wg) | ||||
| 	srv.processor.start(&srv.wg) | ||||
| } | ||||
| 
 | ||||
| // stops the background-task processing. | ||||
| func (bg *Background) stop() { | ||||
| 	bg.mu.Lock() | ||||
| 	defer bg.mu.Unlock() | ||||
| 	if !bg.running { | ||||
| func (srv *Server) stop() { | ||||
| 	srv.mu.Lock() | ||||
| 	defer srv.mu.Unlock() | ||||
| 	if !srv.running { | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| @@ -282,16 +282,16 @@ func (bg *Background) stop() { | ||||
| 	// Sender goroutines should be terminated before the receiver goroutines. | ||||
| 	// | ||||
| 	// processor -> syncer (via syncCh) | ||||
| 	bg.scheduler.terminate() | ||||
| 	bg.processor.terminate() | ||||
| 	bg.syncer.terminate() | ||||
| 	bg.subscriber.terminate() | ||||
| 	bg.heartbeater.terminate() | ||||
| 	srv.scheduler.terminate() | ||||
| 	srv.processor.terminate() | ||||
| 	srv.syncer.terminate() | ||||
| 	srv.subscriber.terminate() | ||||
| 	srv.heartbeater.terminate() | ||||
| 
 | ||||
| 	bg.wg.Wait() | ||||
| 	srv.wg.Wait() | ||||
| 
 | ||||
| 	bg.rdb.Close() | ||||
| 	bg.running = false | ||||
| 	srv.rdb.Close() | ||||
| 	srv.running = false | ||||
| 
 | ||||
| 	bg.logger.Info("Bye!") | ||||
| 	srv.logger.Info("Bye!") | ||||
| } | ||||
| @@ -13,7 +13,7 @@ import ( | ||||
| 	"go.uber.org/goleak" | ||||
| ) | ||||
| 
 | ||||
| func TestBackground(t *testing.T) { | ||||
| func TestServer(t *testing.T) { | ||||
| 	// https://github.com/go-redis/redis/issues/1029 | ||||
| 	ignoreOpt := goleak.IgnoreTopFunction("github.com/go-redis/redis/v7/internal/pool.(*ConnPool).reaper") | ||||
| 	defer goleak.VerifyNoLeaks(t, ignoreOpt) | ||||
| @@ -22,8 +22,8 @@ func TestBackground(t *testing.T) { | ||||
| 		Addr: "localhost:6379", | ||||
| 		DB:   15, | ||||
| 	} | ||||
| 	client := NewClient(r) | ||||
| 	bg := NewBackground(r, &Config{ | ||||
| 	c := NewClient(r) | ||||
| 	srv := NewServer(r, Config{ | ||||
| 		Concurrency: 10, | ||||
| 	}) | ||||
| 
 | ||||
| @@ -32,19 +32,19 @@ func TestBackground(t *testing.T) { | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	bg.start(HandlerFunc(h)) | ||||
| 	srv.start(HandlerFunc(h)) | ||||
| 
 | ||||
| 	err := client.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123})) | ||||
| 	err := c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123})) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("could not enqueue a task: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	err = client.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456})) | ||||
| 	err = c.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456})) | ||||
| 	if err != nil { | ||||
| 		t.Errorf("could not enqueue a task: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	bg.stop() | ||||
| 	srv.stop() | ||||
| } | ||||
| 
 | ||||
| func TestGCD(t *testing.T) { | ||||
| @@ -15,17 +15,17 @@ import ( | ||||
| // It handles SIGTERM, SIGINT, and SIGTSTP. | ||||
| // SIGTERM and SIGINT will signal the process to exit. | ||||
| // SIGTSTP will signal the process to stop processing new tasks. | ||||
| func (bg *Background) waitForSignals() { | ||||
| 	bg.logger.Info("Send signal TSTP to stop processing new tasks") | ||||
| 	bg.logger.Info("Send signal TERM or INT to terminate the process") | ||||
| func (srv *Server) waitForSignals() { | ||||
| 	srv.logger.Info("Send signal TSTP to stop processing new tasks") | ||||
| 	srv.logger.Info("Send signal TERM or INT to terminate the process") | ||||
|  | ||||
| 	sigs := make(chan os.Signal, 1) | ||||
| 	signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP) | ||||
| 	for { | ||||
| 		sig := <-sigs | ||||
| 		if sig == unix.SIGTSTP { | ||||
| 			bg.processor.stop() | ||||
| 			bg.ps.SetStatus(base.StatusStopped) | ||||
| 			srv.processor.stop() | ||||
| 			srv.ps.SetStatus(base.StatusStopped) | ||||
| 			continue | ||||
| 		} | ||||
| 		break | ||||
|   | ||||
| @@ -14,8 +14,8 @@ import ( | ||||
| // SIGTERM and SIGINT will signal the process to exit. | ||||
| // | ||||
| // Note: Currently SIGTSTP is not supported for windows build. | ||||
| func (bg *Background) waitForSignals() { | ||||
| 	bg.logger.Info("Send signal TERM or INT to terminate the process") | ||||
| func (srv *Server) waitForSignals() { | ||||
| 	srv.logger.Info("Send signal TERM or INT to terminate the process") | ||||
| 	sigs := make(chan os.Signal, 1) | ||||
| 	signal.Notify(sigs, windows.SIGTERM, windows.SIGINT) | ||||
| 	<-sigs | ||||
|   | ||||
		Reference in New Issue
	
	Block a user