// Copyright 2020 Kentaro Hibino. All rights reserved. // Use of this source code is governed by a MIT license // that can be found in the LICENSE file. package asynq import ( "context" "crypto/tls" "fmt" "net" "net/url" "strconv" "strings" "sync" "time" "github.com/hibiken/asynq/internal/base" "github.com/redis/go-redis/v9" ) // Task represents a unit of work to be performed. type Task struct { // typename indicates the type of task to be performed. typename string // payload holds data needed to perform the task. payload []byte // opts holds options for the task. opts []Option // w is the ResultWriter for the task. w *ResultWriter } func (t *Task) Type() string { return t.typename } func (t *Task) Payload() []byte { return t.payload } // ResultWriter returns a pointer to the ResultWriter associated with the task. // // Nil pointer is returned if called on a newly created task (i.e. task created by calling NewTask). // Only the tasks passed to Handler.ProcessTask have a valid ResultWriter pointer. func (t *Task) ResultWriter() *ResultWriter { return t.w } // NewTask returns a new Task given a type name and payload data. // Options can be passed to configure task processing behavior. func NewTask(typename string, payload []byte, opts ...Option) *Task { return &Task{ typename: typename, payload: payload, opts: opts, } } // newTask creates a task with the given typename, payload and ResultWriter. func newTask(typename string, payload []byte, w *ResultWriter) *Task { return &Task{ typename: typename, payload: payload, w: w, } } // A TaskInfo describes a task and its metadata. type TaskInfo struct { // ID is the identifier of the task. ID string // Queue is the name of the queue in which the task belongs. Queue string // Type is the type name of the task. Type string // Payload is the payload data of the task. Payload []byte // State indicates the task state. State TaskState // MaxRetry is the maximum number of times the task can be retried. MaxRetry int // Retried is the number of times the task has retried so far. Retried int // LastErr is the error message from the last failure. LastErr string // LastFailedAt is the time time of the last failure if any. // If the task has no failures, LastFailedAt is zero time (i.e. time.Time{}). LastFailedAt time.Time // Timeout is the duration the task can be processed by Handler before being retried, // zero if not specified Timeout time.Duration // Deadline is the deadline for the task, zero value if not specified. Deadline time.Time // Group is the name of the group in which the task belongs. // // Tasks in the same queue can be grouped together by Group name and will be aggregated into one task // by a Server processing the queue. // // Empty string (default) indicates task does not belong to any groups, and no aggregation will be applied to the task. Group string // NextProcessAt is the time the task is scheduled to be processed, // zero if not applicable. NextProcessAt time.Time // IsOrphaned describes whether the task is left in active state with no worker processing it. // An orphaned task indicates that the worker has crashed or experienced network failures and was not able to // extend its lease on the task. // // This task will be recovered by running a server against the queue the task is in. // This field is only applicable to tasks with TaskStateActive. IsOrphaned bool // Retention is duration of the retention period after the task is successfully processed. Retention time.Duration // CompletedAt is the time when the task is processed successfully. // Zero value (i.e. time.Time{}) indicates no value. CompletedAt time.Time // Result holds the result data associated with the task. // Use ResultWriter to write result data from the Handler. Result []byte } // If t is non-zero, returns time converted from t as unix time in seconds. // If t is zero, returns zero value of time.Time. func fromUnixTimeOrZero(t int64) time.Time { if t == 0 { return time.Time{} } return time.Unix(t, 0) } func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time.Time, result []byte) *TaskInfo { info := TaskInfo{ ID: msg.ID, Queue: msg.Queue, Type: msg.Type, Payload: msg.Payload, // Do we need to make a copy? MaxRetry: msg.Retry, Retried: msg.Retried, LastErr: msg.ErrorMsg, Group: msg.GroupKey, Timeout: time.Duration(msg.Timeout) * time.Second, Deadline: fromUnixTimeOrZero(msg.Deadline), Retention: time.Duration(msg.Retention) * time.Second, NextProcessAt: nextProcessAt, LastFailedAt: fromUnixTimeOrZero(msg.LastFailedAt), CompletedAt: fromUnixTimeOrZero(msg.CompletedAt), Result: result, } switch state { case base.TaskStateActive: info.State = TaskStateActive case base.TaskStatePending: info.State = TaskStatePending case base.TaskStateScheduled: info.State = TaskStateScheduled case base.TaskStateRetry: info.State = TaskStateRetry case base.TaskStateArchived: info.State = TaskStateArchived case base.TaskStateCompleted: info.State = TaskStateCompleted case base.TaskStateAggregating: info.State = TaskStateAggregating default: panic(fmt.Sprintf("internal error: unknown state: %d", state)) } return &info } // TaskState denotes the state of a task. type TaskState int const ( // Indicates that the task is currently being processed by Handler. TaskStateActive TaskState = iota + 1 // Indicates that the task is ready to be processed by Handler. TaskStatePending // Indicates that the task is scheduled to be processed some time in the future. TaskStateScheduled // Indicates that the task has previously failed and scheduled to be processed some time in the future. TaskStateRetry // Indicates that the task is archived and stored for inspection purposes. TaskStateArchived // Indicates that the task is processed successfully and retained until the retention TTL expires. TaskStateCompleted // Indicates that the task is waiting in a group to be aggregated into one task. TaskStateAggregating ) func (s TaskState) String() string { switch s { case TaskStateActive: return "active" case TaskStatePending: return "pending" case TaskStateScheduled: return "scheduled" case TaskStateRetry: return "retry" case TaskStateArchived: return "archived" case TaskStateCompleted: return "completed" case TaskStateAggregating: return "aggregating" } panic("asynq: unknown task state") } // RedisConnOpt is a discriminated union of types that represent Redis connection configuration option. // // RedisConnOpt represents a sum of following types: // // - RedisClientOpt // - RedisFailoverClientOpt // - RedisClusterClientOpt type RedisConnOpt interface { // MakeRedisClient returns a new redis client instance. // Return value is intentionally opaque to hide the implementation detail of redis client. MakeRedisClient() interface{} } // RedisClientOpt is used to create a redis client that connects // to a redis server directly. type RedisClientOpt struct { // Network type to use, either tcp or unix. // Default is tcp. Network string // Redis server address in "host:port" format. Addr string // Username to authenticate the current connection when Redis ACLs are used. // See: https://redis.io/commands/auth. Username string // Password to authenticate the current connection. // See: https://redis.io/commands/auth. Password string // Redis DB to select after connecting to a server. // See: https://redis.io/commands/select. DB int // Dial timeout for establishing new connections. // Default is 5 seconds. DialTimeout time.Duration // Timeout for socket reads. // If timeout is reached, read commands will fail with a timeout error // instead of blocking. // // Use value -1 for no timeout and 0 for default. // Default is 3 seconds. ReadTimeout time.Duration // Timeout for socket writes. // If timeout is reached, write commands will fail with a timeout error // instead of blocking. // // Use value -1 for no timeout and 0 for default. // Default is ReadTimout. WriteTimeout time.Duration // Maximum number of socket connections. // Default is 10 connections per every CPU as reported by runtime.NumCPU. PoolSize int // TLS Config used to connect to a server. // TLS will be negotiated only if this field is set. TLSConfig *tls.Config } func (opt RedisClientOpt) MakeRedisClient() interface{} { return redis.NewClient(&redis.Options{ Network: opt.Network, Addr: opt.Addr, Username: opt.Username, Password: opt.Password, DB: opt.DB, DialTimeout: opt.DialTimeout, ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, PoolSize: opt.PoolSize, TLSConfig: opt.TLSConfig, }) } // RedisFailoverClientOpt is used to creates a redis client that talks // to redis sentinels for service discovery and has an automatic failover // capability. type RedisFailoverClientOpt struct { // Redis master name that monitored by sentinels. MasterName string // Addresses of sentinels in "host:port" format. // Use at least three sentinels to avoid problems described in // https://redis.io/topics/sentinel. SentinelAddrs []string // Redis sentinel password. SentinelPassword string // Username to authenticate the current connection when Redis ACLs are used. // See: https://redis.io/commands/auth. Username string // Password to authenticate the current connection. // See: https://redis.io/commands/auth. Password string // Redis DB to select after connecting to a server. // See: https://redis.io/commands/select. DB int // Dial timeout for establishing new connections. // Default is 5 seconds. DialTimeout time.Duration // Timeout for socket reads. // If timeout is reached, read commands will fail with a timeout error // instead of blocking. // // Use value -1 for no timeout and 0 for default. // Default is 3 seconds. ReadTimeout time.Duration // Timeout for socket writes. // If timeout is reached, write commands will fail with a timeout error // instead of blocking. // // Use value -1 for no timeout and 0 for default. // Default is ReadTimeout WriteTimeout time.Duration // Maximum number of socket connections. // Default is 10 connections per every CPU as reported by runtime.NumCPU. PoolSize int // TLS Config used to connect to a server. // TLS will be negotiated only if this field is set. TLSConfig *tls.Config } func (opt RedisFailoverClientOpt) MakeRedisClient() interface{} { return redis.NewFailoverClient(&redis.FailoverOptions{ MasterName: opt.MasterName, SentinelAddrs: opt.SentinelAddrs, SentinelPassword: opt.SentinelPassword, Username: opt.Username, Password: opt.Password, DB: opt.DB, DialTimeout: opt.DialTimeout, ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, PoolSize: opt.PoolSize, TLSConfig: opt.TLSConfig, }) } // RedisClusterClientOpt is used to creates a redis client that connects to // redis cluster. type RedisClusterClientOpt struct { // A seed list of host:port addresses of cluster nodes. Addrs []string // The maximum number of retries before giving up. // Command is retried on network errors and MOVED/ASK redirects. // Default is 8 retries. MaxRedirects int // Username to authenticate the current connection when Redis ACLs are used. // See: https://redis.io/commands/auth. Username string // Password to authenticate the current connection. // See: https://redis.io/commands/auth. Password string // Dial timeout for establishing new connections. // Default is 5 seconds. DialTimeout time.Duration // Timeout for socket reads. // If timeout is reached, read commands will fail with a timeout error // instead of blocking. // // Use value -1 for no timeout and 0 for default. // Default is 3 seconds. ReadTimeout time.Duration // Timeout for socket writes. // If timeout is reached, write commands will fail with a timeout error // instead of blocking. // // Use value -1 for no timeout and 0 for default. // Default is ReadTimeout. WriteTimeout time.Duration // TLS Config used to connect to a server. // TLS will be negotiated only if this field is set. TLSConfig *tls.Config } func (opt RedisClusterClientOpt) MakeRedisClient() interface{} { return redis.NewClusterClient(&redis.ClusterOptions{ Addrs: opt.Addrs, MaxRedirects: opt.MaxRedirects, Username: opt.Username, Password: opt.Password, DialTimeout: opt.DialTimeout, ReadTimeout: opt.ReadTimeout, WriteTimeout: opt.WriteTimeout, TLSConfig: opt.TLSConfig, }) } // ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid. // It returns a non-nil error if uri cannot be parsed. // // Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:. // Supported formats are: // // redis://[:password@]host[:port][/dbnumber] // rediss://[:password@]host[:port][/dbnumber] // redis-socket://[:password@]path[?db=dbnumber] // redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName] func ParseRedisURI(uri string) (RedisConnOpt, error) { u, err := url.Parse(uri) if err != nil { return nil, fmt.Errorf("asynq: could not parse redis uri: %v", err) } switch u.Scheme { case "redis", "rediss": return parseRedisURI(u) case "redis-socket": return parseRedisSocketURI(u) case "redis-sentinel": return parseRedisSentinelURI(u) default: return nil, fmt.Errorf("asynq: unsupported uri scheme: %q", u.Scheme) } } func parseRedisURI(u *url.URL) (RedisConnOpt, error) { var db int var err error var redisConnOpt RedisClientOpt if len(u.Path) > 0 { xs := strings.Split(strings.Trim(u.Path, "/"), "/") db, err = strconv.Atoi(xs[0]) if err != nil { return nil, fmt.Errorf("asynq: could not parse redis uri: database number should be the first segment of the path") } } var password string if v, ok := u.User.Password(); ok { password = v } if u.Scheme == "rediss" { h, _, err := net.SplitHostPort(u.Host) if err != nil { h = u.Host } redisConnOpt.TLSConfig = &tls.Config{ServerName: h} } redisConnOpt.Addr = u.Host redisConnOpt.Password = password redisConnOpt.DB = db return redisConnOpt, nil } func parseRedisSocketURI(u *url.URL) (RedisConnOpt, error) { const errPrefix = "asynq: could not parse redis socket uri" if len(u.Path) == 0 { return nil, fmt.Errorf("%s: path does not exist", errPrefix) } q := u.Query() var db int var err error if n := q.Get("db"); n != "" { db, err = strconv.Atoi(n) if err != nil { return nil, fmt.Errorf("%s: query param `db` should be a number", errPrefix) } } var password string if v, ok := u.User.Password(); ok { password = v } return RedisClientOpt{Network: "unix", Addr: u.Path, DB: db, Password: password}, nil } func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) { addrs := strings.Split(u.Host, ",") master := u.Query().Get("master") var password string if v, ok := u.User.Password(); ok { password = v } return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, SentinelPassword: password}, nil } // ResultWriter is a client interface to write result data for a task. // It writes the data to the redis instance the server is connected to. type ResultWriter struct { id string // task ID this writer is responsible for qname string // queue name the task belongs to broker base.Broker ctx context.Context // context associated with the task } // Write writes the given data as a result of the task the ResultWriter is associated with. func (w *ResultWriter) Write(data []byte) (n int, err error) { select { case <-w.ctx.Done(): return 0, fmt.Errorf("failed to result task result: %v", w.ctx.Err()) default: } return w.broker.WriteResult(w.qname, w.id, data) } // TaskID returns the ID of the task the ResultWriter is associated with. func (w *ResultWriter) TaskID() string { return w.id } var globalPrefixOnce sync.Once // SetGlobalPrefix sets the global prefix for all redis keys used by asynq. func SetGlobalPrefix(prefix string) { globalPrefixOnce.Do(func() { base.GlobalPrefix = prefix }) }