mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-26 11:16:12 +08:00
Add redis pubsub subscriber for cancelation
This commit is contained in:
28
processor.go
28
processor.go
@@ -14,7 +14,6 @@ import (
|
||||
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
"github.com/rs/xid"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
@@ -53,15 +52,14 @@ type processor struct {
|
||||
// quit channel communicates to the in-flight worker goroutines to stop.
|
||||
quit chan struct{}
|
||||
|
||||
// cancelFuncs is a map of task ID to cancel function for all in-progress tasks.
|
||||
mu sync.Mutex
|
||||
cancelFuncs map[string]context.CancelFunc
|
||||
// cancelations is a set of cancel functions for all in-progress tasks.
|
||||
cancelations *base.Cancelations
|
||||
}
|
||||
|
||||
type retryDelayFunc func(n int, err error, task *Task) time.Duration
|
||||
|
||||
// newProcessor constructs a new processor.
|
||||
func newProcessor(r *rdb.RDB, pinfo *base.ProcessInfo, fn retryDelayFunc, syncRequestCh chan<- *syncRequest) *processor {
|
||||
func newProcessor(r *rdb.RDB, pinfo *base.ProcessInfo, fn retryDelayFunc, syncRequestCh chan<- *syncRequest, cancelations *base.Cancelations) *processor {
|
||||
qcfg := normalizeQueueCfg(pinfo.Queues)
|
||||
orderedQueues := []string(nil)
|
||||
if pinfo.StrictPriority {
|
||||
@@ -74,12 +72,12 @@ func newProcessor(r *rdb.RDB, pinfo *base.ProcessInfo, fn retryDelayFunc, syncRe
|
||||
orderedQueues: orderedQueues,
|
||||
retryDelayFunc: fn,
|
||||
syncRequestCh: syncRequestCh,
|
||||
cancelations: cancelations,
|
||||
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
|
||||
sema: make(chan struct{}, pinfo.Concurrency),
|
||||
done: make(chan struct{}),
|
||||
abort: make(chan struct{}),
|
||||
quit: make(chan struct{}),
|
||||
cancelFuncs: make(map[string]context.CancelFunc),
|
||||
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
|
||||
}
|
||||
}
|
||||
@@ -107,7 +105,7 @@ func (p *processor) terminate() {
|
||||
logger.info("Waiting for all workers to finish...")
|
||||
|
||||
// send cancellation signal to all in-progress task handlers
|
||||
for _, cancel := range p.cancelFuncs {
|
||||
for _, cancel := range p.cancelations.GetAll() {
|
||||
cancel()
|
||||
}
|
||||
|
||||
@@ -174,10 +172,10 @@ func (p *processor) exec() {
|
||||
resCh := make(chan error, 1)
|
||||
task := NewTask(msg.Type, msg.Payload)
|
||||
ctx, cancel := createContext(msg)
|
||||
p.addCancelFunc(msg.ID, cancel)
|
||||
p.cancelations.Add(msg.ID.String(), cancel)
|
||||
go func() {
|
||||
resCh <- perform(ctx, task, p.handler)
|
||||
p.deleteCancelFunc(msg.ID)
|
||||
p.cancelations.Delete(msg.ID.String())
|
||||
}()
|
||||
|
||||
select {
|
||||
@@ -268,18 +266,6 @@ func (p *processor) kill(msg *base.TaskMessage, e error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *processor) addCancelFunc(id xid.ID, fn context.CancelFunc) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.cancelFuncs[id.String()] = fn
|
||||
}
|
||||
|
||||
func (p *processor) deleteCancelFunc(id xid.ID) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
delete(p.cancelFuncs, id.String())
|
||||
}
|
||||
|
||||
// queues returns a list of queues to query.
|
||||
// Order of the queue names is based on the priority of each queue.
|
||||
// Queue names is sorted by their priority level if strict-priority is true.
|
||||
|
||||
Reference in New Issue
Block a user