mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 23:32:17 +08:00
expose semaphore lock count
This commit is contained in:
parent
c08f142b56
commit
c8db042ff0
@ -104,6 +104,10 @@ func (s *Semaphore) Release(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Semaphore) LockCount(ctx context.Context) (int64, error) {
|
||||
return s.rc.ZCard(ctx, semaphoreKey(s.scope)).Result()
|
||||
}
|
||||
|
||||
// Close closes the connection to redis.
|
||||
func (s *Semaphore) Close() error {
|
||||
return s.rc.Close()
|
||||
|
@ -406,3 +406,46 @@ type badConnOpt struct {
|
||||
func (b badConnOpt) MakeRedisClient() interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestSemaphore_LockCount(t *testing.T) {
|
||||
desc := "ShouldReturnSetCardinalityAsTheLockCount"
|
||||
|
||||
sema := NewSemaphore(getRedisConnOpt(t), "task-9", 3)
|
||||
defer sema.Close()
|
||||
|
||||
ctx, cancel := asynqcontext.New(&base.TaskMessage{
|
||||
ID: "task-9-id",
|
||||
Queue: "task-9",
|
||||
}, time.Now().Add(time.Second))
|
||||
defer cancel()
|
||||
|
||||
b, err := sema.Acquire(ctx)
|
||||
if !b {
|
||||
t.Errorf("%s;\nSemaphore.Acquire() failed, want true", desc)
|
||||
}
|
||||
if err != nil {
|
||||
t.Errorf("%s;\nSemaphore.Acquire() got error %v want nil", desc, err)
|
||||
}
|
||||
|
||||
n, err := sema.LockCount(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("%s;\nSemaphore.LockCount() got error %v want nil", desc, err)
|
||||
}
|
||||
|
||||
if n != 1 {
|
||||
t.Errorf("%s;\nSemaphore.LockCount() got %d want 1", desc, n)
|
||||
}
|
||||
|
||||
if err := sema.Release(ctx); err != nil {
|
||||
t.Errorf("%s;\nSemaphore.Release() got error %v", desc, err)
|
||||
}
|
||||
|
||||
n, err = sema.LockCount(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("%s;\nSemaphore.LockCount() got error %v want nil", desc, err)
|
||||
}
|
||||
|
||||
if n != 0 {
|
||||
t.Errorf("%s;\nSemaphore.LockCount() got %d want 0", desc, n)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user