mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Allow upper case characters in queue name
This commit is contained in:
parent
c0ae62499f
commit
46b23d6495
@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
- Changed `Queue` function to not to convert the provided queue name to lowercase. Queue names are now case-sensitive.
|
||||||
|
|
||||||
## [0.18.1] - 2020-07-04
|
## [0.18.1] - 2020-07-04
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
11
client.go
11
client.go
@ -6,7 +6,6 @@ package asynq
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -93,10 +92,8 @@ func (n retryOption) Type() OptionType { return MaxRetryOpt }
|
|||||||
func (n retryOption) Value() interface{} { return int(n) }
|
func (n retryOption) Value() interface{} { return int(n) }
|
||||||
|
|
||||||
// Queue returns an option to specify the queue to enqueue the task into.
|
// Queue returns an option to specify the queue to enqueue the task into.
|
||||||
//
|
|
||||||
// Queue name is case-insensitive and the lowercased version is used.
|
|
||||||
func Queue(qname string) Option {
|
func Queue(qname string) Option {
|
||||||
return queueOption(strings.ToLower(qname))
|
return queueOption(qname)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qname queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(qname)) }
|
func (qname queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(qname)) }
|
||||||
@ -207,11 +204,11 @@ func composeOptions(opts ...Option) (option, error) {
|
|||||||
case retryOption:
|
case retryOption:
|
||||||
res.retry = int(opt)
|
res.retry = int(opt)
|
||||||
case queueOption:
|
case queueOption:
|
||||||
trimmed := strings.TrimSpace(string(opt))
|
qname := string(opt)
|
||||||
if err := base.ValidateQueueName(trimmed); err != nil {
|
if err := base.ValidateQueueName(qname); err != nil {
|
||||||
return option{}, err
|
return option{}, err
|
||||||
}
|
}
|
||||||
res.queue = trimmed
|
res.queue = qname
|
||||||
case timeoutOption:
|
case timeoutOption:
|
||||||
res.timeout = time.Duration(opt)
|
res.timeout = time.Duration(opt)
|
||||||
case deadlineOption:
|
case deadlineOption:
|
||||||
|
@ -287,13 +287,13 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
desc: "Queue option should be case-insensitive",
|
desc: "Queue option should be case sensitive",
|
||||||
task: task,
|
task: task,
|
||||||
opts: []Option{
|
opts: []Option{
|
||||||
Queue("HIGH"),
|
Queue("MyQueue"),
|
||||||
},
|
},
|
||||||
wantInfo: &TaskInfo{
|
wantInfo: &TaskInfo{
|
||||||
Queue: "high",
|
Queue: "MyQueue",
|
||||||
Type: task.Type(),
|
Type: task.Type(),
|
||||||
Payload: task.Payload(),
|
Payload: task.Payload(),
|
||||||
State: TaskStatePending,
|
State: TaskStatePending,
|
||||||
@ -306,12 +306,12 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
NextProcessAt: now,
|
NextProcessAt: now,
|
||||||
},
|
},
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
"high": {
|
"MyQueue": {
|
||||||
{
|
{
|
||||||
Type: task.Type(),
|
Type: task.Type(),
|
||||||
Payload: task.Payload(),
|
Payload: task.Payload(),
|
||||||
Retry: defaultMaxRetry,
|
Retry: defaultMaxRetry,
|
||||||
Queue: "high",
|
Queue: "MyQueue",
|
||||||
Timeout: int64(defaultTimeout.Seconds()),
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
Deadline: noDeadline.Unix(),
|
Deadline: noDeadline.Unix(),
|
||||||
},
|
},
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -85,7 +86,7 @@ func TaskStateFromString(s string) (TaskState, error) {
|
|||||||
// ValidateQueueName validates a given qname to be used as a queue name.
|
// ValidateQueueName validates a given qname to be used as a queue name.
|
||||||
// Returns nil if valid, otherwise returns non-nil error.
|
// Returns nil if valid, otherwise returns non-nil error.
|
||||||
func ValidateQueueName(qname string) error {
|
func ValidateQueueName(qname string) error {
|
||||||
if len(qname) == 0 {
|
if len(strings.TrimSpace(qname)) == 0 {
|
||||||
return fmt.Errorf("queue name must contain one or more characters")
|
return fmt.Errorf("queue name must contain one or more characters")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -295,6 +295,9 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
|||||||
}
|
}
|
||||||
queues := make(map[string]int)
|
queues := make(map[string]int)
|
||||||
for qname, p := range cfg.Queues {
|
for qname, p := range cfg.Queues {
|
||||||
|
if err := base.ValidateQueueName(qname); err != nil {
|
||||||
|
continue // ignore invalid queue names
|
||||||
|
}
|
||||||
if p > 0 {
|
if p > 0 {
|
||||||
queues[qname] = p
|
queues[qname] = p
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user