2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-19 23:30:20 +08:00

Merge 6021356fb5118c52576aa599dca9c02866d82f3c into c327bc40a28e4db45195cfe082d88faa808ce87d

This commit is contained in:
Ajat Prabha 2025-04-10 23:57:38 +08:00 committed by GitHub
commit 40c621090e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 48 additions and 0 deletions

View File

@ -104,6 +104,11 @@ func (s *Semaphore) Release(ctx context.Context) error {
return nil
}
// LockCount returns the number of currently acquired tokens.
func (s *Semaphore) LockCount(ctx context.Context) (int64, error) {
return s.rc.ZCard(ctx, semaphoreKey(s.scope)).Result()
}
// Close closes the connection to redis.
func (s *Semaphore) Close() error {
return s.rc.Close()

View File

@ -406,3 +406,46 @@ type badConnOpt struct {
func (b badConnOpt) MakeRedisClient() interface{} {
return nil
}
func TestSemaphore_LockCount(t *testing.T) {
desc := "ShouldReturnSetCardinalityAsTheLockCount"
sema := NewSemaphore(getRedisConnOpt(t), "task-9", 3)
defer sema.Close()
ctx, cancel := asynqcontext.New(&base.TaskMessage{
ID: "task-9-id",
Queue: "task-9",
}, time.Now().Add(time.Second))
defer cancel()
b, err := sema.Acquire(ctx)
if !b {
t.Errorf("%s;\nSemaphore.Acquire() failed, want true", desc)
}
if err != nil {
t.Errorf("%s;\nSemaphore.Acquire() got error %v want nil", desc, err)
}
n, err := sema.LockCount(ctx)
if err != nil {
t.Errorf("%s;\nSemaphore.LockCount() got error %v want nil", desc, err)
}
if n != 1 {
t.Errorf("%s;\nSemaphore.LockCount() got %d want 1", desc, n)
}
if err := sema.Release(ctx); err != nil {
t.Errorf("%s;\nSemaphore.Release() got error %v", desc, err)
}
n, err = sema.LockCount(ctx)
if err != nil {
t.Errorf("%s;\nSemaphore.LockCount() got error %v want nil", desc, err)
}
if n != 0 {
t.Errorf("%s;\nSemaphore.LockCount() got %d want 0", desc, n)
}
}