diff --git a/processor.go b/processor.go index 2ad9687..c81c998 100644 --- a/processor.go +++ b/processor.go @@ -7,6 +7,7 @@ package asynq import ( "context" "fmt" + "math" "math/rand" "runtime" "runtime/debug" @@ -19,12 +20,14 @@ import ( asynqcontext "github.com/hibiken/asynq/internal/context" "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/log" + "github.com/hibiken/asynq/internal/timeutil" "golang.org/x/time/rate" ) type processor struct { logger *log.Logger broker base.Broker + clock timeutil.Clock handler Handler baseCtxFn func() context.Context @@ -97,6 +100,7 @@ func newProcessor(params processorParams) *processor { logger: params.logger, broker: params.broker, baseCtxFn: params.baseCtxFn, + clock: timeutil.NewRealClock(), queueConfig: queues, orderedQueues: orderedQueues, retryDelayFunc: params.retryDelayFunc, @@ -167,7 +171,7 @@ func (p *processor) exec() { return case p.sema <- struct{}{}: // acquire token qnames := p.queues() - msg, deadline, err := p.broker.Dequeue(qnames...) + msg, err := p.broker.Dequeue(qnames...) switch { case errors.Is(err, errors.ErrNoProcessableTask): p.logger.Debug("All queues are empty") @@ -186,6 +190,7 @@ func (p *processor) exec() { return } + deadline := p.computeDeadline(msg) p.starting <- &workerInfo{msg, time.Now(), deadline} go func() { defer func() { @@ -486,3 +491,19 @@ func gcd(xs ...int) int { } return res } + +// computeDeadline returns the given task's deadline, +func (p *processor) computeDeadline(msg *base.TaskMessage) time.Time { + if msg.Timeout == 0 && msg.Deadline == 0 { + p.logger.Errorf("asynq: internal error: both timeout and deadline are not set for the task message: %s", msg.ID) + return p.clock.Now().Add(defaultTimeout) + } + if msg.Timeout != 0 && msg.Deadline != 0 { + deadlineUnix := math.Min(float64(p.clock.Now().Unix()+msg.Timeout), float64(msg.Deadline)) + return time.Unix(int64(deadlineUnix), 0) + } + if msg.Timeout != 0 { + return p.clock.Now().Add(time.Duration(msg.Timeout) * time.Second) + } + return time.Unix(msg.Deadline, 0) +} diff --git a/processor_test.go b/processor_test.go index 5d3db46..dc3dd06 100644 --- a/processor_test.go +++ b/processor_test.go @@ -17,7 +17,9 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" + "github.com/hibiken/asynq/internal/timeutil" ) var taskCmpOpts = []cmp.Option{ @@ -754,3 +756,69 @@ func TestNormalizeQueues(t *testing.T) { } } } + +func TestProcessorComputeDeadline(t *testing.T) { + now := time.Now() + p := processor{ + logger: log.NewLogger(nil), + clock: timeutil.NewSimulatedClock(now), + } + + tests := []struct { + desc string + msg *base.TaskMessage + want time.Time + }{ + { + desc: "message with only timeout specified", + msg: &base.TaskMessage{ + Timeout: int64((30 * time.Minute).Seconds()), + }, + want: now.Add(30 * time.Minute), + }, + { + desc: "message with only deadline specified", + msg: &base.TaskMessage{ + Deadline: now.Add(24 * time.Hour).Unix(), + }, + want: now.Add(24 * time.Hour), + }, + { + desc: "message with both timeout and deadline set (now+timeout < deadline)", + msg: &base.TaskMessage{ + Deadline: now.Add(24 * time.Hour).Unix(), + Timeout: int64((30 * time.Minute).Seconds()), + }, + want: now.Add(30 * time.Minute), + }, + { + desc: "message with both timeout and deadline set (now+timeout > deadline)", + msg: &base.TaskMessage{ + Deadline: now.Add(10 * time.Minute).Unix(), + Timeout: int64((30 * time.Minute).Seconds()), + }, + want: now.Add(10 * time.Minute), + }, + { + desc: "message with both timeout and deadline set (now+timeout == deadline)", + msg: &base.TaskMessage{ + Deadline: now.Add(30 * time.Minute).Unix(), + Timeout: int64((30 * time.Minute).Seconds()), + }, + want: now.Add(30 * time.Minute), + }, + { + desc: "message without timeout and deadline", + msg: &base.TaskMessage{}, + want: now.Add(defaultTimeout), + }, + } + + for _, tc := range tests { + got := p.computeDeadline(tc.msg) + // Compare the Unix epoch with seconds granularity + if got.Unix() != tc.want.Unix() { + t.Errorf("%s: got=%v, want=%v", tc.desc, got.Unix(), tc.want.Unix()) + } + } +}