From 7c7de0d8e0bf1ba88c34b2081d41291304c0363b Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 19 Jun 2020 06:21:25 -0700 Subject: [PATCH] Fix processor --- internal/base/base.go | 11 ----------- internal/base/base_test.go | 5 ----- processor.go | 25 ++++++++++--------------- 3 files changed, 10 insertions(+), 31 deletions(-) diff --git a/internal/base/base.go b/internal/base/base.go index 4b2f54e..08b96b1 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -250,17 +250,6 @@ func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) { return fn, ok } -// GetAll returns all cancel funcs. -func (c *Cancelations) GetAll() []context.CancelFunc { - c.mu.Lock() - defer c.mu.Unlock() - var res []context.CancelFunc - for _, fn := range c.cancelFuncs { - res = append(res, fn) - } - return res -} - // Broker is a message broker that supports operations to manage task queues. // // See rdb.RDB as a reference implementation. diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 5c7d2da..f63688a 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -222,9 +222,4 @@ func TestCancelationsConcurrentAccess(t *testing.T) { if ok { t.Errorf("(*Cancelations).Get(%q) = _, true, want , false", key2) } - - funcs := c.GetAll() - if len(funcs) != 2 { - t.Errorf("(*Cancelations).GetAll() returns %d functions, want 2", len(funcs)) - } } diff --git a/processor.go b/processor.go index c3472b7..459b64d 100644 --- a/processor.go +++ b/processor.go @@ -50,12 +50,12 @@ type processor struct { done chan struct{} once sync.Once - // abort channel is closed when the shutdown of the "processor" goroutine starts. - abort chan struct{} - - // quit channel communicates to the in-flight worker goroutines to stop. + // quit channel is closed when the shutdown of the "processor" goroutine starts. quit chan struct{} + // abort channel communicates to the in-flight worker goroutines to stop. + abort chan struct{} + // cancelations is a set of cancel functions for all in-progress tasks. cancelations *base.Cancelations @@ -98,8 +98,8 @@ func newProcessor(params processorParams) *processor { errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1), sema: make(chan struct{}, params.concurrency), done: make(chan struct{}), - abort: make(chan struct{}), quit: make(chan struct{}), + abort: make(chan struct{}), errHandler: params.errHandler, handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), starting: params.starting, @@ -113,7 +113,7 @@ func (p *processor) stop() { p.once.Do(func() { p.logger.Debug("Processor shutting down...") // Unblock if processor is waiting for sema token. - close(p.abort) + close(p.quit) // Signal the processor goroutine to stop processing tasks // from the queue. p.done <- struct{}{} @@ -124,14 +124,9 @@ func (p *processor) stop() { func (p *processor) terminate() { p.stop() - time.AfterFunc(p.shutdownTimeout, func() { close(p.quit) }) + time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) }) + p.logger.Info("Waiting for all workers to finish...") - - // send cancellation signal to all in-progress task handlers - for _, cancel := range p.cancelations.GetAll() { - cancel() - } - // block until all workers have released the token for i := 0; i < cap(p.sema); i++ { p.sema <- struct{}{} @@ -159,7 +154,7 @@ func (p *processor) start(wg *sync.WaitGroup) { // process the task. func (p *processor) exec() { select { - case <-p.abort: + case <-p.quit: return case p.sema <- struct{}{}: // acquire token qnames := p.queues() @@ -201,7 +196,7 @@ func (p *processor) exec() { go func() { resCh <- perform(ctx, task, p.handler) }() select { - case <-p.quit: + case <-p.abort: // time is up, push the message back to queue and quit this worker goroutine. p.logger.Warnf("Quitting worker. task id=%s", msg.ID) p.requeue(msg)