diff --git a/internal/base/base.go b/internal/base/base.go index eed7f9b..615ed75 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -18,6 +18,7 @@ import ( "github.com/golang/protobuf/ptypes" "github.com/hibiken/asynq/internal/errors" pb "github.com/hibiken/asynq/internal/proto" + "github.com/hibiken/asynq/internal/timeutil" "google.golang.org/protobuf/proto" ) @@ -608,6 +609,73 @@ func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) { return fn, ok } +// Lease is a time bound lease for worker to process task. +// It provides a communication channel between lessor and lessee about lease expiration. +type Lease struct { + once sync.Once + ch chan struct{} + + clock timeutil.Clock + + mu sync.Mutex + expireAt time.Time // guarded by mu +} + +func NewLease(expirationTime time.Time) *Lease { + return &Lease{ + ch: make(chan struct{}), + expireAt: expirationTime, + clock: timeutil.NewRealClock(), + } +} + +// Reset chanegs the lease to expire at the given time. +// It returns true if the lease is still valid and reset operation was successful, false if the lease had been expired. +func (l *Lease) Reset(expirationTime time.Time) bool { + if !l.IsValid() { + return false + } + l.mu.Lock() + defer l.mu.Unlock() + l.expireAt = expirationTime + return true +} + +// Sends a notification to lessee about expired lease +// Returns true if notification was sent, returns false if the lease is still valid and notification was not sent. +func (l *Lease) NotifyExpiration() bool { + if l.IsValid() { + return false + } + l.once.Do(l.closeCh) + return true +} + +func (l *Lease) closeCh() { + close(l.ch) +} + +// Done returns a communication channel from which the lessee can read to get notified when lessor notifies about lease expiration. +func (l *Lease) Done() <-chan struct{} { + return l.ch +} + +// Deadline returns the expiration time of the lease. +func (l *Lease) Deadline() time.Time { + l.mu.Lock() + defer l.mu.Unlock() + return l.expireAt +} + +// IsValid returns true if the lease's expieration time is in the future or equals to the current time, +// returns false otherwise. +func (l *Lease) IsValid() bool { + now := l.clock.Now() + l.mu.Lock() + defer l.mu.Unlock() + return l.expireAt.After(now) || l.expireAt.Equal(now) +} + // Broker is a message broker that supports operations to manage task queues. // // See rdb.RDB as a reference implementation. @@ -615,7 +683,7 @@ type Broker interface { Ping() error Enqueue(ctx context.Context, msg *TaskMessage) error EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error - Dequeue(qnames ...string) (*TaskMessage, error) + Dequeue(qnames ...string) (*TaskMessage, time.Time, error) Done(msg *TaskMessage) error MarkAsComplete(msg *TaskMessage) error Requeue(msg *TaskMessage) error @@ -626,7 +694,7 @@ type Broker interface { ForwardIfReady(qnames ...string) error DeleteExpiredCompletedTasks(qname string) error ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error) - ExtendLease(qname string, ids ...string) error + ExtendLease(qname string, ids ...string) (time.Time, error) WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error ClearServerState(host string, pid int, serverID string) error CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 5f82f45..956cf9a 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -16,6 +16,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" + "github.com/hibiken/asynq/internal/timeutil" ) func TestTaskKey(t *testing.T) { @@ -644,3 +645,75 @@ func TestCancelationsConcurrentAccess(t *testing.T) { t.Errorf("(*Cancelations).Get(%q) = _, true, want , false", key2) } } + +func TestLeaseReset(t *testing.T) { + now := time.Now() + clock := timeutil.NewSimulatedClock(now) + + l := NewLease(now.Add(30 * time.Second)) + l.clock = clock + + // Check initial state + if !l.IsValid() { + t.Errorf("lease should be valid when expiration is set to a future time") + } + if want := now.Add(30 * time.Second); l.Deadline() != want { + t.Errorf("Lease.Deadline() = %v, want %v", l.Deadline(), want) + } + + // Test Reset + if !l.Reset(now.Add(45 * time.Second)) { + t.Fatalf("Lease.Reset returned false when extending") + } + if want := now.Add(45 * time.Second); l.Deadline() != want { + t.Errorf("After Reset: Lease.Deadline() = %v, want %v", l.Deadline(), want) + } + + clock.AdvanceTime(1 * time.Minute) // simulate lease expiration + + if l.IsValid() { + t.Errorf("lease should be invalid after expiration") + } + + // Reset should return false if lease is expired. + if l.Reset(time.Now().Add(20 * time.Second)) { + t.Errorf("Lease.Reset should return false after expiration") + } +} + +func TestLeaseNotifyExpiration(t *testing.T) { + now := time.Now() + clock := timeutil.NewSimulatedClock(now) + + l := NewLease(now.Add(30 * time.Second)) + l.clock = clock + + select { + case <-l.Done(): + t.Fatalf("Lease.Done() did not block") + default: + } + + if l.NotifyExpiration() { + t.Fatalf("Lease.NotifyExpiration() should return false when lease is still valid") + } + + clock.AdvanceTime(1 * time.Minute) // simulate lease expiration + + if l.IsValid() { + t.Errorf("Lease should be invalid after expiration") + } + if !l.NotifyExpiration() { + t.Errorf("Lease.NotifyExpiration() return return true after expiration") + } + if !l.NotifyExpiration() { + t.Errorf("It should be leagal to call Lease.NotifyExpiration multiple times") + } + + select { + case <-l.Done(): + // expected + default: + t.Errorf("Lease.Done() blocked after call to Lease.NotifyExpiration()") + } +}