2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-26 11:16:12 +08:00

Fix processor

This commit is contained in:
Ken Hibino
2020-06-19 06:21:25 -07:00
parent 83f1e20d74
commit 7c7de0d8e0
3 changed files with 10 additions and 31 deletions

View File

@@ -250,17 +250,6 @@ func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) {
return fn, ok
}
// GetAll returns all cancel funcs.
func (c *Cancelations) GetAll() []context.CancelFunc {
c.mu.Lock()
defer c.mu.Unlock()
var res []context.CancelFunc
for _, fn := range c.cancelFuncs {
res = append(res, fn)
}
return res
}
// Broker is a message broker that supports operations to manage task queues.
//
// See rdb.RDB as a reference implementation.

View File

@@ -222,9 +222,4 @@ func TestCancelationsConcurrentAccess(t *testing.T) {
if ok {
t.Errorf("(*Cancelations).Get(%q) = _, true, want <nil>, false", key2)
}
funcs := c.GetAll()
if len(funcs) != 2 {
t.Errorf("(*Cancelations).GetAll() returns %d functions, want 2", len(funcs))
}
}

View File

@@ -50,12 +50,12 @@ type processor struct {
done chan struct{}
once sync.Once
// abort channel is closed when the shutdown of the "processor" goroutine starts.
abort chan struct{}
// quit channel communicates to the in-flight worker goroutines to stop.
// quit channel is closed when the shutdown of the "processor" goroutine starts.
quit chan struct{}
// abort channel communicates to the in-flight worker goroutines to stop.
abort chan struct{}
// cancelations is a set of cancel functions for all in-progress tasks.
cancelations *base.Cancelations
@@ -98,8 +98,8 @@ func newProcessor(params processorParams) *processor {
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
sema: make(chan struct{}, params.concurrency),
done: make(chan struct{}),
abort: make(chan struct{}),
quit: make(chan struct{}),
abort: make(chan struct{}),
errHandler: params.errHandler,
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
starting: params.starting,
@@ -113,7 +113,7 @@ func (p *processor) stop() {
p.once.Do(func() {
p.logger.Debug("Processor shutting down...")
// Unblock if processor is waiting for sema token.
close(p.abort)
close(p.quit)
// Signal the processor goroutine to stop processing tasks
// from the queue.
p.done <- struct{}{}
@@ -124,14 +124,9 @@ func (p *processor) stop() {
func (p *processor) terminate() {
p.stop()
time.AfterFunc(p.shutdownTimeout, func() { close(p.quit) })
time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) })
p.logger.Info("Waiting for all workers to finish...")
// send cancellation signal to all in-progress task handlers
for _, cancel := range p.cancelations.GetAll() {
cancel()
}
// block until all workers have released the token
for i := 0; i < cap(p.sema); i++ {
p.sema <- struct{}{}
@@ -159,7 +154,7 @@ func (p *processor) start(wg *sync.WaitGroup) {
// process the task.
func (p *processor) exec() {
select {
case <-p.abort:
case <-p.quit:
return
case p.sema <- struct{}{}: // acquire token
qnames := p.queues()
@@ -201,7 +196,7 @@ func (p *processor) exec() {
go func() { resCh <- perform(ctx, task, p.handler) }()
select {
case <-p.quit:
case <-p.abort:
// time is up, push the message back to queue and quit this worker goroutine.
p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
p.requeue(msg)