diff --git a/CHANGELOG.md b/CHANGELOG.md index a3d73a4..185a62d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `ParseRedisURI` helper function is added to create a `RedisConnOpt` from a URI string. + ## [0.8.0] - 2020-04-19 ### Changed diff --git a/asynq.go b/asynq.go index 7d1b211..e2bfeec 100644 --- a/asynq.go +++ b/asynq.go @@ -7,6 +7,9 @@ package asynq import ( "crypto/tls" "fmt" + "net/url" + "strconv" + "strings" "github.com/go-redis/redis/v7" ) @@ -94,6 +97,79 @@ type RedisFailoverClientOpt struct { TLSConfig *tls.Config } +// ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid. +// It returns a non-nil error if uri cannot be parsed. +// +// Three URI schemes are supported, which are redis:, redis-socket:, and redis-sentinel:. +// Supported formats are: +// redis://[:password@]host[:port][/dbnumber] +// redis-socket://[:password@]path[?db=dbnumber] +// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName] +func ParseRedisURI(uri string) (RedisConnOpt, error) { + u, err := url.Parse(uri) + if err != nil { + return nil, fmt.Errorf("asynq: could not parse redis uri: %v", err) + } + switch u.Scheme { + case "redis": + return parseRedisURI(u) + case "redis-socket": + return parseRedisSocketURI(u) + case "redis-sentinel": + return parseRedisSentinelURI(u) + default: + return nil, fmt.Errorf("asynq: unsupported uri scheme: %q", u.Scheme) + } +} + +func parseRedisURI(u *url.URL) (RedisConnOpt, error) { + var db int + var err error + if len(u.Path) > 0 { + xs := strings.Split(strings.Trim(u.Path, "/"), "/") + db, err = strconv.Atoi(xs[0]) + if err != nil { + return nil, fmt.Errorf("asynq: could not parse redis uri: database number should be the first segment of the path") + } + } + var password string + if v, ok := u.User.Password(); ok { + password = v + } + return RedisClientOpt{Addr: u.Host, DB: db, Password: password}, nil +} + +func parseRedisSocketURI(u *url.URL) (RedisConnOpt, error) { + const errPrefix = "asynq: could not parse redis socket uri" + if len(u.Path) == 0 { + return nil, fmt.Errorf("%s: path does not exist", errPrefix) + } + q := u.Query() + var db int + var err error + if n := q.Get("db"); n != "" { + db, err = strconv.Atoi(n) + if err != nil { + return nil, fmt.Errorf("%s: query param `db` should be a number", errPrefix) + } + } + var password string + if v, ok := u.User.Password(); ok { + password = v + } + return RedisClientOpt{Network: "unix", Addr: u.Path, DB: db, Password: password}, nil +} + +func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) { + addrs := strings.Split(u.Host, ",") + master := u.Query().Get("master") + var password string + if v, ok := u.User.Password(); ok { + password = v + } + return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, Password: password}, nil +} + // createRedisClient returns a redis client given a redis connection configuration. // // Passing an unexpected type as a RedisConnOpt argument will cause panic. diff --git a/asynq_test.go b/asynq_test.go index 3f01066..3f6675c 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -44,3 +44,106 @@ var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task { }) return out }) + +func TestParseRedisURI(t *testing.T) { + tests := []struct { + uri string + want RedisConnOpt + }{ + { + "redis://localhost:6379", + RedisClientOpt{Addr: "localhost:6379"}, + }, + { + "redis://localhost:6379/3", + RedisClientOpt{Addr: "localhost:6379", DB: 3}, + }, + { + "redis://:mypassword@localhost:6379", + RedisClientOpt{Addr: "localhost:6379", Password: "mypassword"}, + }, + { + "redis://:mypassword@127.0.0.1:6379/11", + RedisClientOpt{Addr: "127.0.0.1:6379", Password: "mypassword", DB: 11}, + }, + { + "redis-socket:///var/run/redis/redis.sock", + RedisClientOpt{Network: "unix", Addr: "/var/run/redis/redis.sock"}, + }, + { + "redis-socket://:mypassword@/var/run/redis/redis.sock", + RedisClientOpt{Network: "unix", Addr: "/var/run/redis/redis.sock", Password: "mypassword"}, + }, + { + "redis-socket:///var/run/redis/redis.sock?db=7", + RedisClientOpt{Network: "unix", Addr: "/var/run/redis/redis.sock", DB: 7}, + }, + { + "redis-socket://:mypassword@/var/run/redis/redis.sock?db=12", + RedisClientOpt{Network: "unix", Addr: "/var/run/redis/redis.sock", Password: "mypassword", DB: 12}, + }, + { + "redis-sentinel://localhost:5000,localhost:5001,localhost:5002?master=mymaster", + RedisFailoverClientOpt{ + MasterName: "mymaster", + SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"}, + }, + }, + { + "redis-sentinel://:mypassword@localhost:5000,localhost:5001,localhost:5002?master=mymaster", + RedisFailoverClientOpt{ + MasterName: "mymaster", + SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"}, + Password: "mypassword", + }, + }, + } + + for _, tc := range tests { + got, err := ParseRedisURI(tc.uri) + if err != nil { + t.Errorf("ParseRedisURI(%q) returned an error: %v", tc.uri, err) + continue + } + + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("ParseRedisURI(%q) = %+v, want %+v\n(-want,+got)\n%s", tc.uri, got, tc.want, diff) + } + } +} + +func TestParseRedisURIErrors(t *testing.T) { + tests := []struct { + desc string + uri string + }{ + { + "unsupported scheme", + "rdb://localhost:6379", + }, + { + "missing scheme", + "localhost:6379", + }, + { + "multiple db numbers", + "redis://localhost:6379/1,2,3", + }, + { + "missing path for socket connection", + "redis-socket://?db=one", + }, + { + "non integer for db numbers for socket", + "redis-socket:///some/path/to/redis?db=one", + }, + } + + for _, tc := range tests { + _, err := ParseRedisURI(tc.uri) + if err == nil { + t.Errorf("%s: ParseRedisURI(%q) succeeded for malformed input, want error", + tc.desc, tc.uri) + } + } +} diff --git a/example_test.go b/example_test.go index 10bf93c..5921ef8 100644 --- a/example_test.go +++ b/example_test.go @@ -5,6 +5,7 @@ package asynq_test import ( + "fmt" "log" "os" "os/signal" @@ -76,3 +77,19 @@ func ExampleServer_Quiet() { srv.Stop() } + +func ExampleParseRedisURI() { + rconn, err := asynq.ParseRedisURI("redis://localhost:6379/10") + if err != nil { + log.Fatal(err) + } + r, ok := rconn.(asynq.RedisClientOpt) + if !ok { + log.Fatal("unexpected type") + } + fmt.Println(r.Addr) + fmt.Println(r.DB) + // Output: + // localhost:6379 + // 10 +}