mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-23 10:16:12 +08:00
Add RDB.ExtendLease method
This commit is contained in:
@@ -360,7 +360,7 @@ func TestDequeue(t *testing.T) {
|
||||
"default": {t1},
|
||||
},
|
||||
wantLease: map[string][]base.Z{
|
||||
"default": {{Message: t1, Score: now.Add(leaseDuration).Unix()}},
|
||||
"default": {{Message: t1, Score: now.Add(LeaseDuration).Unix()}},
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -383,7 +383,7 @@ func TestDequeue(t *testing.T) {
|
||||
},
|
||||
wantLease: map[string][]base.Z{
|
||||
"default": {},
|
||||
"critical": {{Message: t2, Score: now.Add(leaseDuration).Unix()}},
|
||||
"critical": {{Message: t2, Score: now.Add(LeaseDuration).Unix()}},
|
||||
"low": {},
|
||||
},
|
||||
},
|
||||
@@ -406,7 +406,7 @@ func TestDequeue(t *testing.T) {
|
||||
"low": {},
|
||||
},
|
||||
wantLease: map[string][]base.Z{
|
||||
"default": {{Message: t1, Score: now.Add(leaseDuration).Unix()}},
|
||||
"default": {{Message: t1, Score: now.Add(LeaseDuration).Unix()}},
|
||||
"critical": {},
|
||||
"low": {},
|
||||
},
|
||||
@@ -2292,6 +2292,120 @@ func TestListLeaseExpired(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtendLease(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
now := time.Now()
|
||||
r.SetClock(timeutil.NewSimulatedClock(now))
|
||||
|
||||
t1 := h.NewTaskMessageWithQueue("task1", nil, "default")
|
||||
t2 := h.NewTaskMessageWithQueue("task2", nil, "default")
|
||||
t3 := h.NewTaskMessageWithQueue("task3", nil, "critical")
|
||||
t4 := h.NewTaskMessageWithQueue("task4", nil, "default")
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
lease map[string][]base.Z
|
||||
qname string
|
||||
ids []string
|
||||
wantLease map[string][]base.Z
|
||||
}{
|
||||
{
|
||||
desc: "Should extends lease for a single message in a queue",
|
||||
lease: map[string][]base.Z{
|
||||
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}},
|
||||
"critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}},
|
||||
},
|
||||
qname: "default",
|
||||
ids: []string{t1.ID},
|
||||
wantLease: map[string][]base.Z{
|
||||
"default": {{Message: t1, Score: now.Add(LeaseDuration).Unix()}},
|
||||
"critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "Should extends lease for multiple message in a queue",
|
||||
lease: map[string][]base.Z{
|
||||
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(10 * time.Second).Unix()}},
|
||||
"critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}},
|
||||
},
|
||||
qname: "default",
|
||||
ids: []string{t1.ID, t2.ID},
|
||||
wantLease: map[string][]base.Z{
|
||||
"default": {{Message: t1, Score: now.Add(LeaseDuration).Unix()}, {Message: t2, Score: now.Add(LeaseDuration).Unix()}},
|
||||
"critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "Should selectively extends lease for messages in a queue",
|
||||
lease: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
|
||||
{Message: t2, Score: now.Add(10 * time.Second).Unix()},
|
||||
{Message: t4, Score: now.Add(10 * time.Second).Unix()},
|
||||
},
|
||||
"critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}},
|
||||
},
|
||||
qname: "default",
|
||||
ids: []string{t2.ID, t4.ID},
|
||||
wantLease: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
|
||||
{Message: t2, Score: now.Add(LeaseDuration).Unix()},
|
||||
{Message: t4, Score: now.Add(LeaseDuration).Unix()},
|
||||
},
|
||||
"critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "Should not add a new entry in the lease set",
|
||||
lease: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
|
||||
},
|
||||
},
|
||||
qname: "default",
|
||||
ids: []string{t1.ID, t2.ID},
|
||||
wantLease: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: t1, Score: now.Add(LeaseDuration).Unix()},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "Should not add shorten the lease expiration time",
|
||||
lease: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: t1, Score: now.Add(LeaseDuration).Add(10 * time.Second).Unix()},
|
||||
},
|
||||
},
|
||||
qname: "default",
|
||||
ids: []string{t1.ID},
|
||||
wantLease: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: t1, Score: now.Add(LeaseDuration).Add(10 * time.Second).Unix()},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client)
|
||||
h.SeedAllLease(t, r.client, tc.lease)
|
||||
|
||||
if err := r.ExtendLease(tc.qname, tc.ids...); err != nil {
|
||||
t.Fatalf("%s: ExtendLease(%q, %v) returned error: %v", tc.desc, tc.qname, tc.ids, err)
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantLease {
|
||||
gotLease := h.GetLeaseEntries(t, r.client, qname)
|
||||
if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" {
|
||||
t.Errorf("%s: mismatch found in %q: (-want,+got):\n%s", tc.desc, base.LeaseKey(qname), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteServerState(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
|
Reference in New Issue
Block a user