From f9842ba914e61e95021aa0e8e1a4ccb4df37d930 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 12 Apr 2020 08:16:42 -0700 Subject: [PATCH] Rename Background to Server --- benchmark_test.go | 18 +++--- background.go => server.go | 84 ++++++++++++++-------------- background_test.go => server_test.go | 14 ++--- signals_unix.go | 10 ++-- signals_windows.go | 4 +- 5 files changed, 65 insertions(+), 65 deletions(-) rename background.go => server.go (82%) rename background_test.go => server_test.go (86%) diff --git a/benchmark_test.go b/benchmark_test.go index 469c048..b6da0b2 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -24,7 +24,7 @@ func BenchmarkEndToEndSimple(b *testing.B) { DB: redisDB, } client := NewClient(redis) - bg := NewBackground(redis, &Config{ + srv := NewServer(redis, Config{ Concurrency: 10, RetryDelayFunc: func(n int, err error, t *Task) time.Duration { return time.Second @@ -46,11 +46,11 @@ func BenchmarkEndToEndSimple(b *testing.B) { } b.StartTimer() // end setup - bg.start(HandlerFunc(handler)) + srv.start(HandlerFunc(handler)) wg.Wait() b.StopTimer() // begin teardown - bg.stop() + srv.stop() b.StartTimer() // end teardown } } @@ -67,7 +67,7 @@ func BenchmarkEndToEnd(b *testing.B) { DB: redisDB, } client := NewClient(redis) - bg := NewBackground(redis, &Config{ + srv := NewServer(redis, Config{ Concurrency: 10, RetryDelayFunc: func(n int, err error, t *Task) time.Duration { return time.Second @@ -99,11 +99,11 @@ func BenchmarkEndToEnd(b *testing.B) { } b.StartTimer() // end setup - bg.start(HandlerFunc(handler)) + srv.start(HandlerFunc(handler)) wg.Wait() b.StopTimer() // begin teardown - bg.stop() + srv.stop() b.StartTimer() // end teardown } } @@ -124,7 +124,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { DB: redisDB, } client := NewClient(redis) - bg := NewBackground(redis, &Config{ + srv := NewServer(redis, Config{ Concurrency: 10, Queues: map[string]int{ "high": 6, @@ -160,11 +160,11 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { } b.StartTimer() // end setup - bg.start(HandlerFunc(handler)) + srv.start(HandlerFunc(handler)) wg.Wait() b.StopTimer() // begin teardown - bg.stop() + srv.stop() b.StartTimer() // end teardown } } diff --git a/background.go b/server.go similarity index 82% rename from background.go rename to server.go index 031d3d1..50cbc8f 100644 --- a/background.go +++ b/server.go @@ -18,10 +18,10 @@ import ( "github.com/hibiken/asynq/internal/rdb" ) -// Background is responsible for managing the background-task processing. +// Server is responsible for managing the background-task processing. // -// Background manages task queues to process tasks. -// If the processing of a task is unsuccessful, background will +// Server pulls tasks off queues and process them. +// If the processing of a task is unsuccessful, server will // schedule it for a retry until either the task gets processed successfully // or it exhausts its max retry count. // @@ -29,7 +29,7 @@ import ( // will be kept in the queue for some time until a certain condition is met // (e.g., queue size reaches a certain limit, or the task has been in the // queue for a certain amount of time). -type Background struct { +type Server struct { mu sync.Mutex running bool @@ -48,11 +48,11 @@ type Background struct { subscriber *subscriber } -// Config specifies the background-task processing behavior. +// Config specifies the server's background-task processing behavior. type Config struct { // Maximum number of concurrent processing of tasks. // - // If set to a zero or negative value, NewBackground will overwrite the value to one. + // If set to a zero or negative value, NewServer will overwrite the value to one. Concurrency int // Function to calculate retry delay for a failed task. @@ -67,7 +67,7 @@ type Config struct { // List of queues to process with given priority value. Keys are the names of the // queues and values are associated priority value. // - // If set to nil or not specified, the background will process only the "default" queue. + // If set to nil or not specified, the server will process only the "default" queue. // // Priority is treated as follows to avoid starving low priority queues. // @@ -106,7 +106,7 @@ type Config struct { // ErrorHandler: asynq.ErrorHandlerFunc(reportError) ErrorHandler ErrorHandler - // Logger specifies the logger used by the background instance. + // Logger specifies the logger used by the server instance. // // If unset, default logger is used. Logger Logger @@ -156,9 +156,9 @@ var defaultQueueConfig = map[string]int{ base.DefaultQueueName: 1, } -// NewBackground returns a new Background given a redis connection option +// NewServer returns a new Server given a redis connection option // and background processing configuration. -func NewBackground(r RedisConnOpt, cfg *Config) *Background { +func NewServer(r RedisConnOpt, cfg Config) *Server { n := cfg.Concurrency if n < 1 { n = 1 @@ -196,7 +196,7 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { scheduler := newScheduler(logger, rdb, 5*time.Second, queues) processor := newProcessor(logger, rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler) subscriber := newSubscriber(logger, rdb, cancels) - return &Background{ + return &Server{ logger: logger, rdb: rdb, ps: ps, @@ -234,47 +234,47 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error { // an os signal to exit the program is received. Once it receives // a signal, it gracefully shuts down all pending workers and other // goroutines to process the tasks. -func (bg *Background) Run(handler Handler) { +func (srv *Server) Run(handler Handler) { type prefixLogger interface { SetPrefix(prefix string) } // If logger supports setting prefix, then set prefix for log output. - if l, ok := bg.logger.(prefixLogger); ok { + if l, ok := srv.logger.(prefixLogger); ok { l.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid())) } - bg.logger.Info("Starting processing") + srv.logger.Info("Starting processing") - bg.start(handler) - defer bg.stop() + srv.start(handler) + defer srv.stop() - bg.waitForSignals() + srv.waitForSignals() fmt.Println() - bg.logger.Info("Starting graceful shutdown") + srv.logger.Info("Starting graceful shutdown") } // starts the background-task processing. -func (bg *Background) start(handler Handler) { - bg.mu.Lock() - defer bg.mu.Unlock() - if bg.running { +func (srv *Server) start(handler Handler) { + srv.mu.Lock() + defer srv.mu.Unlock() + if srv.running { return } - bg.running = true - bg.processor.handler = handler + srv.running = true + srv.processor.handler = handler - bg.heartbeater.start(&bg.wg) - bg.subscriber.start(&bg.wg) - bg.syncer.start(&bg.wg) - bg.scheduler.start(&bg.wg) - bg.processor.start(&bg.wg) + srv.heartbeater.start(&srv.wg) + srv.subscriber.start(&srv.wg) + srv.syncer.start(&srv.wg) + srv.scheduler.start(&srv.wg) + srv.processor.start(&srv.wg) } // stops the background-task processing. -func (bg *Background) stop() { - bg.mu.Lock() - defer bg.mu.Unlock() - if !bg.running { +func (srv *Server) stop() { + srv.mu.Lock() + defer srv.mu.Unlock() + if !srv.running { return } @@ -282,16 +282,16 @@ func (bg *Background) stop() { // Sender goroutines should be terminated before the receiver goroutines. // // processor -> syncer (via syncCh) - bg.scheduler.terminate() - bg.processor.terminate() - bg.syncer.terminate() - bg.subscriber.terminate() - bg.heartbeater.terminate() + srv.scheduler.terminate() + srv.processor.terminate() + srv.syncer.terminate() + srv.subscriber.terminate() + srv.heartbeater.terminate() - bg.wg.Wait() + srv.wg.Wait() - bg.rdb.Close() - bg.running = false + srv.rdb.Close() + srv.running = false - bg.logger.Info("Bye!") + srv.logger.Info("Bye!") } diff --git a/background_test.go b/server_test.go similarity index 86% rename from background_test.go rename to server_test.go index dc2ec03..ce3ff0c 100644 --- a/background_test.go +++ b/server_test.go @@ -13,7 +13,7 @@ import ( "go.uber.org/goleak" ) -func TestBackground(t *testing.T) { +func TestServer(t *testing.T) { // https://github.com/go-redis/redis/issues/1029 ignoreOpt := goleak.IgnoreTopFunction("github.com/go-redis/redis/v7/internal/pool.(*ConnPool).reaper") defer goleak.VerifyNoLeaks(t, ignoreOpt) @@ -22,8 +22,8 @@ func TestBackground(t *testing.T) { Addr: "localhost:6379", DB: 15, } - client := NewClient(r) - bg := NewBackground(r, &Config{ + c := NewClient(r) + srv := NewServer(r, Config{ Concurrency: 10, }) @@ -32,19 +32,19 @@ func TestBackground(t *testing.T) { return nil } - bg.start(HandlerFunc(h)) + srv.start(HandlerFunc(h)) - err := client.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123})) + err := c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123})) if err != nil { t.Errorf("could not enqueue a task: %v", err) } - err = client.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456})) + err = c.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456})) if err != nil { t.Errorf("could not enqueue a task: %v", err) } - bg.stop() + srv.stop() } func TestGCD(t *testing.T) { diff --git a/signals_unix.go b/signals_unix.go index addd595..66cfaba 100644 --- a/signals_unix.go +++ b/signals_unix.go @@ -15,17 +15,17 @@ import ( // It handles SIGTERM, SIGINT, and SIGTSTP. // SIGTERM and SIGINT will signal the process to exit. // SIGTSTP will signal the process to stop processing new tasks. -func (bg *Background) waitForSignals() { - bg.logger.Info("Send signal TSTP to stop processing new tasks") - bg.logger.Info("Send signal TERM or INT to terminate the process") +func (srv *Server) waitForSignals() { + srv.logger.Info("Send signal TSTP to stop processing new tasks") + srv.logger.Info("Send signal TERM or INT to terminate the process") sigs := make(chan os.Signal, 1) signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP) for { sig := <-sigs if sig == unix.SIGTSTP { - bg.processor.stop() - bg.ps.SetStatus(base.StatusStopped) + srv.processor.stop() + srv.ps.SetStatus(base.StatusStopped) continue } break diff --git a/signals_windows.go b/signals_windows.go index 601ca2e..b06e7ec 100644 --- a/signals_windows.go +++ b/signals_windows.go @@ -14,8 +14,8 @@ import ( // SIGTERM and SIGINT will signal the process to exit. // // Note: Currently SIGTSTP is not supported for windows build. -func (bg *Background) waitForSignals() { - bg.logger.Info("Send signal TERM or INT to terminate the process") +func (srv *Server) waitForSignals() { + srv.logger.Info("Send signal TERM or INT to terminate the process") sigs := make(chan os.Signal, 1) signal.Notify(sigs, windows.SIGTERM, windows.SIGINT) <-sigs