mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Add rediss url parsing support
This commit is contained in:
parent
c04fd41653
commit
04d7c8c38c
10
CHANGELOG.md
10
CHANGELOG.md
@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
- Enable rediss url parsing support
|
||||||
|
|
||||||
## [0.22.1] - 2022-02-20
|
## [0.22.1] - 2022-02-20
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
@ -264,9 +268,9 @@ Use `ProcessIn` or `ProcessAt` option to schedule a task instead of `EnqueueIn`
|
|||||||
|
|
||||||
#### `Inspector`
|
#### `Inspector`
|
||||||
|
|
||||||
All Inspector methods are scoped to a queue, and the methods take `qname (string)` as the first argument.
|
All Inspector methods are scoped to a queue, and the methods take `qname (string)` as the first argument.
|
||||||
`EnqueuedTask` is renamed to `PendingTask` and its corresponding methods.
|
`EnqueuedTask` is renamed to `PendingTask` and its corresponding methods.
|
||||||
`InProgressTask` is renamed to `ActiveTask` and its corresponding methods.
|
`InProgressTask` is renamed to `ActiveTask` and its corresponding methods.
|
||||||
Command "Enqueue" is replaced by the verb "Run" (e.g. `EnqueueAllScheduledTasks` --> `RunAllScheduledTasks`)
|
Command "Enqueue" is replaced by the verb "Run" (e.g. `EnqueueAllScheduledTasks` --> `RunAllScheduledTasks`)
|
||||||
|
|
||||||
#### `CLI`
|
#### `CLI`
|
||||||
|
23
asynq.go
23
asynq.go
@ -8,6 +8,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@ -419,9 +420,10 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
|
|||||||
// ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid.
|
// ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid.
|
||||||
// It returns a non-nil error if uri cannot be parsed.
|
// It returns a non-nil error if uri cannot be parsed.
|
||||||
//
|
//
|
||||||
// Three URI schemes are supported, which are redis:, redis-socket:, and redis-sentinel:.
|
// Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.
|
||||||
// Supported formats are:
|
// Supported formats are:
|
||||||
// redis://[:password@]host[:port][/dbnumber]
|
// redis://[:password@]host[:port][/dbnumber]
|
||||||
|
// rediss://[:password@]host[:port][/dbnumber]
|
||||||
// redis-socket://[:password@]path[?db=dbnumber]
|
// redis-socket://[:password@]path[?db=dbnumber]
|
||||||
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
|
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
|
||||||
func ParseRedisURI(uri string) (RedisConnOpt, error) {
|
func ParseRedisURI(uri string) (RedisConnOpt, error) {
|
||||||
@ -430,7 +432,7 @@ func ParseRedisURI(uri string) (RedisConnOpt, error) {
|
|||||||
return nil, fmt.Errorf("asynq: could not parse redis uri: %v", err)
|
return nil, fmt.Errorf("asynq: could not parse redis uri: %v", err)
|
||||||
}
|
}
|
||||||
switch u.Scheme {
|
switch u.Scheme {
|
||||||
case "redis":
|
case "redis", "rediss":
|
||||||
return parseRedisURI(u)
|
return parseRedisURI(u)
|
||||||
case "redis-socket":
|
case "redis-socket":
|
||||||
return parseRedisSocketURI(u)
|
return parseRedisSocketURI(u)
|
||||||
@ -444,6 +446,8 @@ func ParseRedisURI(uri string) (RedisConnOpt, error) {
|
|||||||
func parseRedisURI(u *url.URL) (RedisConnOpt, error) {
|
func parseRedisURI(u *url.URL) (RedisConnOpt, error) {
|
||||||
var db int
|
var db int
|
||||||
var err error
|
var err error
|
||||||
|
var redisConnOpt RedisClientOpt
|
||||||
|
|
||||||
if len(u.Path) > 0 {
|
if len(u.Path) > 0 {
|
||||||
xs := strings.Split(strings.Trim(u.Path, "/"), "/")
|
xs := strings.Split(strings.Trim(u.Path, "/"), "/")
|
||||||
db, err = strconv.Atoi(xs[0])
|
db, err = strconv.Atoi(xs[0])
|
||||||
@ -455,7 +459,20 @@ func parseRedisURI(u *url.URL) (RedisConnOpt, error) {
|
|||||||
if v, ok := u.User.Password(); ok {
|
if v, ok := u.User.Password(); ok {
|
||||||
password = v
|
password = v
|
||||||
}
|
}
|
||||||
return RedisClientOpt{Addr: u.Host, DB: db, Password: password}, nil
|
|
||||||
|
if u.Scheme == "rediss" {
|
||||||
|
h, _, err := net.SplitHostPort(u.Host)
|
||||||
|
if err != nil {
|
||||||
|
h = u.Host
|
||||||
|
}
|
||||||
|
redisConnOpt.TLSConfig = &tls.Config{ServerName: h}
|
||||||
|
}
|
||||||
|
|
||||||
|
redisConnOpt.Addr = u.Host
|
||||||
|
redisConnOpt.Password = password
|
||||||
|
redisConnOpt.DB = db
|
||||||
|
|
||||||
|
return redisConnOpt, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseRedisSocketURI(u *url.URL) (RedisConnOpt, error) {
|
func parseRedisSocketURI(u *url.URL) (RedisConnOpt, error) {
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
package asynq
|
package asynq
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
"flag"
|
"flag"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
@ -12,6 +13,7 @@ import (
|
|||||||
|
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
|
"github.com/google/go-cmp/cmp/cmpopts"
|
||||||
h "github.com/hibiken/asynq/internal/asynqtest"
|
h "github.com/hibiken/asynq/internal/asynqtest"
|
||||||
"github.com/hibiken/asynq/internal/log"
|
"github.com/hibiken/asynq/internal/log"
|
||||||
)
|
)
|
||||||
@ -99,6 +101,10 @@ func TestParseRedisURI(t *testing.T) {
|
|||||||
"redis://localhost:6379",
|
"redis://localhost:6379",
|
||||||
RedisClientOpt{Addr: "localhost:6379"},
|
RedisClientOpt{Addr: "localhost:6379"},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"rediss://localhost:6379",
|
||||||
|
RedisClientOpt{Addr: "localhost:6379", TLSConfig: &tls.Config{ServerName: "localhost"}},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"redis://localhost:6379/3",
|
"redis://localhost:6379/3",
|
||||||
RedisClientOpt{Addr: "localhost:6379", DB: 3},
|
RedisClientOpt{Addr: "localhost:6379", DB: 3},
|
||||||
@ -151,7 +157,7 @@ func TestParseRedisURI(t *testing.T) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if diff := cmp.Diff(tc.want, got); diff != "" {
|
if diff := cmp.Diff(tc.want, got, cmpopts.IgnoreUnexported(tls.Config{})); diff != "" {
|
||||||
t.Errorf("ParseRedisURI(%q) = %+v, want %+v\n(-want,+got)\n%s", tc.uri, got, tc.want, diff)
|
t.Errorf("ParseRedisURI(%q) = %+v, want %+v\n(-want,+got)\n%s", tc.uri, got, tc.want, diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user