mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			428 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			428 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2020 Kentaro Hibino. All rights reserved.
 | |
| // Use of this source code is governed by a MIT license
 | |
| // that can be found in the LICENSE file.
 | |
| 
 | |
| package asynq
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/go-redis/redis/v8"
 | |
| 	"github.com/google/uuid"
 | |
| 	"github.com/hibiken/asynq/internal/base"
 | |
| 	"github.com/hibiken/asynq/internal/errors"
 | |
| 	"github.com/hibiken/asynq/internal/rdb"
 | |
| )
 | |
| 
 | |
| // A Client is responsible for scheduling tasks.
 | |
| //
 | |
| // A Client is used to register tasks that should be processed
 | |
| // immediately or some time in the future.
 | |
| //
 | |
| // Clients are safe for concurrent use by multiple goroutines.
 | |
| type Client struct {
 | |
| 	broker base.Broker
 | |
| }
 | |
| 
 | |
| // NewClient returns a new Client instance given a redis connection option.
 | |
| func NewClient(r RedisConnOpt) *Client {
 | |
| 	c, ok := r.MakeRedisClient().(redis.UniversalClient)
 | |
| 	if !ok {
 | |
| 		panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
 | |
| 	}
 | |
| 	return &Client{broker: rdb.NewRDB(c)}
 | |
| }
 | |
| 
 | |
| type OptionType int
 | |
| 
 | |
| const (
 | |
| 	MaxRetryOpt OptionType = iota
 | |
| 	QueueOpt
 | |
| 	TimeoutOpt
 | |
| 	DeadlineOpt
 | |
| 	UniqueOpt
 | |
| 	ProcessAtOpt
 | |
| 	ProcessInOpt
 | |
| 	TaskIDOpt
 | |
| 	RetentionOpt
 | |
| 	GroupOpt
 | |
| )
 | |
| 
 | |
| // Option specifies the task processing behavior.
 | |
| type Option interface {
 | |
| 	// String returns a string representation of the option.
 | |
| 	String() string
 | |
| 
 | |
| 	// Type describes the type of the option.
 | |
| 	Type() OptionType
 | |
| 
 | |
| 	// Value returns a value used to create this option.
 | |
| 	Value() interface{}
 | |
| }
 | |
| 
 | |
| // Internal option representations.
 | |
| type (
 | |
| 	retryOption     int
 | |
| 	queueOption     string
 | |
| 	taskIDOption    string
 | |
| 	timeoutOption   time.Duration
 | |
| 	deadlineOption  time.Time
 | |
| 	uniqueOption    time.Duration
 | |
| 	processAtOption time.Time
 | |
| 	processInOption time.Duration
 | |
| 	retentionOption time.Duration
 | |
| 	groupOption     string
 | |
| )
 | |
| 
 | |
| // MaxRetry returns an option to specify the max number of times
 | |
| // the task will be retried.
 | |
| //
 | |
| // Negative retry count is treated as zero retry.
 | |
| func MaxRetry(n int) Option {
 | |
| 	if n < 0 {
 | |
| 		n = 0
 | |
| 	}
 | |
| 	return retryOption(n)
 | |
| }
 | |
| 
 | |
| func (n retryOption) String() string     { return fmt.Sprintf("MaxRetry(%d)", int(n)) }
 | |
| func (n retryOption) Type() OptionType   { return MaxRetryOpt }
 | |
| func (n retryOption) Value() interface{} { return int(n) }
 | |
| 
 | |
| // Queue returns an option to specify the queue to enqueue the task into.
 | |
| func Queue(qname string) Option {
 | |
| 	return queueOption(qname)
 | |
| }
 | |
| 
 | |
| func (qname queueOption) String() string     { return fmt.Sprintf("Queue(%q)", string(qname)) }
 | |
| func (qname queueOption) Type() OptionType   { return QueueOpt }
 | |
| func (qname queueOption) Value() interface{} { return string(qname) }
 | |
| 
 | |
| // TaskID returns an option to specify the task ID.
 | |
