mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-15 20:08:46 +08:00
Add deadline to syncRequest
- syncer will drop a request if its deadline has been exceeded
This commit is contained in:
parent
65e17a3469
commit
ef4a4a8334
39
processor.go
39
processor.go
@ -208,7 +208,7 @@ func (p *processor) exec() {
|
|||||||
return
|
return
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
p.logger.Debugf("Retrying task. task id=%s", msg.ID) // TODO: Improve this log message and above
|
p.logger.Debugf("Retrying task. task id=%s", msg.ID) // TODO: Improve this log message and above
|
||||||
p.retryOrKill(msg, ctx.Err())
|
p.retryOrKill(ctx, msg, ctx.Err())
|
||||||
return
|
return
|
||||||
case resErr := <-resCh:
|
case resErr := <-resCh:
|
||||||
// Note: One of three things should happen.
|
// Note: One of three things should happen.
|
||||||
@ -219,10 +219,10 @@ func (p *processor) exec() {
|
|||||||
if p.errHandler != nil {
|
if p.errHandler != nil {
|
||||||
p.errHandler.HandleError(task, resErr, msg.Retried, msg.Retry)
|
p.errHandler.HandleError(task, resErr, msg.Retried, msg.Retry)
|
||||||
}
|
}
|
||||||
p.retryOrKill(msg, resErr)
|
p.retryOrKill(ctx, msg, resErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.markAsDone(msg)
|
p.markAsDone(ctx, msg)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -237,55 +237,70 @@ func (p *processor) requeue(msg *base.TaskMessage) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *processor) markAsDone(msg *base.TaskMessage) {
|
func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
|
||||||
err := p.broker.Done(msg)
|
err := p.broker.Done(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.InProgressQueue, err)
|
errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.InProgressQueue, err)
|
||||||
|
deadline, ok := ctx.Deadline()
|
||||||
|
if !ok {
|
||||||
|
panic("asynq: internal error: missing deadline in context")
|
||||||
|
}
|
||||||
p.logger.Warnf("%s; Will retry syncing", errMsg)
|
p.logger.Warnf("%s; Will retry syncing", errMsg)
|
||||||
p.syncRequestCh <- &syncRequest{
|
p.syncRequestCh <- &syncRequest{
|
||||||
fn: func() error {
|
fn: func() error {
|
||||||
return p.broker.Done(msg)
|
return p.broker.Done(msg)
|
||||||
},
|
},
|
||||||
errMsg: errMsg,
|
errMsg: errMsg,
|
||||||
|
deadline: deadline,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *processor) retryOrKill(msg *base.TaskMessage, err error) {
|
func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) {
|
||||||
if msg.Retried >= msg.Retry {
|
if msg.Retried >= msg.Retry {
|
||||||
p.kill(msg, err)
|
p.kill(ctx, msg, err)
|
||||||
} else {
|
} else {
|
||||||
p.retry(msg, err)
|
p.retry(ctx, msg, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *processor) retry(msg *base.TaskMessage, e error) {
|
func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
|
||||||
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
|
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
|
||||||
retryAt := time.Now().Add(d)
|
retryAt := time.Now().Add(d)
|
||||||
err := p.broker.Retry(msg, retryAt, e.Error())
|
err := p.broker.Retry(msg, retryAt, e.Error())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue)
|
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue)
|
||||||
|
deadline, ok := ctx.Deadline()
|
||||||
|
if !ok {
|
||||||
|
panic("asynq: internal error: missing deadline in context")
|
||||||
|
}
|
||||||
p.logger.Warnf("%s; Will retry syncing", errMsg)
|
p.logger.Warnf("%s; Will retry syncing", errMsg)
|
||||||
p.syncRequestCh <- &syncRequest{
|
p.syncRequestCh <- &syncRequest{
|
||||||
fn: func() error {
|
fn: func() error {
|
||||||
return p.broker.Retry(msg, retryAt, e.Error())
|
return p.broker.Retry(msg, retryAt, e.Error())
|
||||||
},
|
},
|
||||||
errMsg: errMsg,
|
errMsg: errMsg,
|
||||||
|
deadline: deadline,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *processor) kill(msg *base.TaskMessage, e error) {
|
func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
|
||||||
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
|
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
|
||||||
err := p.broker.Kill(msg, e.Error())
|
err := p.broker.Kill(msg, e.Error())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
|
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
|
||||||
|
deadline, ok := ctx.Deadline()
|
||||||
|
if !ok {
|
||||||
|
panic("asynq: internal error: missing deadline in context")
|
||||||
|
}
|
||||||
p.logger.Warnf("%s; Will retry syncing", errMsg)
|
p.logger.Warnf("%s; Will retry syncing", errMsg)
|
||||||
p.syncRequestCh <- &syncRequest{
|
p.syncRequestCh <- &syncRequest{
|
||||||
fn: func() error {
|
fn: func() error {
|
||||||
return p.broker.Kill(msg, e.Error())
|
return p.broker.Kill(msg, e.Error())
|
||||||
},
|
},
|
||||||
errMsg: errMsg,
|
errMsg: errMsg,
|
||||||
|
deadline: deadline,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,8 +26,9 @@ type syncer struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type syncRequest struct {
|
type syncRequest struct {
|
||||||
fn func() error // sync operation
|
fn func() error // sync operation
|
||||||
errMsg string // error message
|
errMsg string // error message
|
||||||
|
deadline time.Time // request should be dropped if deadline has been exceeded
|
||||||
}
|
}
|
||||||
|
|
||||||
type syncerParams struct {
|
type syncerParams struct {
|
||||||
@ -72,6 +73,9 @@ func (s *syncer) start(wg *sync.WaitGroup) {
|
|||||||
case <-time.After(s.interval):
|
case <-time.After(s.interval):
|
||||||
var temp []*syncRequest
|
var temp []*syncRequest
|
||||||
for _, req := range requests {
|
for _, req := range requests {
|
||||||
|
if req.deadline.Before(time.Now()) {
|
||||||
|
continue // drop stale request
|
||||||
|
}
|
||||||
if err := req.fn(); err != nil {
|
if err := req.fn(); err != nil {
|
||||||
temp = append(temp, req)
|
temp = append(temp, req)
|
||||||
}
|
}
|
||||||
|
@ -42,6 +42,7 @@ func TestSyncer(t *testing.T) {
|
|||||||
fn: func() error {
|
fn: func() error {
|
||||||
return rdbClient.Done(m)
|
return rdbClient.Done(m)
|
||||||
},
|
},
|
||||||
|
deadline: time.Now().Add(5 * time.Minute),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -85,8 +86,9 @@ func TestSyncerRetry(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
syncRequestCh <- &syncRequest{
|
syncRequestCh <- &syncRequest{
|
||||||
fn: requestFunc,
|
fn: requestFunc,
|
||||||
errMsg: "error",
|
errMsg: "error",
|
||||||
|
deadline: time.Now().Add(5 * time.Minute),
|
||||||
}
|
}
|
||||||
|
|
||||||
// allow syncer to retry
|
// allow syncer to retry
|
||||||
@ -98,3 +100,41 @@ func TestSyncerRetry(t *testing.T) {
|
|||||||
}
|
}
|
||||||
mu.Unlock()
|
mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSyncerDropsStaleRequests(t *testing.T) {
|
||||||
|
const interval = time.Second
|
||||||
|
syncRequestCh := make(chan *syncRequest)
|
||||||
|
syncer := newSyncer(syncerParams{
|
||||||
|
logger: testLogger,
|
||||||
|
requestsCh: syncRequestCh,
|
||||||
|
interval: interval,
|
||||||
|
})
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
syncer.start(&wg)
|
||||||
|
|
||||||
|
var (
|
||||||
|
mu sync.Mutex
|
||||||
|
n int // number of times request has been processed
|
||||||
|
)
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
syncRequestCh <- &syncRequest{
|
||||||
|
fn: func() error {
|
||||||
|
mu.Lock()
|
||||||
|
n++
|
||||||
|
mu.Unlock()
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
deadline: time.Now().Add(time.Duration(-i) * time.Second), // already exceeded deadline
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(2 * interval) // ensure that syncer runs at least once
|
||||||
|
syncer.terminate()
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
if n != 0 {
|
||||||
|
t.Errorf("requests has been processed %d times, want 0", n)
|
||||||
|
}
|
||||||
|
mu.Unlock()
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user