mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			247 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			247 lines
		
	
	
		
			7.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2020 Kentaro Hibino. All rights reserved.
 | |
| // Use of this source code is governed by a MIT license
 | |
| // that can be found in the LICENSE file.
 | |
| 
 | |
| package asynq
 | |
| 
 | |
| import (
 | |
| 	"crypto/tls"
 | |
| 	"fmt"
 | |
| 	"net/url"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 
 | |
| 	"github.com/go-redis/redis/v7"
 | |
| )
 | |
| 
 | |
| // Task represents a unit of work to be performed.
 | |
| type Task struct {
 | |
| 	// Type indicates the type of task to be performed.
 | |
| 	Type string
 | |
| 
 | |
| 	// Payload holds data needed to perform the task.
 | |
| 	Payload Payload
 | |
| }
 | |
| 
 | |
| // NewTask returns a new Task given a type name and payload data.
 | |
| //
 | |
| // The payload values must be serializable.
 | |
| func NewTask(typename string, payload map[string]interface{}) *Task {
 | |
| 	return &Task{
 | |
| 		Type:    typename,
 | |
| 		Payload: Payload{payload},
 | |
| 	}
 | |
| }
 | |
| 
 | |
// RedisConnOpt is a discriminated union of the types that describe how to
// connect to Redis.
//
// It stands for exactly one of the following option types:
//
//   - RedisClientOpt
//   - RedisFailoverClientOpt
//   - RedisClusterClientOpt
type RedisConnOpt interface {
	// MakeRedisClient creates and returns a new redis client instance.
	// The result is deliberately typed as an opaque interface{} so that the
	// concrete redis client implementation stays hidden from callers.
	MakeRedisClient() interface{}
}
 | |
| 
 | |
// RedisClientOpt holds the settings for a redis client that talks
// directly to a single redis server.
type RedisClientOpt struct {
	// Network is the network type, either tcp or unix.
	// Default is tcp.
	Network string

	// Addr is the redis server address in "host:port" format.
	Addr string

	// Username authenticates the current connection when Redis ACLs are used.
	// See: https://redis.io/commands/auth.
	Username string

	// Password authenticates the current connection.
	// See: https://redis.io/commands/auth.
	Password string

	// DB is the redis database selected after connecting to a server.
	// See: https://redis.io/commands/select.
	DB int

	// PoolSize is the maximum number of socket connections.
	// Default is 10 connections per every CPU as reported by runtime.NumCPU.
	PoolSize int

	// TLSConfig is the TLS configuration used to connect to a server.
	// TLS is negotiated only when this field is set.
	TLSConfig *tls.Config
}
 | |
| 
 | |
| func (opt RedisClientOpt) MakeRedisClient() interface{} {
 | |
| 	return redis.NewClient(&redis.Options{
 | |
| 		Network:   opt.Network,
 | |
| 		Addr:      opt.Addr,
 | |
| 		Username:  opt.Username,
 | |
| 		Password:  opt.Password,
 | |
| 		DB:        opt.DB,
 | |
| 		PoolSize:  opt.PoolSize,
 | |
| 		TLSConfig: opt.TLSConfig,
 | |
| 	})
 | |
| }
 | |
| 
 | |
// RedisFailoverClientOpt holds the settings for a redis client that uses
// redis sentinels for service discovery and supports automatic failover.
type RedisFailoverClientOpt struct {
	// MasterName is the name of the redis master monitored by the sentinels.
	MasterName string

	// SentinelAddrs lists the sentinel addresses in "host:port" format.
	// Use at least three sentinels to avoid the problems described in
	// https://redis.io/topics/sentinel.
	SentinelAddrs []string

	// SentinelPassword is the redis sentinel password.
	SentinelPassword string

	// Username authenticates the current connection when Redis ACLs are used.
	// See: https://redis.io/commands/auth.
	Username string

	// Password authenticates the current connection.
	// See: https://redis.io/commands/auth.
	Password string

	// DB is the redis database selected after connecting to a server.
	// See: https://redis.io/commands/select.
	DB int

	// PoolSize is the maximum number of socket connections.
	// Default is 10 connections per every CPU as reported by runtime.NumCPU.
	PoolSize int

	// TLSConfig is the TLS configuration used to connect to a server.
	// TLS is negotiated only when this field is set.
	TLSConfig *tls.Config
}
 | |
| 
 | |