| func TaskID(id string) Option {
 | |
| 	return taskIDOption(id)
 | |
| }
 | |
| 
 | |
| func (id taskIDOption) String() string     { return fmt.Sprintf("TaskID(%q)", string(id)) }
 | |
| func (id taskIDOption) Type() OptionType   { return TaskIDOpt }
 | |
| func (id taskIDOption) Value() interface{} { return string(id) }
 | |
| 
 | |
| // Timeout returns an option to specify how long a task may run.
 | |
| // If the timeout elapses before the Handler returns, then the task
 | |
| // will be retried.
 | |
| //
 | |
| // Zero duration means no limit.
 | |
| //
 | |
| // If there's a conflicting Deadline option, whichever comes earliest
 | |
| // will be used.
 | |
| func Timeout(d time.Duration) Option {
 | |
| 	return timeoutOption(d)
 | |
| }
 | |
| 
 | |
| func (d timeoutOption) String() string     { return fmt.Sprintf("Timeout(%v)", time.Duration(d)) }
 | |
| func (d timeoutOption) Type() OptionType   { return TimeoutOpt }
 | |
| func (d timeoutOption) Value() interface{} { return time.Duration(d) }
 | |
| 
 | |
| // Deadline returns an option to specify the deadline for the given task.
 | |
| // If it reaches the deadline before the Handler returns, then the task
 | |
| // will be retried.
 | |
| //
 | |
| // If there's a conflicting Timeout option, whichever comes earliest
 | |
| // will be used.
 | |
| func Deadline(t time.Time) Option {
 | |
| 	return deadlineOption(t)
 | |
| }
 | |
| 
 | |
| func (t deadlineOption) String() string {
 | |
| 	return fmt.Sprintf("Deadline(%v)", time.Time(t).Format(time.UnixDate))
 | |
| }
 | |
| func (t deadlineOption) Type() OptionType   { return DeadlineOpt }
 | |
| func (t deadlineOption) Value() interface{} { return time.Time(t) }
 | |
| 
 | |
| // Unique returns an option to enqueue a task only if the given task is unique.
 | |
| // Task enqueued with this option is guaranteed to be unique within the given ttl.
 | |
| // Once the task gets processed successfully or once the TTL has expired,
 | |
| // another task with the same uniqueness may be enqueued.
 | |
| // ErrDuplicateTask error is returned when enqueueing a duplicate task.
 | |
| // TTL duration must be greater than or equal to 1 second.
 | |
| //
 | |
| // Uniqueness of a task is based on the following properties:
 | |
| //     - Task Type
 | |
| //     - Task Payload
 | |
| //     - Queue Name
 | |
| func Unique(ttl time.Duration) Option {
 | |
| 	return uniqueOption(ttl)
 | |
| }
 | |
| 
 | |
| func (ttl uniqueOption) String() string     { return fmt.Sprintf("Unique(%v)", time.Duration(ttl)) }
 | |
| func (ttl uniqueOption) Type() OptionType   { return UniqueOpt }
 | |
| func (ttl uniqueOption) Value() interface{} { return time.Duration(ttl) }
 | |
| 
 | |
| // ProcessAt returns an option to specify when to process the given task.
 | |
| //
 | |
| // If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others.
 | |
| func ProcessAt(t time.Time) Option {
 | |
| 	return processAtOption(t)
 | |
| }
 | |
| 
 | |
| func (t processAtOption) String() string {
 | |
| 	return fmt.Sprintf("ProcessAt(%v)", time.Time(t).Format(time.UnixDate))
 | |
| }
 | |
| func (t processAtOption) Type() OptionType   { return ProcessAtOpt }
 | |
| func (t processAtOption) Value() interface{} { return time.Time(t) }
 | |
| 
 | |
| // ProcessIn returns an option to specify when to process the given task relative to the current time.
 | |
| //
 | |
| // If there's a conflicting ProcessAt option, the last option passed to Enqueue overrides the others.
 | |
| func ProcessIn(d time.Duration) Option {
 | |
| 	return processInOption(d)
 | |
| }
 | |
