2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-25 23:06:12 +08:00

Update heartbeat to extend lease of active workers

This commit is contained in:
Ken Hibino
2022-02-14 07:17:51 -08:00
parent 53d8d0554a
commit a5d0206f33
8 changed files with 148 additions and 53 deletions

View File

@@ -18,6 +18,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/timeutil"
)
// EquateInt64Approx returns a Comparer option that treats int64 values
@@ -114,6 +115,13 @@ func NewTaskMessageWithQueue(taskType string, payload []byte, qname string) *bas
}
}
// NewLeaseWithClock returns a new lease with the given expiration time and clock.
func NewLeaseWithClock(expirationTime time.Time, clock timeutil.Clock) *base.Lease {
l := base.NewLease(expirationTime)
l.Clock = clock
return l
}
// JSON serializes the given key-value pairs into stream of bytes in JSON.
func JSON(kv map[string]interface{}) []byte {
b, err := json.Marshal(kv)

View File

@@ -615,7 +615,7 @@ type Lease struct {
once sync.Once
ch chan struct{}
clock timeutil.Clock
Clock timeutil.Clock
mu sync.Mutex
expireAt time.Time // guarded by mu
@@ -625,7 +625,7 @@ func NewLease(expirationTime time.Time) *Lease {
return &Lease{
ch: make(chan struct{}),
expireAt: expirationTime,
clock: timeutil.NewRealClock(),
Clock: timeutil.NewRealClock(),
}
}
@@ -670,7 +670,7 @@ func (l *Lease) Deadline() time.Time {
// IsValid returns true if the lease's expieration time is in the future or equals to the current time,
// returns false otherwise.
func (l *Lease) IsValid() bool {
now := l.clock.Now()
now := l.Clock.Now()
l.mu.Lock()
defer l.mu.Unlock()
return l.expireAt.After(now) || l.expireAt.Equal(now)

View File

@@ -651,7 +651,7 @@ func TestLeaseReset(t *testing.T) {
clock := timeutil.NewSimulatedClock(now)
l := NewLease(now.Add(30 * time.Second))
l.clock = clock
l.Clock = clock
// Check initial state
if !l.IsValid() {
@@ -686,7 +686,7 @@ func TestLeaseNotifyExpiration(t *testing.T) {
clock := timeutil.NewSimulatedClock(now)
l := NewLease(now.Add(30 * time.Second))
l.clock = clock
l.Clock = clock
select {
case <-l.Done():

View File

@@ -64,40 +64,40 @@ func (tb *TestBroker) EnqueueUnique(ctx context.Context, msg *base.TaskMessage,
return tb.real.EnqueueUnique(ctx, msg, ttl)
}
func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, error) {
func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, time.Time, error) {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return nil, errRedisDown
return nil, time.Time{}, errRedisDown
}
return tb.real.Dequeue(qnames...)
}
func (tb *TestBroker) Done(msg *base.TaskMessage) error {
func (tb *TestBroker) Done(ctx context.Context, msg *base.TaskMessage) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.Done(msg)
return tb.real.Done(ctx, msg)
}
func (tb *TestBroker) MarkAsComplete(msg *base.TaskMessage) error {
func (tb *TestBroker) MarkAsComplete(ctx context.Context, msg *base.TaskMessage) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.MarkAsComplete(msg)
return tb.real.MarkAsComplete(ctx, msg)
}
func (tb *TestBroker) Requeue(msg *base.TaskMessage) error {
func (tb *TestBroker) Requeue(ctx context.Context, msg *base.TaskMessage) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.Requeue(msg)
return tb.real.Requeue(ctx, msg)
}
func (tb *TestBroker) Schedule(ctx context.Context, msg *base.TaskMessage, processAt time.Time) error {
@@ -118,22 +118,22 @@ func (tb *TestBroker) ScheduleUnique(ctx context.Context, msg *base.TaskMessage,
return tb.real.ScheduleUnique(ctx, msg, processAt, ttl)
}
func (tb *TestBroker) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string, isFailure bool) error {
func (tb *TestBroker) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.Time, errMsg string, isFailure bool) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.Retry(msg, processAt, errMsg, isFailure)
return tb.real.Retry(ctx, msg, processAt, errMsg, isFailure)
}
func (tb *TestBroker) Archive(msg *base.TaskMessage, errMsg string) error {
func (tb *TestBroker) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.Archive(msg, errMsg)
return tb.real.Archive(ctx, msg, errMsg)
}
func (tb *TestBroker) ForwardIfReady(qnames ...string) error {
@@ -163,11 +163,11 @@ func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*b
return tb.real.ListLeaseExpired(cutoff, qnames...)
}
func (tb *TestBroker) ExtendLease(qname string, ids ...string) error {
func (tb *TestBroker) ExtendLease(qname string, ids ...string) (time.Time, error) {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
return time.Time{}, errRedisDown
}
return tb.real.ExtendLease(qname, ids...)
}