2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

fix: stop active tasks before server shutdown

This commit is contained in:
kanzihuang 2024-05-01 23:34:03 +08:00 committed by wanli
parent 100bb89877
commit 86c772f6c1
4 changed files with 42 additions and 23 deletions

View File

@ -2,7 +2,7 @@ package timeutil
import ( import (
"context" "context"
"github.com/stretchr/testify/require" "github.com/hibiken/asynq/internal/errors"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -37,7 +37,9 @@ func TestSleep(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
err := Sleep(ctx, tc.sleep) err := Sleep(ctx, tc.sleep)
require.ErrorIs(t, tc.wantErr, err) if !errors.Is(err, tc.wantErr) {
t.Errorf("timeutil.Sleep: got %v, want %v", err, tc.wantErr)
}
}() }()
time.Sleep(20 * time.Millisecond) time.Sleep(20 * time.Millisecond)
cancel() cancel()

View File

@ -62,6 +62,9 @@ type processor struct {
// quit channel is closed when the shutdown of the "processor" goroutine starts. // quit channel is closed when the shutdown of the "processor" goroutine starts.
quit chan struct{} quit chan struct{}
// terminate channel is closed when the shutdown of the "processor" goroutine starts.
terminate chan struct{}
// abort channel communicates to the in-flight worker goroutines to stop. // abort channel communicates to the in-flight worker goroutines to stop.
abort chan struct{} abort chan struct{}
@ -113,6 +116,7 @@ func newProcessor(params processorParams) *processor {
sema: make(chan struct{}, params.concurrency), sema: make(chan struct{}, params.concurrency),
done: make(chan struct{}), done: make(chan struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
terminate: make(chan struct{}),
abort: make(chan struct{}), abort: make(chan struct{}),
errHandler: params.errHandler, errHandler: params.errHandler,
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
@ -139,6 +143,7 @@ func (p *processor) stop() {
func (p *processor) shutdown() { func (p *processor) shutdown() {
p.stop() p.stop()
close(p.terminate)
time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) }) time.AfterFunc(p.shutdownTimeout, func() { close(p.abort) })
p.logger.Info("Waiting for all workers to finish...") p.logger.Info("Waiting for all workers to finish...")
@ -232,25 +237,38 @@ func (p *processor) exec() {
resCh <- p.perform(ctx, task) resCh <- p.perform(ctx, task)
}() }()
select { var leaseDone, terminated bool
case <-p.abort: for {
// time is up, push the message back to queue and quit this worker goroutine. select {
p.logger.Warnf("Quitting worker. task id=%s", msg.ID) case <-p.terminate:
p.requeue(lease, msg) cancel()
return case <-lease.Done():
case <-lease.Done(): leaseDone = true
cancel() cancel()
p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired) case <-p.abort:
return // time is up, push the message back to queue and quit this worker goroutine.
case <-ctx.Done(): p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
p.handleFailedMessage(ctx, lease, msg, ctx.Err()) p.requeue(lease, msg)
return return
case resErr := <-resCh: case resErr := <-resCh:
if resErr != nil { switch {
p.handleFailedMessage(ctx, lease, msg, resErr) case resErr == nil:
p.handleSucceededMessage(lease, msg)
case errors.Is(resErr, context.Canceled):
switch {
case leaseDone:
p.handleFailedMessage(ctx, lease, msg, ErrLeaseExpired)
case terminated:
p.logger.Warnf("Quitting worker. task id=%s", msg.ID)
p.requeue(lease, msg)
default:
p.handleFailedMessage(ctx, lease, msg, resErr)
}
default:
p.handleFailedMessage(ctx, lease, msg, resErr)
}
return return
} }
p.handleSucceededMessage(lease, msg)
} }
}() }()
} }

View File

@ -505,8 +505,7 @@ func TestProcessorWithExpiredLease(t *testing.T) {
handler: HandlerFunc(func(ctx context.Context, task *Task) error { handler: HandlerFunc(func(ctx context.Context, task *Task) error {
// make sure the task processing time exceeds lease duration // make sure the task processing time exceeds lease duration
// to test expired lease. // to test expired lease.
time.Sleep(rdb.LeaseDuration + 10*time.Second) return timeutil.Sleep(ctx, rdb.LeaseDuration+10*time.Second)
return nil
}), }),
wantErrCount: 1, // ErrorHandler should still be called with ErrLeaseExpired wantErrCount: 1, // ErrorHandler should still be called with ErrLeaseExpired
}, },

View File

@ -7,8 +7,6 @@ package asynq
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/hibiken/asynq/internal/timeutil"
"github.com/redis/go-redis/v9"
"syscall" "syscall"
"testing" "testing"
"time" "time"
@ -16,6 +14,8 @@ import (
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/testbroker" "github.com/hibiken/asynq/internal/testbroker"
"github.com/hibiken/asynq/internal/testutil" "github.com/hibiken/asynq/internal/testutil"
"github.com/hibiken/asynq/internal/timeutil"
"github.com/redis/go-redis/v9"
"go.uber.org/goleak" "go.uber.org/goleak"
) )