| 
 | |
| func (d processInOption) String() string     { return fmt.Sprintf("ProcessIn(%v)", time.Duration(d)) }
 | |
| func (d processInOption) Type() OptionType   { return ProcessInOpt }
 | |
| func (d processInOption) Value() interface{} { return time.Duration(d) }
 | |
| 
 | |
| // Retention returns an option to specify the duration of retention period for the task.
 | |
| // If this option is provided, the task will be stored as a completed task after successful processing.
 | |
| // A completed task will be deleted after the specified duration elapses.
 | |
| func Retention(d time.Duration) Option {
 | |
| 	return retentionOption(d)
 | |
| }
 | |
| 
 | |
| func (ttl retentionOption) String() string     { return fmt.Sprintf("Retention(%v)", time.Duration(ttl)) }
 | |
| func (ttl retentionOption) Type() OptionType   { return RetentionOpt }
 | |
| func (ttl retentionOption) Value() interface{} { return time.Duration(ttl) }
 | |
| 
 | |
| // Group returns an option to specify the group key used for the task.
 | |
| // Tasks in a given queue with the same group key will be aggregated into one task before passed to Handler.
 | |
| //
 | |
| // To customize the aggregation and grouping policy, specify the Group* fields in Config.
 | |
| func Group(key string) Option {
 | |
| 	return groupOption(key)
 | |
| }
 | |
| 
 | |
| func (key groupOption) String() string     { return fmt.Sprintf("Group(%q)", string(key)) }
 | |
| func (key groupOption) Type() OptionType   { return GroupOpt }
 | |
| func (key groupOption) Value() interface{} { return string(key) }
 | |
| 
 | |
| // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
 | |
| //
 | |
| // ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
 | |
| var ErrDuplicateTask = errors.New("task already exists")
 | |
| 
 | |
| // ErrTaskIDConflict indicates that the given task could not be enqueued since its task ID already exists.
 | |
| //
 | |
| // ErrTaskIDConflict error only applies to tasks enqueued with a TaskID option.
 | |
| var ErrTaskIDConflict = errors.New("task ID conflicts with another task")
 | |
| 
 | |
| type option struct {
 | |
| 	retry     int
 | |
| 	queue     string
 | |
| 	taskID    string
 | |
| 	timeout   time.Duration
 | |
| 	deadline  time.Time
 | |
| 	uniqueTTL time.Duration
 | |
| 	processAt time.Time
 | |
| 	retention time.Duration
 | |
| 	groupKey  string
 | |
| }
 | |
| 
 | |
| // composeOptions merges user provided options into the default options
 | |
| // and returns the composed option.
 | |
| // It also validates the user provided options and returns an error if any of
 | |
| // the user provided options fail the validations.
 | |
| func composeOptions(opts ...Option) (option, error) {
 | |
| 	res := option{
 | |
| 		retry:     defaultMaxRetry,
 | |
| 		queue:     base.DefaultQueueName,
 | |
| 		taskID:    uuid.NewString(),
 | |
| 		timeout:   0, // do not set to deafultTimeout here
 | |
| 		deadline:  time.Time{},
 | |
| 		processAt: time.Now(),
 | |
| 	}
 | |
| 	for _, opt := range opts {
 | |
| 		switch opt := opt.(type) {
 | |
| 		case retryOption:
 | |
| 			res.retry = int(opt)
 | |
| 		case queueOption:
 | |
| 			qname := string(opt)
 | |
| 			if err := base.ValidateQueueName(qname); err != nil {
 | |
| 				return option{}, err
 | |
| 			}
 | |
| 			res.queue = qname
 | |
| 		case taskIDOption:
 | |
| 			id := string(opt)
 | |
| 			if isBlank(id) {
 | |
| 				return option{}, errors.New("task ID cannot be empty")
 | |
| 			}
 | |
| 			res.taskID = id
 | |
| 		case timeoutOption:
 | |
| 			res.timeout = time.Duration(opt)
 | |
| 		case deadlineOption:
 | |
| 			res.deadline = time.Time(opt)
 | |
| 		case uniqueOption:
 | |
| 			ttl := time.Duration(opt)
 | |
| 			if ttl < 1*time.Second {
 | |
| 				return option{}, errors.New("Unique TTL cannot be less than 1s")
 | |
| 			}
 | |
| 			res.uniqueTTL = ttl
 | |
| 		case processAtOption:
 | |
| 			res.processAt = time.Time(opt)
 | |
| 		case processInOption:
 | |
| 			res.processAt = time.Now().Add(time.Duration(opt))
 | |
| 		case retentionOption:
 | |
| 			res.retention = time.Duration(opt)
 | |
| 		case groupOption:
 | |
| 			key := string(opt)
 | |
| 			if isBlank(key) {
 | |
| 				return option{}, errors.New("group key cannot be empty")
 | |
| 			}
 | |
| 			res.groupKey = key
 | |
| 		default:
 | |
| 			// ignore unexpected option
 | |
| 		}
 | |
| 	}
 | |
| 	return res, nil
 | |
| }
 | |
