2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-23 10:16:12 +08:00

Update heartbeat goroutine to call ExtendLease on active tasks

This commit is contained in:
Ken Hibino
2022-02-12 09:48:07 -08:00
parent 6974812027
commit 92eba47853
6 changed files with 215 additions and 26 deletions

View File

@@ -295,9 +295,9 @@ func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[stri
}
// SeedAllLease initializes all of the lease sets with the given entries.
func SeedAllLease(tb testing.TB, r redis.UniversalClient, deadlines map[string][]base.Z) {
func SeedAllLease(tb testing.TB, r redis.UniversalClient, lease map[string][]base.Z) {
tb.Helper()
for q, entries := range deadlines {
for q, entries := range lease {
SeedLease(tb, r, entries, q)
}
}

View File

@@ -163,6 +163,15 @@ func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*b
return tb.real.ListLeaseExpired(cutoff, qnames...)
}
func (tb *TestBroker) ExtendLease(qname string, ids ...string) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.ExtendLease(qname, ids...)
}
func (tb *TestBroker) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error {
tb.mu.Lock()
defer tb.mu.Unlock()

View File

@@ -1,3 +1,7 @@
// Copyright 2022 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
// Package timeutil exports functions and types related to time and date.
package timeutil
@@ -35,4 +39,4 @@ func (c *SimulatedClock) Now() time.Time { return c.t }
func (c *SimulatedClock) SetTime(t time.Time) { c.t = t }
func (c *SimulatedClock) AdvanceTime(d time.Duration) { c.t.Add(d) }
func (c *SimulatedClock) AdvanceTime(d time.Duration) { c.t = c.t.Add(d) }

View File

@@ -0,0 +1,48 @@
// Copyright 2022 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package timeutil
import (
"testing"
"time"
)
func TestSimulatedClock(t *testing.T) {
now := time.Now()
tests := []struct {
desc string
initTime time.Time
advanceBy time.Duration
wantTime time.Time
}{
{
desc: "advance time forward",
initTime: now,
advanceBy: 30 * time.Second,
wantTime: now.Add(30 * time.Second),
},
{
desc: "advance time backward",
initTime: now,
advanceBy: -10 * time.Second,
wantTime: now.Add(-10 * time.Second),
},
}
for _, tc := range tests {
c := NewSimulatedClock(tc.initTime)
if c.Now() != tc.initTime {
t.Errorf("%s: Before Advance; SimulatedClock.Now() = %v, want %v", tc.desc, c.Now(), tc.initTime)
}
c.AdvanceTime(tc.advanceBy)
if c.Now() != tc.wantTime {
t.Errorf("%s: After Advance; SimulatedClock.Now() = %v, want %v", tc.desc, c.Now(), tc.wantTime)
}
}
}