mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Add Lease type to base package
This commit is contained in:
parent
871474f220
commit
b9943de2ab
@ -18,6 +18,7 @@ import (
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/hibiken/asynq/internal/errors"
|
||||
pb "github.com/hibiken/asynq/internal/proto"
|
||||
"github.com/hibiken/asynq/internal/timeutil"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
@ -608,6 +609,73 @@ func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) {
|
||||
return fn, ok
|
||||
}
|
||||
|
||||
// Lease is a time bound lease for worker to process task.
|
||||
// It provides a communication channel between lessor and lessee about lease expiration.
|
||||
type Lease struct {
|
||||
once sync.Once
|
||||
ch chan struct{}
|
||||
|
||||
clock timeutil.Clock
|
||||
|
||||
mu sync.Mutex
|
||||
expireAt time.Time // guarded by mu
|
||||
}
|
||||
|
||||
func NewLease(expirationTime time.Time) *Lease {
|
||||
return &Lease{
|
||||
ch: make(chan struct{}),
|
||||
expireAt: expirationTime,
|
||||
clock: timeutil.NewRealClock(),
|
||||
}
|
||||
}
|
||||
|
||||
// Reset chanegs the lease to expire at the given time.
|
||||
// It returns true if the lease is still valid and reset operation was successful, false if the lease had been expired.
|
||||
func (l *Lease) Reset(expirationTime time.Time) bool {
|
||||
if !l.IsValid() {
|
||||
return false
|
||||
}
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
l.expireAt = expirationTime
|
||||
return true
|
||||
}
|
||||
|
||||
// Sends a notification to lessee about expired lease
|
||||
// Returns true if notification was sent, returns false if the lease is still valid and notification was not sent.
|
||||
func (l *Lease) NotifyExpiration() bool {
|
||||
if l.IsValid() {
|
||||
return false
|
||||
}
|
||||
l.once.Do(l.closeCh)
|
||||
return true
|
||||
}
|
||||
|
||||
func (l *Lease) closeCh() {
|
||||
close(l.ch)
|
||||
}
|
||||
|
||||
// Done returns a communication channel from which the lessee can read to get notified when lessor notifies about lease expiration.
|
||||
func (l *Lease) Done() <-chan struct{} {
|
||||
return l.ch
|
||||
}
|
||||
|
||||
// Deadline returns the expiration time of the lease.
|
||||
func (l *Lease) Deadline() time.Time {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
return l.expireAt
|
||||
}
|
||||
|
||||
// IsValid returns true if the lease's expieration time is in the future or equals to the current time,
|
||||
// returns false otherwise.
|
||||
func (l *Lease) IsValid() bool {
|
||||
now := l.clock.Now()
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
return l.expireAt.After(now) || l.expireAt.Equal(now)
|
||||
}
|
||||
|
||||
// Broker is a message broker that supports operations to manage task queues.
|
||||
//
|
||||
// See rdb.RDB as a reference implementation.
|
||||
@ -615,7 +683,7 @@ type Broker interface {
|
||||
Ping() error
|
||||
Enqueue(ctx context.Context, msg *TaskMessage) error
|
||||
EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
|
||||
Dequeue(qnames ...string) (*TaskMessage, error)
|
||||
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
|
||||
Done(msg *TaskMessage) error
|
||||
MarkAsComplete(msg *TaskMessage) error
|
||||
Requeue(msg *TaskMessage) error
|
||||
@ -626,7 +694,7 @@ type Broker interface {
|
||||
ForwardIfReady(qnames ...string) error
|
||||
DeleteExpiredCompletedTasks(qname string) error
|
||||
ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
|
||||
ExtendLease(qname string, ids ...string) error
|
||||
ExtendLease(qname string, ids ...string) (time.Time, error)
|
||||
WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
|
||||
ClearServerState(host string, pid int, serverID string) error
|
||||
CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/uuid"
|
||||
"github.com/hibiken/asynq/internal/timeutil"
|
||||
)
|
||||
|
||||
func TestTaskKey(t *testing.T) {
|
||||
@ -644,3 +645,75 @@ func TestCancelationsConcurrentAccess(t *testing.T) {
|
||||
t.Errorf("(*Cancelations).Get(%q) = _, true, want <nil>, false", key2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeaseReset(t *testing.T) {
|
||||
now := time.Now()
|
||||
clock := timeutil.NewSimulatedClock(now)
|
||||
|
||||
l := NewLease(now.Add(30 * time.Second))
|
||||
l.clock = clock
|
||||
|
||||
// Check initial state
|
||||
if !l.IsValid() {
|
||||
t.Errorf("lease should be valid when expiration is set to a future time")
|
||||
}
|
||||
if want := now.Add(30 * time.Second); l.Deadline() != want {
|
||||
t.Errorf("Lease.Deadline() = %v, want %v", l.Deadline(), want)
|
||||
}
|
||||
|
||||
// Test Reset
|
||||
if !l.Reset(now.Add(45 * time.Second)) {
|
||||
t.Fatalf("Lease.Reset returned false when extending")
|
||||
}
|
||||
if want := now.Add(45 * time.Second); l.Deadline() != want {
|
||||
t.Errorf("After Reset: Lease.Deadline() = %v, want %v", l.Deadline(), want)
|
||||
}
|
||||
|
||||
clock.AdvanceTime(1 * time.Minute) // simulate lease expiration
|
||||
|
||||
if l.IsValid() {
|
||||
t.Errorf("lease should be invalid after expiration")
|
||||
}
|
||||
|
||||
// Reset should return false if lease is expired.
|
||||
if l.Reset(time.Now().Add(20 * time.Second)) {
|
||||
t.Errorf("Lease.Reset should return false after expiration")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeaseNotifyExpiration(t *testing.T) {
|
||||
now := time.Now()
|
||||
clock := timeutil.NewSimulatedClock(now)
|
||||
|
||||
l := NewLease(now.Add(30 * time.Second))
|
||||
l.clock = clock
|
||||
|
||||
select {
|
||||
case <-l.Done():
|
||||
t.Fatalf("Lease.Done() did not block")
|
||||
default:
|
||||
}
|
||||
|
||||
if l.NotifyExpiration() {
|
||||
t.Fatalf("Lease.NotifyExpiration() should return false when lease is still valid")
|
||||
}
|
||||
|
||||
clock.AdvanceTime(1 * time.Minute) // simulate lease expiration
|
||||
|
||||
if l.IsValid() {
|
||||
t.Errorf("Lease should be invalid after expiration")
|
||||
}
|
||||
if !l.NotifyExpiration() {
|
||||
t.Errorf("Lease.NotifyExpiration() return return true after expiration")
|
||||
}
|
||||
if !l.NotifyExpiration() {
|
||||
t.Errorf("It should be leagal to call Lease.NotifyExpiration multiple times")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-l.Done():
|
||||
// expected
|
||||
default:
|
||||
t.Errorf("Lease.Done() blocked after call to Lease.NotifyExpiration()")
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user