| 
 | |
| // isBlank returns true if the given s is empty or consist of all whitespaces.
 | |
| func isBlank(s string) bool {
 | |
| 	return strings.TrimSpace(s) == ""
 | |
| }
 | |
| 
 | |
| const (
 | |
| 	// Default max retry count used if nothing is specified.
 | |
| 	defaultMaxRetry = 25
 | |
| 
 | |
| 	// Default timeout used if both timeout and deadline are not specified.
 | |
| 	defaultTimeout = 30 * time.Minute
 | |
| )
 | |
| 
 | |
| // Value zero indicates no timeout and no deadline.
 | |
| var (
 | |
| 	noTimeout  time.Duration = 0
 | |
| 	noDeadline time.Time     = time.Unix(0, 0)
 | |
| )
 | |
| 
 | |
| // Close closes the connection with redis.
 | |
| func (c *Client) Close() error {
 | |
| 	return c.broker.Close()
 | |
| }
 | |
| 
 | |
| // Enqueue enqueues the given task to a queue.
 | |
| //
 | |
| // Enqueue returns TaskInfo and nil error if the task is enqueued successfully, otherwise returns a non-nil error.
 | |
| //
 | |
| // The argument opts specifies the behavior of task processing.
 | |
| // If there are conflicting Option values the last one overrides others.
 | |
| // Any options provided to NewTask can be overridden by options passed to Enqueue.
 | |
| // By deafult, max retry is set to 25 and timeout is set to 30 minutes.
 | |
| //
 | |
| // If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
 | |
| //
 | |
| // Enqueue uses context.Background internally; to specify the context, use EnqueueContext.
 | |
| func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) {
 | |
| 	return c.EnqueueContext(context.Background(), task, opts...)
 | |
| }
 | |
| 
 | |
| // EnqueueContext enqueues the given task to a queue.
 | |
| //
 | |
| // EnqueueContext returns TaskInfo and nil error if the task is enqueued successfully, otherwise returns a non-nil error.
 | |
| //
 | |
| // The argument opts specifies the behavior of task processing.
 | |
| // If there are conflicting Option values the last one overrides others.
 | |
| // Any options provided to NewTask can be overridden by options passed to Enqueue.
 | |
| // By deafult, max retry is set to 25 and timeout is set to 30 minutes.
 | |
| //
 | |
| // If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
 | |
| //
 | |
| // The first argument context applies to the enqueue operation. To specify task timeout and deadline, use Timeout and Deadline option instead.
 | |
