diff --git a/internal/base/base.go b/internal/base/base.go index 690ee20..eed7f9b 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -626,6 +626,7 @@ type Broker interface { ForwardIfReady(qnames ...string) error DeleteExpiredCompletedTasks(qname string) error ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error) + ExtendLease(qname string, ids ...string) error WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error ClearServerState(host string, pid int, serverID string) error CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 36a407e..715576b 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -20,7 +20,8 @@ import ( const statsTTL = 90 * 24 * time.Hour // 90 days -const leaseDuration = 30 * time.Second +// LeaseDuration is the duration used to initially create a lease and to extend it thereafter. +const LeaseDuration = 30 * time.Second // RDB is a client interface to query and mutate task queues. type RDB struct { @@ -253,7 +254,7 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, err error) { base.LeaseKey(qname), } argv := []interface{}{ - r.clock.Now().Add(leaseDuration).Unix(), + r.clock.Now().Add(LeaseDuration).Unix(), base.TaskKeyPrefix(qname), } res, err := dequeueCmd.Run(context.Background(), r.client, keys, argv...).Result() @@ -957,6 +958,17 @@ func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.Task return msgs, nil } +// ExtendLease extends the lease for the given tasks by LeaseDuration (30s). +func (r *RDB) ExtendLease(qname string, ids ...string) error { + expireAt := r.clock.Now().Add(LeaseDuration) + var zs []redis.Z + for _, id := range ids { + zs = append(zs, redis.Z{Member: id, Score: float64(expireAt.Unix())}) + } + // Use XX option to only update elements that already exist; Don't add new elements + return r.client.ZAddArgs(context.Background(), base.LeaseKey(qname), redis.ZAddArgs{XX: true, GT: true, Members: zs}).Err() +} + // KEYS[1] -> asynq:servers:{} // KEYS[2] -> asynq:workers:{} // ARGV[1] -> TTL in seconds diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 5f32012..ec6517a 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -360,7 +360,7 @@ func TestDequeue(t *testing.T) { "default": {t1}, }, wantLease: map[string][]base.Z{ - "default": {{Message: t1, Score: now.Add(leaseDuration).Unix()}}, + "default": {{Message: t1, Score: now.Add(LeaseDuration).Unix()}}, }, }, { @@ -383,7 +383,7 @@ func TestDequeue(t *testing.T) { }, wantLease: map[string][]base.Z{ "default": {}, - "critical": {{Message: t2, Score: now.Add(leaseDuration).Unix()}}, + "critical": {{Message: t2, Score: now.Add(LeaseDuration).Unix()}}, "low": {}, }, }, @@ -406,7 +406,7 @@ func TestDequeue(t *testing.T) { "low": {}, }, wantLease: map[string][]base.Z{ - "default": {{Message: t1, Score: now.Add(leaseDuration).Unix()}}, + "default": {{Message: t1, Score: now.Add(LeaseDuration).Unix()}}, "critical": {}, "low": {}, }, @@ -2292,6 +2292,120 @@ func TestListLeaseExpired(t *testing.T) { } } +func TestExtendLease(t *testing.T) { + r := setup(t) + defer r.Close() + now := time.Now() + r.SetClock(timeutil.NewSimulatedClock(now)) + + t1 := h.NewTaskMessageWithQueue("task1", nil, "default") + t2 := h.NewTaskMessageWithQueue("task2", nil, "default") + t3 := h.NewTaskMessageWithQueue("task3", nil, "critical") + t4 := h.NewTaskMessageWithQueue("task4", nil, "default") + + tests := []struct { + desc string + lease map[string][]base.Z + qname string + ids []string + wantLease map[string][]base.Z + }{ + { + desc: "Should extends lease for a single message in a queue", + lease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}}, + "critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}}, + }, + qname: "default", + ids: []string{t1.ID}, + wantLease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(LeaseDuration).Unix()}}, + "critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}}, + }, + }, + { + desc: "Should extends lease for multiple message in a queue", + lease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(10 * time.Second).Unix()}}, + "critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}}, + }, + qname: "default", + ids: []string{t1.ID, t2.ID}, + wantLease: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(LeaseDuration).Unix()}, {Message: t2, Score: now.Add(LeaseDuration).Unix()}}, + "critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}}, + }, + }, + { + desc: "Should selectively extends lease for messages in a queue", + lease: map[string][]base.Z{ + "default": { + {Message: t1, Score: now.Add(10 * time.Second).Unix()}, + {Message: t2, Score: now.Add(10 * time.Second).Unix()}, + {Message: t4, Score: now.Add(10 * time.Second).Unix()}, + }, + "critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}}, + }, + qname: "default", + ids: []string{t2.ID, t4.ID}, + wantLease: map[string][]base.Z{ + "default": { + {Message: t1, Score: now.Add(10 * time.Second).Unix()}, + {Message: t2, Score: now.Add(LeaseDuration).Unix()}, + {Message: t4, Score: now.Add(LeaseDuration).Unix()}, + }, + "critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}}, + }, + }, + { + desc: "Should not add a new entry in the lease set", + lease: map[string][]base.Z{ + "default": { + {Message: t1, Score: now.Add(10 * time.Second).Unix()}, + }, + }, + qname: "default", + ids: []string{t1.ID, t2.ID}, + wantLease: map[string][]base.Z{ + "default": { + {Message: t1, Score: now.Add(LeaseDuration).Unix()}, + }, + }, + }, + { + desc: "Should not add shorten the lease expiration time", + lease: map[string][]base.Z{ + "default": { + {Message: t1, Score: now.Add(LeaseDuration).Add(10 * time.Second).Unix()}, + }, + }, + qname: "default", + ids: []string{t1.ID}, + wantLease: map[string][]base.Z{ + "default": { + {Message: t1, Score: now.Add(LeaseDuration).Add(10 * time.Second).Unix()}, + }, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + h.SeedAllLease(t, r.client, tc.lease) + + if err := r.ExtendLease(tc.qname, tc.ids...); err != nil { + t.Fatalf("%s: ExtendLease(%q, %v) returned error: %v", tc.desc, tc.qname, tc.ids, err) + } + + for qname, want := range tc.wantLease { + gotLease := h.GetLeaseEntries(t, r.client, qname) + if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" { + t.Errorf("%s: mismatch found in %q: (-want,+got):\n%s", tc.desc, base.LeaseKey(qname), diff) + } + } + } +} + func TestWriteServerState(t *testing.T) { r := setup(t) defer r.Close()