2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00

Export Start, Stop and Quiet method on Server type

This commit is contained in:
Ken Hibino 2020-04-12 11:00:45 -07:00
parent f9842ba914
commit 779065c269
4 changed files with 67 additions and 39 deletions

View File

@ -46,11 +46,11 @@ func BenchmarkEndToEndSimple(b *testing.B) {
} }
b.StartTimer() // end setup b.StartTimer() // end setup
srv.start(HandlerFunc(handler)) srv.Start(HandlerFunc(handler))
wg.Wait() wg.Wait()
b.StopTimer() // begin teardown b.StopTimer() // begin teardown
srv.stop() srv.Stop()
b.StartTimer() // end teardown b.StartTimer() // end teardown
} }
} }
@ -99,11 +99,11 @@ func BenchmarkEndToEnd(b *testing.B) {
} }
b.StartTimer() // end setup b.StartTimer() // end setup
srv.start(HandlerFunc(handler)) srv.Start(HandlerFunc(handler))
wg.Wait() wg.Wait()
b.StopTimer() // begin teardown b.StopTimer() // begin teardown
srv.stop() srv.Stop()
b.StartTimer() // end teardown b.StartTimer() // end teardown
} }
} }
@ -160,11 +160,11 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
} }
b.StartTimer() // end setup b.StartTimer() // end setup
srv.start(HandlerFunc(handler)) srv.Start(HandlerFunc(handler))
wg.Wait() wg.Wait()
b.StopTimer() // begin teardown b.StopTimer() // begin teardown
srv.stop() srv.Stop()
b.StartTimer() // end teardown b.StartTimer() // end teardown
} }
} }

View File

@ -6,6 +6,7 @@ package asynq
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"math" "math"
"math/rand" "math/rand"
@ -31,7 +32,7 @@ import (
// queue for a certain amount of time). // queue for a certain amount of time).
type Server struct { type Server struct {
mu sync.Mutex mu sync.Mutex
running bool state serverState
ps *base.ProcessState ps *base.ProcessState
@ -197,6 +198,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
processor := newProcessor(logger, rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler) processor := newProcessor(logger, rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler)
subscriber := newSubscriber(logger, rdb, cancels) subscriber := newSubscriber(logger, rdb, cancels)
return &Server{ return &Server{
state: stateIdle,
logger: logger, logger: logger,
rdb: rdb, rdb: rdb,
ps: ps, ps: ps,
@ -230,11 +232,44 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
return fn(ctx, task) return fn(ctx, task)
} }
// ErrServerStopped indicates that the operation is now illegal because of the server being stopped.
var ErrServerStopped = errors.New("asynq: the server has been stopped")
type serverState int
const (
stateIdle serverState = iota
stateRunning
stateStopped
)
// Run starts the background-task processing and blocks until // Run starts the background-task processing and blocks until
// an os signal to exit the program is received. Once it receives // an os signal to exit the program is received. Once it receives
// a signal, it gracefully shuts down all pending workers and other // a signal, it gracefully shuts down all pending workers and other
// goroutines to process the tasks. // goroutines to process the tasks.
func (srv *Server) Run(handler Handler) { func (srv *Server) Run(handler Handler) error {
if err := srv.Start(handler); err != nil {
return err
}
srv.waitForSignals()
srv.Stop()
return nil
}
// Starts the background-task processing.
// TODO: doc
func (srv *Server) Start(handler Handler) error {
srv.mu.Lock()
defer srv.mu.Unlock()
switch srv.state {
case stateRunning:
return fmt.Errorf("asynq: the server is already running")
case stateStopped:
return ErrServerStopped
}
srv.state = stateRunning
srv.processor.handler = handler
type prefixLogger interface { type prefixLogger interface {
SetPrefix(prefix string) SetPrefix(prefix string)
} }
@ -244,40 +279,26 @@ func (srv *Server) Run(handler Handler) {
} }
srv.logger.Info("Starting processing") srv.logger.Info("Starting processing")
srv.start(handler)
defer srv.stop()
srv.waitForSignals()
fmt.Println()
srv.logger.Info("Starting graceful shutdown")
}
// starts the background-task processing.
func (srv *Server) start(handler Handler) {
srv.mu.Lock()
defer srv.mu.Unlock()
if srv.running {
return
}
srv.running = true
srv.processor.handler = handler
srv.heartbeater.start(&srv.wg) srv.heartbeater.start(&srv.wg)
srv.subscriber.start(&srv.wg) srv.subscriber.start(&srv.wg)
srv.syncer.start(&srv.wg) srv.syncer.start(&srv.wg)
srv.scheduler.start(&srv.wg) srv.scheduler.start(&srv.wg)
srv.processor.start(&srv.wg) srv.processor.start(&srv.wg)
return nil
} }
// stops the background-task processing. // Stops the background-task processing.
func (srv *Server) stop() { // TODO: do we need to return error?
func (srv *Server) Stop() {
srv.mu.Lock() srv.mu.Lock()
defer srv.mu.Unlock() defer srv.mu.Unlock()
if !srv.running { if srv.state != stateRunning {
// server is not running, do nothing and return.
return return
} }
fmt.Println() // print newline for prettier log.
srv.logger.Info("Starting graceful shutdown")
// Note: The order of termination is important. // Note: The order of termination is important.
// Sender goroutines should be terminated before the receiver goroutines. // Sender goroutines should be terminated before the receiver goroutines.
// //
@ -291,7 +312,14 @@ func (srv *Server) stop() {
srv.wg.Wait() srv.wg.Wait()
srv.rdb.Close() srv.rdb.Close()
srv.running = false srv.state = stateStopped
srv.logger.Info("Bye!") srv.logger.Info("Bye!")
} }
// Quiet signals server to stop processing new tasks.
// TODO: doc
func (srv *Server) Quiet() {
srv.processor.stop()
srv.ps.SetStatus(base.StatusStopped) // TODO: rephrase this state, like StatusSilent?
}

View File

@ -32,9 +32,12 @@ func TestServer(t *testing.T) {
return nil return nil
} }
srv.start(HandlerFunc(h)) err := srv.Start(HandlerFunc(h))
if err != nil {
t.Fatal(err)
}
err := c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123})) err = c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123}))
if err != nil { if err != nil {
t.Errorf("could not enqueue a task: %v", err) t.Errorf("could not enqueue a task: %v", err)
} }
@ -44,7 +47,7 @@ func TestServer(t *testing.T) {
t.Errorf("could not enqueue a task: %v", err) t.Errorf("could not enqueue a task: %v", err)
} }
srv.stop() srv.Stop()
} }
func TestGCD(t *testing.T) { func TestGCD(t *testing.T) {

View File

@ -7,8 +7,6 @@ import (
"os/signal" "os/signal"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"github.com/hibiken/asynq/internal/base"
) )
// waitForSignals waits for signals and handles them. // waitForSignals waits for signals and handles them.
@ -24,8 +22,7 @@ func (srv *Server) waitForSignals() {
for { for {
sig := <-sigs sig := <-sigs
if sig == unix.SIGTSTP { if sig == unix.SIGTSTP {
srv.processor.stop() srv.Quiet()
srv.ps.SetStatus(base.StatusStopped)
continue continue
} }
break break