2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Update processor to create a lease and watch for expiration

This commit is contained in:
Ken Hibino 2022-02-15 06:58:54 -08:00
parent d7169cd445
commit 8211167de2
2 changed files with 160 additions and 48 deletions

View File

@ -171,7 +171,7 @@ func (p *processor) exec() {
return return
case p.sema <- struct{}{}: // acquire token case p.sema <- struct{}{}: // acquire token
qnames := p.queues() qnames := p.queues()
msg, err := p.broker.Dequeue(qnames...) msg, leaseExpirationTime, err := p.broker.Dequeue(qnames...)
switch { switch {
case errors.Is(err, errors.ErrNoProcessableTask): case errors.Is(err, errors.ErrNoProcessableTask):
p.logger.Debug("All queues are empty") p.logger.Debug("All queues are empty")
@ -190,8 +190,9 @@ func (p *processor) exec() {
return return
} }
lease := base.NewLease(leaseExpirationTime)
deadline := p.computeDeadline(msg) deadline := p.computeDeadline(msg)
p.starting <- &workerInfo{msg, time.Now(), deadline} p.starting <- &workerInfo{msg, time.Now(), deadline, lease}
go func() { go func() {
defer func() { defer func() {
p.finished <- msg p.finished <- msg
@ -209,7 +210,7 @@ func (p *processor) exec() {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// already canceled (e.g. deadline exceeded). // already canceled (e.g. deadline exceeded).
p.handleFailedMessage(ctx, msg, ctx.Err()) p.handleFailedMessage(ctx, lease, msg, ctx.Err())
return return
default: default:
} }
@ -233,24 +234,33 @@ func (p *processor) exec() {
case <-p.abort: case <-p.abort:
// time is up, push the message back to queue and quit this worker goroutine. // time is up, push the message back to queue and quit this worker goroutine.
p.logger.Warnf("Quitting worker. task id=%s", msg.ID) p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
p.requeue(msg) p.requeue(lease, msg)
return
case <-lease.Done():
cancel()
p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired)
return return
case <-ctx.Done(): case <-ctx.Done():
p.handleFailedMessage(ctx, msg, ctx.Err()) p.handleFailedMessage(ctx, lease, msg, ctx.Err())
return return
case resErr := <-resCh: case resErr := <-resCh:
if resErr != nil { if resErr != nil {
p.handleFailedMessage(ctx, msg, resErr) p.handleFailedMessage(ctx, lease, msg, resErr)
return return
} }
p.handleSucceededMessage(ctx, msg) p.handleSucceededMessage(lease, msg)
} }
}() }()
} }
} }
func (p *processor) requeue(msg *base.TaskMessage) { func (p *processor) requeue(l *base.Lease, msg *base.TaskMessage) {
err := p.broker.Requeue(msg) if !l.IsValid() {
// If lease is not valid, do not write to redis; Let recoverer take care of it.
return
}
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
err := p.broker.Requeue(ctx, msg)
if err != nil { if err != nil {
p.logger.Errorf("Could not push task id=%s back to queue: %v", msg.ID, err) p.logger.Errorf("Could not push task id=%s back to queue: %v", msg.ID, err)
} else { } else {
@ -258,49 +268,51 @@ func (p *processor) requeue(msg *base.TaskMessage) {
} }
} }
func (p *processor) handleSucceededMessage(ctx context.Context, msg *base.TaskMessage) { func (p *processor) handleSucceededMessage(l *base.Lease, msg *base.TaskMessage) {
if msg.Retention > 0 { if msg.Retention > 0 {
p.markAsComplete(ctx, msg) p.markAsComplete(l, msg)
} else { } else {
p.markAsDone(ctx, msg) p.markAsDone(l, msg)
} }
} }
func (p *processor) markAsComplete(ctx context.Context, msg *base.TaskMessage) { func (p *processor) markAsComplete(l *base.Lease, msg *base.TaskMessage) {
err := p.broker.MarkAsComplete(msg) if !l.IsValid() {
// If lease is not valid, do not write to redis; Let recoverer take care of it.
return
}
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
err := p.broker.MarkAsComplete(ctx, msg)
if err != nil { if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s type=%q from %q to %q: %+v", errMsg := fmt.Sprintf("Could not move task id=%s type=%q from %q to %q: %+v",
msg.ID, msg.Type, base.ActiveKey(msg.Queue), base.CompletedKey(msg.Queue), err) msg.ID, msg.Type, base.ActiveKey(msg.Queue), base.CompletedKey(msg.Queue), err)
deadline, ok := ctx.Deadline()
if !ok {
panic("asynq: internal error: missing deadline in context")
}
p.logger.Warnf("%s; Will retry syncing", errMsg) p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.broker.MarkAsComplete(msg) return p.broker.MarkAsComplete(ctx, msg)
}, },
errMsg: errMsg, errMsg: errMsg,
deadline: deadline, deadline: l.Deadline(),
} }
} }
} }
func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) { func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
err := p.broker.Done(msg) if !l.IsValid() {
// If lease is not valid, do not write to redis; Let recoverer take care of it.
return
}
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
err := p.broker.Done(ctx, msg)
if err != nil { if err != nil {
errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.ActiveKey(msg.Queue), err) errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.ActiveKey(msg.Queue), err)
deadline, ok := ctx.Deadline()
if !ok {
panic("asynq: internal error: missing deadline in context")
}
p.logger.Warnf("%s; Will retry syncing", errMsg) p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.broker.Done(msg) return p.broker.Done(ctx, msg)
}, },
errMsg: errMsg, errMsg: errMsg,
deadline: deadline, deadline: l.Deadline(),
} }
} }
} }
@ -309,59 +321,61 @@ func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
// the task should not be retried and should be archived instead. // the task should not be retried and should be archived instead.
var SkipRetry = errors.New("skip retry for the task") var SkipRetry = errors.New("skip retry for the task")
func (p *processor) handleFailedMessage(ctx context.Context, msg *base.TaskMessage, err error) { func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
if p.errHandler != nil { if p.errHandler != nil {
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err) p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
} }
if !p.isFailureFunc(err) { if !p.isFailureFunc(err) {
// retry the task without marking it as failed // retry the task without marking it as failed
p.retry(ctx, msg, err, false /*isFailure*/) p.retry(l, msg, err, false /*isFailure*/)
return return
} }
if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) { if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) {
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID) p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
p.archive(ctx, msg, err) p.archive(l, msg, err)
} else { } else {
p.retry(ctx, msg, err, true /*isFailure*/) p.retry(l, msg, err, true /*isFailure*/)
} }
} }
func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error, isFailure bool) { func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailure bool) {
if !l.IsValid() {
// If lease is not valid, do not write to redis; Let recoverer take care of it.
return
}
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload)) d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
retryAt := time.Now().Add(d) retryAt := time.Now().Add(d)
err := p.broker.Retry(msg, retryAt, e.Error(), isFailure) err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
if err != nil { if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.RetryKey(msg.Queue)) errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.RetryKey(msg.Queue))
deadline, ok := ctx.Deadline()
if !ok {
panic("asynq: internal error: missing deadline in context")
}
p.logger.Warnf("%s; Will retry syncing", errMsg) p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.broker.Retry(msg, retryAt, e.Error(), isFailure) return p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
}, },
errMsg: errMsg, errMsg: errMsg,
deadline: deadline, deadline: l.Deadline(),
} }
} }
} }
func (p *processor) archive(ctx context.Context, msg *base.TaskMessage, e error) { func (p *processor) archive(l *base.Lease, msg *base.TaskMessage, e error) {
err := p.broker.Archive(msg, e.Error()) if !l.IsValid() {
// If lease is not valid, do not write to redis; Let recoverer take care of it.
return
}
ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
err := p.broker.Archive(ctx, msg, e.Error())
if err != nil { if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.ArchivedKey(msg.Queue)) errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.ArchivedKey(msg.Queue))
deadline, ok := ctx.Deadline()
if !ok {
panic("asynq: internal error: missing deadline in context")
}
p.logger.Warnf("%s; Will retry syncing", errMsg) p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.broker.Archive(msg, e.Error()) return p.broker.Archive(ctx, msg, e.Error())
}, },
errMsg: errMsg, errMsg: errMsg,
deadline: deadline, deadline: l.Deadline(),
} }
} }
} }