| func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) (*TaskInfo, error) {
 | |
| 	if strings.TrimSpace(task.Type()) == "" {
 | |
| 		return nil, fmt.Errorf("task typename cannot be empty")
 | |
| 	}
 | |
| 	// merge task options with the options provided at enqueue time.
 | |
| 	opts = append(task.opts, opts...)
 | |
| 	opt, err := composeOptions(opts...)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	deadline := noDeadline
 | |
| 	if !opt.deadline.IsZero() {
 | |
| 		deadline = opt.deadline
 | |
| 	}
 | |
| 	timeout := noTimeout
 | |
| 	if opt.timeout != 0 {
 | |
| 		timeout = opt.timeout
 | |
| 	}
 | |
| 	if deadline.Equal(noDeadline) && timeout == noTimeout {
 | |
| 		// If neither deadline nor timeout are set, use default timeout.
 | |
| 		timeout = defaultTimeout
 | |
| 	}
 | |
| 	var uniqueKey string
 | |
| 	if opt.uniqueTTL > 0 {
 | |
| 		uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload())
 | |
| 	}
 | |
| 	msg := &base.TaskMessage{
 | |
| 		ID:        opt.taskID,
 | |
| 		Type:      task.Type(),
 | |
| 		Payload:   task.Payload(),
 | |
| 		Queue:     opt.queue,
 | |
| 		Retry:     opt.retry,
 | |
| 		Deadline:  deadline.Unix(),
 | |
| 		Timeout:   int64(timeout.Seconds()),
 | |
| 		UniqueKey: uniqueKey,
 | |
| 		GroupKey:  opt.groupKey,
 | |
| 		Retention: int64(opt.retention.Seconds()),
 | |
| 	}
 | |
| 	now := time.Now()
 | |
| 	var state base.TaskState
 | |
| 	if opt.processAt.After(now) {
 | |
| 		err = c.schedule(ctx, msg, opt.processAt, opt.uniqueTTL)
 | |
| 		state = base.TaskStateScheduled
 | |
| 	} else if opt.groupKey != "" {
 | |
| 		// Use zero value for processAt since we don't know when the task will be aggregated and processed.
 | |
| 		opt.processAt = time.Time{}
 | |
| 		err = c.addToGroup(ctx, msg, opt.groupKey, opt.uniqueTTL)
 | |
| 		state = base.TaskStateAggregating
 | |
| 	} else {
 | |
| 		opt.processAt = now
 | |
| 		err = c.enqueue(ctx, msg, opt.uniqueTTL)
 | |
| 		state = base.TaskStatePending
 | |
| 	}
 | |
| 	switch {
 | |
| 	case errors.Is(err, errors.ErrDuplicateTask):
 | |
| 		return nil, fmt.Errorf("%w", ErrDuplicateTask)
 | |
| 	case errors.Is(err, errors.ErrTaskIdConflict):
 | |
| 		return nil, fmt.Errorf("%w", ErrTaskIDConflict)
 | |
| 	case err != nil:
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return newTaskInfo(msg, state, opt.processAt, nil), nil
 | |
| }
 | |
| 
 | |
| func (c *Client) enqueue(ctx context.Context, msg *base.TaskMessage, uniqueTTL time.Duration) error {
 | |
| 	if uniqueTTL > 0 {
 | |
| 		return c.broker.EnqueueUnique(ctx, msg, uniqueTTL)
 | |
| 	}
 | |
| 	return c.broker.Enqueue(ctx, msg)
 | |
| }
 | |
| 
 | |
| func (c *Client) schedule(ctx context.Context, msg *base.TaskMessage, t time.Time, uniqueTTL time.Duration) error {
 | |
| 	if uniqueTTL > 0 {
 | |
| 		ttl := t.Add(uniqueTTL).Sub(time.Now())
 | |
| 		return c.broker.ScheduleUnique(ctx, msg, t, ttl)
 | |
| 	}
 | |
| 	return c.broker.Schedule(ctx, msg, t)
 | |
| }
 | |
| 
 | |
| func (c *Client) addToGroup(ctx context.Context, msg *base.TaskMessage, groupKey string, uniqueTTL time.Duration) error {
 | |
| 	if uniqueTTL > 0 {
 | |
| 		return c.broker.AddToGroupUnique(ctx, msg, groupKey, uniqueTTL)
 | |
| 	}
 | |
| 	return c.broker.AddToGroup(ctx, msg, groupKey)
 | |
| }
 |