mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-24 23:02:18 +08:00
Rename Background to Server
This commit is contained in:
parent
022dc29701
commit
f9842ba914
@ -24,7 +24,7 @@ func BenchmarkEndToEndSimple(b *testing.B) {
|
|||||||
DB: redisDB,
|
DB: redisDB,
|
||||||
}
|
}
|
||||||
client := NewClient(redis)
|
client := NewClient(redis)
|
||||||
bg := NewBackground(redis, &Config{
|
srv := NewServer(redis, Config{
|
||||||
Concurrency: 10,
|
Concurrency: 10,
|
||||||
RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
|
RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
|
||||||
return time.Second
|
return time.Second
|
||||||
@ -46,11 +46,11 @@ func BenchmarkEndToEndSimple(b *testing.B) {
|
|||||||
}
|
}
|
||||||
b.StartTimer() // end setup
|
b.StartTimer() // end setup
|
||||||
|
|
||||||
bg.start(HandlerFunc(handler))
|
srv.start(HandlerFunc(handler))
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
b.StopTimer() // begin teardown
|
b.StopTimer() // begin teardown
|
||||||
bg.stop()
|
srv.stop()
|
||||||
b.StartTimer() // end teardown
|
b.StartTimer() // end teardown
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -67,7 +67,7 @@ func BenchmarkEndToEnd(b *testing.B) {
|
|||||||
DB: redisDB,
|
DB: redisDB,
|
||||||
}
|
}
|
||||||
client := NewClient(redis)
|
client := NewClient(redis)
|
||||||
bg := NewBackground(redis, &Config{
|
srv := NewServer(redis, Config{
|
||||||
Concurrency: 10,
|
Concurrency: 10,
|
||||||
RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
|
RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
|
||||||
return time.Second
|
return time.Second
|
||||||
@ -99,11 +99,11 @@ func BenchmarkEndToEnd(b *testing.B) {
|
|||||||
}
|
}
|
||||||
b.StartTimer() // end setup
|
b.StartTimer() // end setup
|
||||||
|
|
||||||
bg.start(HandlerFunc(handler))
|
srv.start(HandlerFunc(handler))
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
b.StopTimer() // begin teardown
|
b.StopTimer() // begin teardown
|
||||||
bg.stop()
|
srv.stop()
|
||||||
b.StartTimer() // end teardown
|
b.StartTimer() // end teardown
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -124,7 +124,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
|
|||||||
DB: redisDB,
|
DB: redisDB,
|
||||||
}
|
}
|
||||||
client := NewClient(redis)
|
client := NewClient(redis)
|
||||||
bg := NewBackground(redis, &Config{
|
srv := NewServer(redis, Config{
|
||||||
Concurrency: 10,
|
Concurrency: 10,
|
||||||
Queues: map[string]int{
|
Queues: map[string]int{
|
||||||
"high": 6,
|
"high": 6,
|
||||||
@ -160,11 +160,11 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
|
|||||||
}
|
}
|
||||||
b.StartTimer() // end setup
|
b.StartTimer() // end setup
|
||||||
|
|
||||||
bg.start(HandlerFunc(handler))
|
srv.start(HandlerFunc(handler))
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
b.StopTimer() // begin teardown
|
b.StopTimer() // begin teardown
|
||||||
bg.stop()
|
srv.stop()
|
||||||
b.StartTimer() // end teardown
|
b.StartTimer() // end teardown
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -18,10 +18,10 @@ import (
|
|||||||
"github.com/hibiken/asynq/internal/rdb"
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Background is responsible for managing the background-task processing.
|
// Server is responsible for managing the background-task processing.
|
||||||
//
|
//
|
||||||
// Background manages task queues to process tasks.
|
// Server pulls tasks off queues and process them.
|
||||||
// If the processing of a task is unsuccessful, background will
|
// If the processing of a task is unsuccessful, server will
|
||||||
// schedule it for a retry until either the task gets processed successfully
|
// schedule it for a retry until either the task gets processed successfully
|
||||||
// or it exhausts its max retry count.
|
// or it exhausts its max retry count.
|
||||||
//
|
//
|
||||||
@ -29,7 +29,7 @@ import (
|
|||||||
// will be kept in the queue for some time until a certain condition is met
|
// will be kept in the queue for some time until a certain condition is met
|
||||||
// (e.g., queue size reaches a certain limit, or the task has been in the
|
// (e.g., queue size reaches a certain limit, or the task has been in the
|
||||||
// queue for a certain amount of time).
|
// queue for a certain amount of time).
|
||||||
type Background struct {
|
type Server struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
running bool
|
running bool
|
||||||
|
|
||||||
@ -48,11 +48,11 @@ type Background struct {
|
|||||||
subscriber *subscriber
|
subscriber *subscriber
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config specifies the background-task processing behavior.
|
// Config specifies the server's background-task processing behavior.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
// Maximum number of concurrent processing of tasks.
|
// Maximum number of concurrent processing of tasks.
|
||||||
//
|
//
|
||||||
// If set to a zero or negative value, NewBackground will overwrite the value to one.
|
// If set to a zero or negative value, NewServer will overwrite the value to one.
|
||||||
Concurrency int
|
Concurrency int
|
||||||
|
|
||||||
// Function to calculate retry delay for a failed task.
|
// Function to calculate retry delay for a failed task.
|
||||||
@ -67,7 +67,7 @@ type Config struct {
|
|||||||
// List of queues to process with given priority value. Keys are the names of the
|
// List of queues to process with given priority value. Keys are the names of the
|
||||||
// queues and values are associated priority value.
|
// queues and values are associated priority value.
|
||||||
//
|
//
|
||||||
// If set to nil or not specified, the background will process only the "default" queue.
|
// If set to nil or not specified, the server will process only the "default" queue.
|
||||||
//
|
//
|
||||||
// Priority is treated as follows to avoid starving low priority queues.
|
// Priority is treated as follows to avoid starving low priority queues.
|
||||||
//
|
//
|
||||||
@ -106,7 +106,7 @@ type Config struct {
|
|||||||
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
|
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
|
||||||
ErrorHandler ErrorHandler
|
ErrorHandler ErrorHandler
|
||||||
|
|
||||||
// Logger specifies the logger used by the background instance.
|
// Logger specifies the logger used by the server instance.
|
||||||
//
|
//
|
||||||
// If unset, default logger is used.
|
// If unset, default logger is used.
|
||||||
Logger Logger
|
Logger Logger
|
||||||
@ -156,9 +156,9 @@ var defaultQueueConfig = map[string]int{
|
|||||||
base.DefaultQueueName: 1,
|
base.DefaultQueueName: 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBackground returns a new Background given a redis connection option
|
// NewServer returns a new Server given a redis connection option
|
||||||
// and background processing configuration.
|
// and background processing configuration.
|
||||||
func NewBackground(r RedisConnOpt, cfg *Config) *Background {
|
func NewServer(r RedisConnOpt, cfg Config) *Server {
|
||||||
n := cfg.Concurrency
|
n := cfg.Concurrency
|
||||||
if n < 1 {
|
if n < 1 {
|
||||||
n = 1
|
n = 1
|
||||||
@ -196,7 +196,7 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
|
|||||||
scheduler := newScheduler(logger, rdb, 5*time.Second, queues)
|
scheduler := newScheduler(logger, rdb, 5*time.Second, queues)
|
||||||
processor := newProcessor(logger, rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler)
|
processor := newProcessor(logger, rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler)
|
||||||
subscriber := newSubscriber(logger, rdb, cancels)
|
subscriber := newSubscriber(logger, rdb, cancels)
|
||||||
return &Background{
|
return &Server{
|
||||||
logger: logger,
|
logger: logger,
|
||||||
rdb: rdb,
|
rdb: rdb,
|
||||||
ps: ps,
|
ps: ps,
|
||||||
@ -234,47 +234,47 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
|
|||||||
// an os signal to exit the program is received. Once it receives
|
// an os signal to exit the program is received. Once it receives
|
||||||
// a signal, it gracefully shuts down all pending workers and other
|
// a signal, it gracefully shuts down all pending workers and other
|
||||||
// goroutines to process the tasks.
|
// goroutines to process the tasks.
|
||||||
func (bg *Background) Run(handler Handler) {
|
func (srv *Server) Run(handler Handler) {
|
||||||
type prefixLogger interface {
|
type prefixLogger interface {
|
||||||
SetPrefix(prefix string)
|
SetPrefix(prefix string)
|
||||||
}
|
}
|
||||||
// If logger supports setting prefix, then set prefix for log output.
|
// If logger supports setting prefix, then set prefix for log output.
|
||||||
if l, ok := bg.logger.(prefixLogger); ok {
|
if l, ok := srv.logger.(prefixLogger); ok {
|
||||||
l.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid()))
|
l.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid()))
|
||||||
}
|
}
|
||||||
bg.logger.Info("Starting processing")
|
srv.logger.Info("Starting processing")
|
||||||
|
|
||||||
bg.start(handler)
|
srv.start(handler)
|
||||||
defer bg.stop()
|
defer srv.stop()
|
||||||
|
|
||||||
bg.waitForSignals()
|
srv.waitForSignals()
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
bg.logger.Info("Starting graceful shutdown")
|
srv.logger.Info("Starting graceful shutdown")
|
||||||
}
|
}
|
||||||
|
|
||||||
// starts the background-task processing.
|
// starts the background-task processing.
|
||||||
func (bg *Background) start(handler Handler) {
|
func (srv *Server) start(handler Handler) {
|
||||||
bg.mu.Lock()
|
srv.mu.Lock()
|
||||||
defer bg.mu.Unlock()
|
defer srv.mu.Unlock()
|
||||||
if bg.running {
|
if srv.running {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
bg.running = true
|
srv.running = true
|
||||||
bg.processor.handler = handler
|
srv.processor.handler = handler
|
||||||
|
|
||||||
bg.heartbeater.start(&bg.wg)
|
srv.heartbeater.start(&srv.wg)
|
||||||
bg.subscriber.start(&bg.wg)
|
srv.subscriber.start(&srv.wg)
|
||||||
bg.syncer.start(&bg.wg)
|
srv.syncer.start(&srv.wg)
|
||||||
bg.scheduler.start(&bg.wg)
|
srv.scheduler.start(&srv.wg)
|
||||||
bg.processor.start(&bg.wg)
|
srv.processor.start(&srv.wg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// stops the background-task processing.
|
// stops the background-task processing.
|
||||||
func (bg *Background) stop() {
|
func (srv *Server) stop() {
|
||||||
bg.mu.Lock()
|
srv.mu.Lock()
|
||||||
defer bg.mu.Unlock()
|
defer srv.mu.Unlock()
|
||||||
if !bg.running {
|
if !srv.running {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -282,16 +282,16 @@ func (bg *Background) stop() {
|
|||||||
// Sender goroutines should be terminated before the receiver goroutines.
|
// Sender goroutines should be terminated before the receiver goroutines.
|
||||||
//
|
//
|
||||||
// processor -> syncer (via syncCh)
|
// processor -> syncer (via syncCh)
|
||||||
bg.scheduler.terminate()
|
srv.scheduler.terminate()
|
||||||
bg.processor.terminate()
|
srv.processor.terminate()
|
||||||
bg.syncer.terminate()
|
srv.syncer.terminate()
|
||||||
bg.subscriber.terminate()
|
srv.subscriber.terminate()
|
||||||
bg.heartbeater.terminate()
|
srv.heartbeater.terminate()
|
||||||
|
|
||||||
bg.wg.Wait()
|
srv.wg.Wait()
|
||||||
|
|
||||||
bg.rdb.Close()
|
srv.rdb.Close()
|
||||||
bg.running = false
|
srv.running = false
|
||||||
|
|
||||||
bg.logger.Info("Bye!")
|
srv.logger.Info("Bye!")
|
||||||
}
|
}
|
@ -13,7 +13,7 @@ import (
|
|||||||
"go.uber.org/goleak"
|
"go.uber.org/goleak"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestBackground(t *testing.T) {
|
func TestServer(t *testing.T) {
|
||||||
// https://github.com/go-redis/redis/issues/1029
|
// https://github.com/go-redis/redis/issues/1029
|
||||||
ignoreOpt := goleak.IgnoreTopFunction("github.com/go-redis/redis/v7/internal/pool.(*ConnPool).reaper")
|
ignoreOpt := goleak.IgnoreTopFunction("github.com/go-redis/redis/v7/internal/pool.(*ConnPool).reaper")
|
||||||
defer goleak.VerifyNoLeaks(t, ignoreOpt)
|
defer goleak.VerifyNoLeaks(t, ignoreOpt)
|
||||||
@ -22,8 +22,8 @@ func TestBackground(t *testing.T) {
|
|||||||
Addr: "localhost:6379",
|
Addr: "localhost:6379",
|
||||||
DB: 15,
|
DB: 15,
|
||||||
}
|
}
|
||||||
client := NewClient(r)
|
c := NewClient(r)
|
||||||
bg := NewBackground(r, &Config{
|
srv := NewServer(r, Config{
|
||||||
Concurrency: 10,
|
Concurrency: 10,
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -32,19 +32,19 @@ func TestBackground(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
bg.start(HandlerFunc(h))
|
srv.start(HandlerFunc(h))
|
||||||
|
|
||||||
err := client.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123}))
|
err := c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("could not enqueue a task: %v", err)
|
t.Errorf("could not enqueue a task: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = client.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456}))
|
err = c.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("could not enqueue a task: %v", err)
|
t.Errorf("could not enqueue a task: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
bg.stop()
|
srv.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGCD(t *testing.T) {
|
func TestGCD(t *testing.T) {
|
@ -15,17 +15,17 @@ import (
|
|||||||
// It handles SIGTERM, SIGINT, and SIGTSTP.
|
// It handles SIGTERM, SIGINT, and SIGTSTP.
|
||||||
// SIGTERM and SIGINT will signal the process to exit.
|
// SIGTERM and SIGINT will signal the process to exit.
|
||||||
// SIGTSTP will signal the process to stop processing new tasks.
|
// SIGTSTP will signal the process to stop processing new tasks.
|
||||||
func (bg *Background) waitForSignals() {
|
func (srv *Server) waitForSignals() {
|
||||||
bg.logger.Info("Send signal TSTP to stop processing new tasks")
|
srv.logger.Info("Send signal TSTP to stop processing new tasks")
|
||||||
bg.logger.Info("Send signal TERM or INT to terminate the process")
|
srv.logger.Info("Send signal TERM or INT to terminate the process")
|
||||||
|
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
|
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
|
||||||
for {
|
for {
|
||||||
sig := <-sigs
|
sig := <-sigs
|
||||||
if sig == unix.SIGTSTP {
|
if sig == unix.SIGTSTP {
|
||||||
bg.processor.stop()
|
srv.processor.stop()
|
||||||
bg.ps.SetStatus(base.StatusStopped)
|
srv.ps.SetStatus(base.StatusStopped)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
|
@ -14,8 +14,8 @@ import (
|
|||||||
// SIGTERM and SIGINT will signal the process to exit.
|
// SIGTERM and SIGINT will signal the process to exit.
|
||||||
//
|
//
|
||||||
// Note: Currently SIGTSTP is not supported for windows build.
|
// Note: Currently SIGTSTP is not supported for windows build.
|
||||||
func (bg *Background) waitForSignals() {
|
func (srv *Server) waitForSignals() {
|
||||||
bg.logger.Info("Send signal TERM or INT to terminate the process")
|
srv.logger.Info("Send signal TERM or INT to terminate the process")
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigs, windows.SIGTERM, windows.SIGINT)
|
signal.Notify(sigs, windows.SIGTERM, windows.SIGINT)
|
||||||
<-sigs
|
<-sigs
|
||||||
|
Loading…
Reference in New Issue
Block a user