View File

@ -17,6 +17,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-cmp/cmp/cmpopts"
h "github.com/hibiken/asynq/internal/asynqtest" h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/timeutil" "github.com/hibiken/asynq/internal/timeutil"
@ -483,6 +484,103 @@ func TestProcessorMarkAsComplete(t *testing.T) {
} }
} }
// Test a scenario where the worker server cannot communicate with redis due to a network failure
// and the lease expires
func TestProcessorWithExpiredLease(t *testing.T) {
r := setup(t)
defer r.Close()
rdbClient := rdb.NewRDB(r)
m1 := h.NewTaskMessage("task1", nil)
tests := []struct {
pending []*base.TaskMessage
handler Handler
wantErrCount int
}{
{
pending: []*base.TaskMessage{m1},
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
// make sure the task processing time exceeds lease duration
// to test expired lease.
time.Sleep(rdb.LeaseDuration + 10*time.Second)
return nil
}),
wantErrCount: 1, // ErrorHandler should still be called with ErrLeaseExpired
},
}
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedPendingQueue(t, r, tc.pending, base.DefaultQueueName)
starting := make(chan *workerInfo)
finished := make(chan *base.TaskMessage)
syncCh := make(chan *syncRequest)
done := make(chan struct{})
t.Cleanup(func() { close(done) })
// fake heartbeater which notifies lease expiration
go func() {
for {
select {
case w := <-starting:
// simulate expiration by resetting to some time in the past
w.lease.Reset(time.Now().Add(-5 * time.Second))
if !w.lease.NotifyExpiration() {
panic("Failed to notifiy lease expiration")
}
case <-finished:
// do nothing
case <-done:
return
}
}
}()
go fakeSyncer(syncCh, done)
p := newProcessor(processorParams{
logger: testLogger,
broker: rdbClient,
retryDelayFunc: DefaultRetryDelayFunc,
isFailureFunc: defaultIsFailureFunc,
syncCh: syncCh,
cancelations: base.NewCancelations(),
concurrency: 10,
queues: defaultQueueConfig,
strictPriority: false,
errHandler: nil,
shutdownTimeout: defaultShutdownTimeout,
starting: starting,
finished: finished,
})
p.handler = tc.handler
var (
mu sync.Mutex // guards n and errs
n int // number of times error handler is called
errs []error // error passed to error handler
)
p.errHandler = ErrorHandlerFunc(func(ctx context.Context, t *Task, err error) {
mu.Lock()
defer mu.Unlock()
n++
errs = append(errs, err)
})
p.start(&sync.WaitGroup{})
time.Sleep(4 * time.Second)
p.shutdown()
if n != tc.wantErrCount {
t.Errorf("Unexpected number of error count: got %d, want %d", n, tc.wantErrCount)
continue
}
for i := 0; i < tc.wantErrCount; i++ {
if !errors.Is(errs[i], ErrLeaseExpired) {
t.Errorf("Unexpected error was passed to ErrorHandler: got %v want %v", errs[i], ErrLeaseExpired)
}
}
}
}
func TestProcessorQueues(t *testing.T) { func TestProcessorQueues(t *testing.T) {
sortOpt := cmp.Transformer("SortStrings", func(in []string) []string { sortOpt := cmp.Transformer("SortStrings", func(in []string) []string {
out := append([]string(nil), in...) // Copy input to avoid mutating it out := append([]string(nil), in...) // Copy input to avoid mutating it