Refactor server state management

This commit is contained in:
Ken Hibino
2020-05-18 20:47:35 -07:00
parent 69ad583278
commit a38f628f3b
12 changed files with 482 additions and 468 deletions

View File

@@ -22,8 +22,6 @@ type processor struct {
logger *log.Logger
broker base.Broker
ss *base.ServerState
handler Handler
queueConfig map[string]int
@@ -60,6 +58,9 @@ type processor struct {
// cancelations is a set of cancel functions for all in-progress tasks.
cancelations *base.Cancelations
starting chan<- *base.TaskMessage
finished chan<- *base.TaskMessage
}
type retryDelayFunc func(n int, err error, task *Task) time.Duration
@@ -67,38 +68,42 @@ type retryDelayFunc func(n int, err error, task *Task) time.Duration
type processorParams struct {
logger *log.Logger
broker base.Broker
ss *base.ServerState
retryDelayFunc retryDelayFunc
syncCh chan<- *syncRequest
cancelations *base.Cancelations
concurrency int
queues map[string]int
strictPriority bool
errHandler ErrorHandler
shutdownTimeout time.Duration
starting chan<- *base.TaskMessage
finished chan<- *base.TaskMessage
}
// newProcessor constructs a new processor.
func newProcessor(params processorParams) *processor {
info := params.ss.GetInfo()
qcfg := normalizeQueueCfg(info.Queues)
queues := normalizeQueues(params.queues)
orderedQueues := []string(nil)
if info.StrictPriority {
orderedQueues = sortByPriority(qcfg)
if params.strictPriority {
orderedQueues = sortByPriority(queues)
}
return &processor{
logger: params.logger,
broker: params.broker,
ss: params.ss,
queueConfig: qcfg,
queueConfig: queues,
orderedQueues: orderedQueues,
retryDelayFunc: params.retryDelayFunc,
syncRequestCh: params.syncCh,
cancelations: params.cancelations,
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
sema: make(chan struct{}, info.Concurrency),
sema: make(chan struct{}, params.concurrency),
done: make(chan struct{}),
abort: make(chan struct{}),
quit: make(chan struct{}),
errHandler: params.errHandler,
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
starting: params.starting,
finished: params.finished,
}
}
@@ -183,10 +188,10 @@ func (p *processor) exec() {
p.requeue(msg)
return
case p.sema <- struct{}{}: // acquire token
p.ss.AddWorkerStats(msg, time.Now())
p.starting <- msg
go func() {
defer func() {
p.ss.DeleteWorkerStats(msg)
p.finished <- msg
<-p.sema // release token
}()
@@ -374,16 +379,15 @@ func (x byPriority) Len() int { return len(x) }
func (x byPriority) Less(i, j int) bool { return x[i].priority < x[j].priority }
func (x byPriority) Swap(i, j int) { x[i], x[j] = x[j], x[i] }
// normalizeQueueCfg divides priority numbers by their
// greatest common divisor.
func normalizeQueueCfg(queueCfg map[string]int) map[string]int {
// normalizeQueues divides priority numbers by their greatest common divisor.
func normalizeQueues(queues map[string]int) map[string]int {
var xs []int
for _, x := range queueCfg {
for _, x := range queues {
xs = append(xs, x)
}
d := gcd(xs...)
res := make(map[string]int)
for q, x := range queueCfg {
for q, x := range queues {
res[q] = x / d
}
return res