mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-15 20:08:46 +08:00
Fix processor
This commit is contained in:
parent
ef4a4a8334
commit
232efe8279
@ -250,17 +250,6 @@ func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) {
|
|||||||
return fn, ok
|
return fn, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetAll returns all cancel funcs.
|
|
||||||
func (c *Cancelations) GetAll() []context.CancelFunc {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
var res []context.CancelFunc
|
|
||||||
for _, fn := range c.cancelFuncs {
|
|
||||||
res = append(res, fn)
|
|
||||||
}
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
|
|
||||||
// Broker is a message broker that supports operations to manage task queues.
|
// Broker is a message broker that supports operations to manage task queues.
|
||||||
//
|
//
|
||||||
// See rdb.RDB as a reference implementation.
|
// See rdb.RDB as a reference implementation.
|
||||||
|
@ -222,9 +222,4 @@ func TestCancelationsConcurrentAccess(t *testing.T) {
|
|||||||
if ok {
|
if ok {
|
||||||
t.Errorf("(*Cancelations).Get(%q) = _, true, want <nil>, false", key2)
|
t.Errorf("(*Cancelations).Get(%q) = _, true, want <nil>, false", key2)
|
||||||
}
|
}
|
||||||
|
|
||||||
funcs := c.GetAll()
|
|
||||||
if len(funcs) != 2 {
|
|
||||||
t.Errorf("(*Cancelations).GetAll() returns %d functions, want 2", len(funcs))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
25
processor.go
25
processor.go
@ -50,12 +50,12 @@ type processor struct {
|
|||||||
done chan struct{}
|
done chan struct{}
|
||||||
once sync.Once
|
once sync.Once
|
||||||
|
|
||||||
// abort channel is closed when the shutdown of the "processor" goroutine starts.
|
// quit channel is closed when the shutdown of the "processor" goroutine starts.
|
||||||
abort chan struct{}
|
|
||||||
|
|
||||||
// quit channel communicates to the in-flight worker goroutines to stop.
|
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
|
|
||||||
|
// abort channel communicates to the in-flight worker goroutines to stop.
|
||||||
|
abort chan struct{}
|
||||||
|
|
||||||
// cancelations is a set of cancel functions for all in-progress tasks.
|
// cancelations is a set of cancel functions for all in-progress tasks.
|
||||||
cancelations *base.Cancelations
|
cancelations *base.Cancelations
|
||||||
|
|
||||||
@ -98,8 +98,8 @@ func newProcessor(params processorParams) *processor {
|
|||||||
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
|
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
|
||||||
sema: make(chan struct{}, params.concurrency),
|
sema: make(chan struct{}, params.concurrency),
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
abort: make(chan struct{}),
|
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
|
abort: make(chan struct{}),
|
||||||
errHandler: params.errHandler,
|
errHandler: params.errHandler,
|
||||||
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
|
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
|
||||||
starting: params.starting,
|
starting: params.starting,
|
||||||
@ -113,7 +113,7 @@ func (p *processor) stop() {
|
|||||||
p.once.Do(func() {
|
p.once.Do(func() {
|
||||||
p.logger.Debug("Processor shutting down...")
|
p.logger.Debug("Processor shutting down...")
|
||||||
// Unblock if processor is waiting for sema token.
|
// Unblock if processor is waiting for sema token.
|
||||||
close(p.abort)
|
close(p.quit)
|
||||||
// Signal the processor goroutine to stop processing tasks
|
// Signal the processor goroutine to stop processing tasks
|
||||||
// from the queue.
|
// from the queue.
|
||||||
p.done <- struct{}{}
|
p.done <- struct{}{}
|
||||||
@ -124,14 +124,9 @@ func (p *processor) stop() {
|
|||||||
func (p *processor) terminate() {
|
func (p *processor) terminate() {
|
||||||
p.stop()
|
p.stop()
|
||||||
|
|
||||||
time.AfterFunc(p.shutdownTimeout, func() { close(p.quit) })
|
time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) })
|
||||||
|
|
||||||
p.logger.Info("Waiting for all workers to finish...")
|
p.logger.Info("Waiting for all workers to finish...")
|
||||||
|
|
||||||
// send cancellation signal to all in-progress task handlers
|
|
||||||
for _, cancel := range p.cancelations.GetAll() {
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
// block until all workers have released the token
|
// block until all workers have released the token
|
||||||
for i := 0; i < cap(p.sema); i++ {
|
for i := 0; i < cap(p.sema); i++ {
|
||||||
p.sema <- struct{}{}
|
p.sema <- struct{}{}
|
||||||
@ -159,7 +154,7 @@ func (p *processor) start(wg *sync.WaitGroup) {
|
|||||||
// process the task.
|
// process the task.
|
||||||
func (p *processor) exec() {
|
func (p *processor) exec() {
|
||||||
select {
|
select {
|
||||||
case <-p.abort:
|
case <-p.quit:
|
||||||
return
|
return
|
||||||
case p.sema <- struct{}{}: // acquire token
|
case p.sema <- struct{}{}: // acquire token
|
||||||
qnames := p.queues()
|
qnames := p.queues()
|
||||||
@ -201,7 +196,7 @@ func (p *processor) exec() {
|
|||||||
go func() { resCh <- perform(ctx, task, p.handler) }()
|
go func() { resCh <- perform(ctx, task, p.handler) }()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-p.quit:
|
case <-p.abort:
|
||||||
// time is up, push the message back to queue and quit this worker goroutine.
|
// time is up, push the message back to queue and quit this worker goroutine.
|
||||||
p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
|
p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
|
||||||
p.requeue(msg)
|
p.requeue(msg)
|
||||||
|
Loading…
Reference in New Issue
Block a user