2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-23 01:00:17 +08:00

rename vars and update docs

This commit is contained in:
ajatprabha 2021-10-31 21:40:26 +05:30
parent ee6529191f
commit 4bbbdb30cf
No known key found for this signature in database
GPG Key ID: EEA3FDB0312545DA
2 changed files with 36 additions and 37 deletions

View File

@ -11,33 +11,33 @@ import (
asynqcontext "github.com/hibiken/asynq/internal/context" asynqcontext "github.com/hibiken/asynq/internal/context"
) )
// NewSemaphore creates a new counting Semaphore. // NewSemaphore creates a counting Semaphore for the given scope with the given number of tokens.
func NewSemaphore(rco asynq.RedisConnOpt, name string, maxConcurrency int) *Semaphore { func NewSemaphore(rco asynq.RedisConnOpt, scope string, maxTokens int) *Semaphore {
rc, ok := rco.MakeRedisClient().(redis.UniversalClient) rc, ok := rco.MakeRedisClient().(redis.UniversalClient)
if !ok { if !ok {
panic(fmt.Sprintf("rate.NewSemaphore: unsupported RedisConnOpt type %T", rco)) panic(fmt.Sprintf("rate.NewSemaphore: unsupported RedisConnOpt type %T", rco))
} }
if maxConcurrency < 1 { if maxTokens < 1 {
panic("rate.NewSemaphore: maxConcurrency cannot be less than 1") panic("rate.NewSemaphore: maxTokens cannot be less than 1")
} }
if len(strings.TrimSpace(name)) == 0 { if len(strings.TrimSpace(scope)) == 0 {
panic("rate.NewSemaphore: name should not be empty") panic("rate.NewSemaphore: scope should not be empty")
} }
return &Semaphore{ return &Semaphore{
rc: rc, rc: rc,
name: name, scope: scope,
maxConcurrency: maxConcurrency, maxTokens: maxTokens,
} }
} }
// Semaphore is a distributed counting semaphore which can be used to set maxConcurrency across multiple asynq servers. // Semaphore is a distributed counting semaphore which can be used to set maxTokens across multiple asynq servers.
type Semaphore struct { type Semaphore struct {
rc redis.UniversalClient rc redis.UniversalClient
maxConcurrency int maxTokens int
name string scope string
} }
// KEYS[1] -> asynq:sema:<scope> // KEYS[1] -> asynq:sema:<scope>
@ -47,9 +47,9 @@ type Semaphore struct {
// ARGV[4] -> task ID // ARGV[4] -> task ID
var acquireCmd = redis.NewScript(` var acquireCmd = redis.NewScript(`
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", tonumber(ARGV[2])-1) redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", tonumber(ARGV[2])-1)
local lockCount = redis.call("ZCARD", KEYS[1]) local count = redis.call("ZCARD", KEYS[1])
if (lockCount < tonumber(ARGV[1])) then if (count < tonumber(ARGV[1])) then
redis.call("ZADD", KEYS[1], ARGV[3], ARGV[4]) redis.call("ZADD", KEYS[1], ARGV[3], ARGV[4])
return true return true
else else
@ -57,13 +57,13 @@ else
end end
`) `)
// Acquire will try to acquire lock on the counting semaphore. // Acquire attempts to acquire a token from the semaphore.
// - Returns (true, nil), iff semaphore key exists and current value is less than maxConcurrency // - Returns (true, nil), iff semaphore key exists and current value is less than maxTokens
// - Returns (false, nil) when lock cannot be acquired // - Returns (false, nil) when token cannot be acquired
// - Returns (false, error) otherwise // - Returns (false, error) otherwise
// //
// The context.Context passed to Acquire must have a deadline set, // The context.Context passed to Acquire must have a deadline set,
// this ensures that lock is released if the job goroutine crashes and does not call Release. // this ensures that token is released if the job goroutine crashes and does not call Release.
func (s *Semaphore) Acquire(ctx context.Context) (bool, error) { func (s *Semaphore) Acquire(ctx context.Context) (bool, error) {
d, ok := ctx.Deadline() d, ok := ctx.Deadline()
if !ok { if !ok {
@ -76,13 +76,12 @@ func (s *Semaphore) Acquire(ctx context.Context) (bool, error) {
} }
b, err := acquireCmd.Run(ctx, s.rc, b, err := acquireCmd.Run(ctx, s.rc,
[]string{semaphoreKey(s.name)}, []string{semaphoreKey(s.scope)},
[]interface{}{ s.maxTokens,
s.maxConcurrency,
time.Now().Unix(), time.Now().Unix(),
d.Unix(), d.Unix(),
taskID, taskID,
}...).Bool() ).Bool()
if err == redis.Nil { if err == redis.Nil {
return b, nil return b, nil
} }
@ -90,26 +89,26 @@ func (s *Semaphore) Acquire(ctx context.Context) (bool, error) {
return b, err return b, err
} }
// Release will release the lock on the counting semaphore. // Release will release the token on the counting semaphore.
func (s *Semaphore) Release(ctx context.Context) error { func (s *Semaphore) Release(ctx context.Context) error {
taskID, ok := asynqcontext.GetTaskID(ctx) taskID, ok := asynqcontext.GetTaskID(ctx)
if !ok { if !ok {
return fmt.Errorf("provided context is missing task ID value") return fmt.Errorf("provided context is missing task ID value")
} }
n, err := s.rc.ZRem(ctx, semaphoreKey(s.name), taskID).Result() n, err := s.rc.ZRem(ctx, semaphoreKey(s.scope), taskID).Result()
if err != nil { if err != nil {
return fmt.Errorf("redis command failed: %v", err) return fmt.Errorf("redis command failed: %v", err)
} }
if n == 0 { if n == 0 {
return fmt.Errorf("no lock found for task %q", taskID) return fmt.Errorf("no token found for task %q", taskID)
} }
return nil return nil
} }
// Close closes the connection with redis. // Close closes the connection to redis.
func (s *Semaphore) Close() error { func (s *Semaphore) Close() error {
return s.rc.Close() return s.rc.Close()
} }

View File

@ -43,14 +43,14 @@ func TestNewSemaphore(t *testing.T) {
connOpt: &badConnOpt{}, connOpt: &badConnOpt{},
}, },
{ {
desc: "Zero maxConcurrency should panic", desc: "Zero maxTokens should panic",
wantPanic: "rate.NewSemaphore: maxConcurrency cannot be less than 1", wantPanic: "rate.NewSemaphore: maxTokens cannot be less than 1",
}, },
{ {
desc: "Empty name should panic", desc: "Empty scope should panic",
maxConcurrency: 2, maxConcurrency: 2,
name: " ", name: " ",
wantPanic: "rate.NewSemaphore: name should not be empty", wantPanic: "rate.NewSemaphore: scope should not be empty",
}, },
} }
@ -86,7 +86,7 @@ func TestNewSemaphore_Acquire(t *testing.T) {
wantErr string wantErr string
}{ }{
{ {
desc: "Should acquire lock when current lock count is less than maxConcurrency", desc: "Should acquire lock when current lock count is less than maxTokens",
name: "task-1", name: "task-1",
maxConcurrency: 3, maxConcurrency: 3,
taskIDs: []uuid.UUID{uuid.New(), uuid.New()}, taskIDs: []uuid.UUID{uuid.New(), uuid.New()},
@ -99,7 +99,7 @@ func TestNewSemaphore_Acquire(t *testing.T) {
want: []bool{true, true}, want: []bool{true, true},
}, },
{ {
desc: "Should fail acquiring lock when current lock count is equal to maxConcurrency", desc: "Should fail acquiring lock when current lock count is equal to maxTokens",
name: "task-2", name: "task-2",
maxConcurrency: 3, maxConcurrency: 3,
taskIDs: []uuid.UUID{uuid.New(), uuid.New(), uuid.New(), uuid.New()}, taskIDs: []uuid.UUID{uuid.New(), uuid.New(), uuid.New(), uuid.New()},
@ -250,7 +250,7 @@ func TestNewSemaphore_Release(t *testing.T) {
}, time.Now().Add(time.Second)) }, time.Now().Add(time.Second))
}, },
wantCount: 1, wantCount: 1,
wantErr: fmt.Sprintf("no lock found for task %q", testID.String()), wantErr: fmt.Sprintf("no token found for task %q", testID.String()),
}, },
} }