| func (opt RedisFailoverClientOpt) MakeRedisClient() interface{} {
 | |
| 	return redis.NewFailoverClient(&redis.FailoverOptions{
 | |
| 		MasterName:       opt.MasterName,
 | |
| 		SentinelAddrs:    opt.SentinelAddrs,
 | |
| 		SentinelPassword: opt.SentinelPassword,
 | |
| 		Username:         opt.Username,
 | |
| 		Password:         opt.Password,
 | |
| 		DB:               opt.DB,
 | |
| 		PoolSize:         opt.PoolSize,
 | |
| 		TLSConfig:        opt.TLSConfig,
 | |
| 	})
 | |
| }
 | |
| 
 | |
// RedisClusterClientOpt holds the settings for a redis client that
// connects to a redis cluster.
type RedisClusterClientOpt struct {
	// Addrs is a seed list of host:port addresses of cluster nodes.
	Addrs []string

	// MaxRedirects is the maximum number of retries before giving up.
	// A command is retried on network errors and MOVED/ASK redirects.
	// Default is 8 retries.
	MaxRedirects int

	// Username authenticates the current connection when Redis ACLs are used.
	// See: https://redis.io/commands/auth.
	Username string

	// Password authenticates the current connection.
	// See: https://redis.io/commands/auth.
	Password string

	// TLSConfig is the TLS configuration used to connect to a server.
	// TLS is negotiated only when this field is set.
	TLSConfig *tls.Config
}
 | |
| 
 | |
| func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
 | |
| 	return redis.NewClusterClient(&redis.ClusterOptions{
 | |
| 		Addrs:        opt.Addrs,
 | |
| 		MaxRedirects: opt.MaxRedirects,
 | |
| 		Username:     opt.Username,
 | |
| 		Password:     opt.Password,
 | |
| 		TLSConfig:    opt.TLSConfig,
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid.
 | |
| // It returns a non-nil error if uri cannot be parsed.
 | |
| //
 | |
| // Three URI schemes are supported, which are redis:, redis-socket:, and redis-sentinel:.
 | |
| // Supported formats are:
 | |
| //     redis://[:password@]host[:port][/dbnumber]
 | |
| //     redis-socket://[:password@]path[?db=dbnumber]
 | |
| //     redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
 | |
| func ParseRedisURI(uri string) (RedisConnOpt, error) {
 | |
| 	u, err := url.Parse(uri)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("asynq: could not parse redis uri: %v", err)
 | |
| 	}
 | |
| 	switch u.Scheme {
 | |
| 	case "redis":
 | |
| 		return parseRedisURI(u)
 | |
| 	case "redis-socket":
 | |
| 		return parseRedisSocketURI(u)
 | |
| 	case "redis-sentinel":
 | |
| 		return parseRedisSentinelURI(u)
 | |
| 	default:
 | |
| 		return nil, fmt.Errorf("asynq: unsupported uri scheme: %q", u.Scheme)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func parseRedisURI(u *url.URL) (RedisConnOpt, error) {
 | |
| 	var db int
 | |
| 	var err error
 | |
| 	if len(u.Path) > 0 {
 | |
| 		xs := strings.Split(strings.Trim(u.Path, "/"), "/")
 | |
| 		db, err = strconv.Atoi(xs[0])
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("asynq: could not parse redis uri: database number should be the first segment of the path")
 | |
| 		}
 | |
| 	}
 | |
| 	var password string
 | |
| 	if v, ok := u.User.Password(); ok {
 | |
| 		password = v
 | |
| 	}
 | |
| 	return RedisClientOpt{Addr: u.Host, DB: db, Password: password}, nil
 | |
| }
 | |
| 
 | |
| func parseRedisSocketURI(u *url.URL) (RedisConnOpt, error) {
 | |
| 	const errPrefix = "asynq: could not parse redis socket uri"
 | |
| 	if len(u.Path) == 0 {
 | |
| 		return nil, fmt.Errorf("%s: path does not exist", errPrefix)
 | |
| 	}
 | |
| 	q := u.Query()
 | |
| 	var db int
 | |
| 	var err error
 | |
| 	if n := q.Get("db"); n != "" {
 | |
| 		db, err = strconv.Atoi(n)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("%s: query param `db` should be a number", errPrefix)
 | |
| 		}
 | |
| 	}
 | |
| 	var password string
 | |
| 	if v, ok := u.User.Password(); ok {
 | |
| 		password = v
 | |
| 	}
 | |
| 	return RedisClientOpt{Network: "unix", Addr: u.Path, DB: db, Password: password}, nil
 | |
| }
 | |
| 
 | |
| func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) {
 | |
| 	addrs := strings.Split(u.Host, ",")
 | |
| 	master := u.Query().Get("master")
 | |
| 	var password string
 | |
| 	if v, ok := u.User.Password(); ok {
 | |
| 		password = v
 | |
| 	}
 | |
| 	return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, Password: password}, nil
 | |
| }
 |