mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 07:12:17 +08:00
Add RDB.ExtendLease method
This commit is contained in:
parent
dabcb120d5
commit
87dc392c7f
@ -626,6 +626,7 @@ type Broker interface {
|
|||||||
ForwardIfReady(qnames ...string) error
|
ForwardIfReady(qnames ...string) error
|
||||||
DeleteExpiredCompletedTasks(qname string) error
|
DeleteExpiredCompletedTasks(qname string) error
|
||||||
ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
|
ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
|
||||||
|
ExtendLease(qname string, ids ...string) error
|
||||||
WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
|
WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
|
||||||
ClearServerState(host string, pid int, serverID string) error
|
ClearServerState(host string, pid int, serverID string) error
|
||||||
CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
|
CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
|
||||||
|
@ -20,7 +20,8 @@ import (
|
|||||||
|
|
||||||
const statsTTL = 90 * 24 * time.Hour // 90 days
|
const statsTTL = 90 * 24 * time.Hour // 90 days
|
||||||
|
|
||||||
const leaseDuration = 30 * time.Second
|
// LeaseDuration is the duration used to initially create a lease and to extend it thereafter.
|
||||||
|
const LeaseDuration = 30 * time.Second
|
||||||
|
|
||||||
// RDB is a client interface to query and mutate task queues.
|
// RDB is a client interface to query and mutate task queues.
|
||||||
type RDB struct {
|
type RDB struct {
|
||||||
@ -253,7 +254,7 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, err error) {
|
|||||||
base.LeaseKey(qname),
|
base.LeaseKey(qname),
|
||||||
}
|
}
|
||||||
argv := []interface{}{
|
argv := []interface{}{
|
||||||
r.clock.Now().Add(leaseDuration).Unix(),
|
r.clock.Now().Add(LeaseDuration).Unix(),
|
||||||
base.TaskKeyPrefix(qname),
|
base.TaskKeyPrefix(qname),
|
||||||
}
|
}
|
||||||
res, err := dequeueCmd.Run(context.Background(), r.client, keys, argv...).Result()
|
res, err := dequeueCmd.Run(context.Background(), r.client, keys, argv...).Result()
|
||||||
@ -957,6 +958,17 @@ func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.Task
|
|||||||
return msgs, nil
|
return msgs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExtendLease extends the lease for the given tasks by LeaseDuration (30s).
|
||||||
|
func (r *RDB) ExtendLease(qname string, ids ...string) error {
|
||||||
|
expireAt := r.clock.Now().Add(LeaseDuration)
|
||||||
|
var zs []redis.Z
|
||||||
|
for _, id := range ids {
|
||||||
|
zs = append(zs, redis.Z{Member: id, Score: float64(expireAt.Unix())})
|
||||||
|
}
|
||||||
|
// Use XX option to only update elements that already exist; Don't add new elements
|
||||||
|
return r.client.ZAddArgs(context.Background(), base.LeaseKey(qname), redis.ZAddArgs{XX: true, GT: true, Members: zs}).Err()
|
||||||
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:servers:{<host:pid:sid>}
|
// KEYS[1] -> asynq:servers:{<host:pid:sid>}
|
||||||
// KEYS[2] -> asynq:workers:{<host:pid:sid>}
|
// KEYS[2] -> asynq:workers:{<host:pid:sid>}
|
||||||
// ARGV[1] -> TTL in seconds
|
// ARGV[1] -> TTL in seconds
|
||||||
|
@ -360,7 +360,7 @@ func TestDequeue(t *testing.T) {
|
|||||||
"default": {t1},
|
"default": {t1},
|
||||||
},
|
},
|
||||||
wantLease: map[string][]base.Z{
|
wantLease: map[string][]base.Z{
|
||||||
"default": {{Message: t1, Score: now.Add(leaseDuration).Unix()}},
|
"default": {{Message: t1, Score: now.Add(LeaseDuration).Unix()}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -383,7 +383,7 @@ func TestDequeue(t *testing.T) {
|
|||||||
},
|
},
|
||||||
wantLease: map[string][]base.Z{
|
wantLease: map[string][]base.Z{
|
||||||
"default": {},
|
"default": {},
|
||||||
"critical": {{Message: t2, Score: now.Add(leaseDuration).Unix()}},
|
"critical": {{Message: t2, Score: now.Add(LeaseDuration).Unix()}},
|
||||||
"low": {},
|
"low": {},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -406,7 +406,7 @@ func TestDequeue(t *testing.T) {
|
|||||||
"low": {},
|
"low": {},
|
||||||
},
|
},
|
||||||
wantLease: map[string][]base.Z{
|
wantLease: map[string][]base.Z{
|
||||||
"default": {{Message: t1, Score: now.Add(leaseDuration).Unix()}},
|
"default": {{Message: t1, Score: now.Add(LeaseDuration).Unix()}},
|
||||||
"critical": {},
|
"critical": {},
|
||||||
"low": {},
|
"low": {},
|
||||||
},
|
},
|
||||||
@ -2292,6 +2292,120 @@ func TestListLeaseExpired(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestExtendLease(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
defer r.Close()
|
||||||
|
now := time.Now()
|
||||||
|
r.SetClock(timeutil.NewSimulatedClock(now))
|
||||||
|
|
||||||
|
t1 := h.NewTaskMessageWithQueue("task1", nil, "default")
|
||||||
|
t2 := h.NewTaskMessageWithQueue("task2", nil, "default")
|
||||||
|
t3 := h.NewTaskMessageWithQueue("task3", nil, "critical")
|
||||||
|
t4 := h.NewTaskMessageWithQueue("task4", nil, "default")
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
desc string
|
||||||
|
lease map[string][]base.Z
|
||||||
|
qname string
|
||||||
|
ids []string
|
||||||
|
wantLease map[string][]base.Z
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "Should extends lease for a single message in a queue",
|
||||||
|
lease: map[string][]base.Z{
|
||||||
|
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}},
|
||||||
|
"critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}},
|
||||||
|
},
|
||||||
|
qname: "default",
|
||||||
|
ids: []string{t1.ID},
|
||||||
|
wantLease: map[string][]base.Z{
|
||||||
|
"default": {{Message: t1, Score: now.Add(LeaseDuration).Unix()}},
|
||||||
|
"critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "Should extends lease for multiple message in a queue",
|
||||||
|
lease: map[string][]base.Z{
|
||||||
|
"default": {{Message: t1, Score: now.Add(10 * time.Second).Unix()}, {Message: t2, Score: now.Add(10 * time.Second).Unix()}},
|
||||||
|
"critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}},
|
||||||
|
},
|
||||||
|
qname: "default",
|
||||||
|
ids: []string{t1.ID, t2.ID},
|
||||||
|
wantLease: map[string][]base.Z{
|
||||||
|
"default": {{Message: t1, Score: now.Add(LeaseDuration).Unix()}, {Message: t2, Score: now.Add(LeaseDuration).Unix()}},
|
||||||
|
"critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "Should selectively extends lease for messages in a queue",
|
||||||
|
lease: map[string][]base.Z{
|
||||||
|
"default": {
|
||||||
|
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
|
||||||
|
{Message: t2, Score: now.Add(10 * time.Second).Unix()},
|
||||||
|
{Message: t4, Score: now.Add(10 * time.Second).Unix()},
|
||||||
|
},
|
||||||
|
"critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}},
|
||||||
|
},
|
||||||
|
qname: "default",
|
||||||
|
ids: []string{t2.ID, t4.ID},
|
||||||
|
wantLease: map[string][]base.Z{
|
||||||
|
"default": {
|
||||||
|
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
|
||||||
|
{Message: t2, Score: now.Add(LeaseDuration).Unix()},
|
||||||
|
{Message: t4, Score: now.Add(LeaseDuration).Unix()},
|
||||||
|
},
|
||||||
|
"critical": {{Message: t3, Score: now.Add(10 * time.Second).Unix()}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "Should not add a new entry in the lease set",
|
||||||
|
lease: map[string][]base.Z{
|
||||||
|
"default": {
|
||||||
|
{Message: t1, Score: now.Add(10 * time.Second).Unix()},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
qname: "default",
|
||||||
|
ids: []string{t1.ID, t2.ID},
|
||||||
|
wantLease: map[string][]base.Z{
|
||||||
|
"default": {
|
||||||
|
{Message: t1, Score: now.Add(LeaseDuration).Unix()},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "Should not add shorten the lease expiration time",
|
||||||
|
lease: map[string][]base.Z{
|
||||||
|
"default": {
|
||||||
|
{Message: t1, Score: now.Add(LeaseDuration).Add(10 * time.Second).Unix()},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
qname: "default",
|
||||||
|
ids: []string{t1.ID},
|
||||||
|
wantLease: map[string][]base.Z{
|
||||||
|
"default": {
|
||||||
|
{Message: t1, Score: now.Add(LeaseDuration).Add(10 * time.Second).Unix()},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
h.FlushDB(t, r.client)
|
||||||
|
h.SeedAllLease(t, r.client, tc.lease)
|
||||||
|
|
||||||
|
if err := r.ExtendLease(tc.qname, tc.ids...); err != nil {
|
||||||
|
t.Fatalf("%s: ExtendLease(%q, %v) returned error: %v", tc.desc, tc.qname, tc.ids, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for qname, want := range tc.wantLease {
|
||||||
|
gotLease := h.GetLeaseEntries(t, r.client, qname)
|
||||||
|
if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" {
|
||||||
|
t.Errorf("%s: mismatch found in %q: (-want,+got):\n%s", tc.desc, base.LeaseKey(qname), diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestWriteServerState(t *testing.T) {
|
func TestWriteServerState(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
Loading…
Reference in New Issue
Block a user