2020-01-03 10:13:16 +08:00
|
|
|
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
|
|
|
// Use of this source code is governed by a MIT license
|
|
|
|
// that can be found in the LICENSE file.
|
|
|
|
|
2019-11-20 13:19:46 +08:00
|
|
|
package asynq
|
|
|
|
|
|
|
|
import (
|
2020-03-18 21:49:39 +08:00
|
|
|
"fmt"
|
2020-04-26 22:48:38 +08:00
|
|
|
"sync"
|
2019-11-20 13:19:46 +08:00
|
|
|
"time"
|
|
|
|
|
2021-01-29 22:37:35 +08:00
|
|
|
"github.com/go-redis/redis/v7"
|
2020-07-02 21:21:20 +08:00
|
|
|
"github.com/google/uuid"
|
2019-12-22 23:15:45 +08:00
|
|
|
"github.com/hibiken/asynq/internal/base"
|
2021-05-11 12:19:57 +08:00
|
|
|
"github.com/hibiken/asynq/internal/errors"
|
2019-12-04 13:01:26 +08:00
|
|
|
"github.com/hibiken/asynq/internal/rdb"
|
2019-11-20 13:19:46 +08:00
|
|
|
)
|
|
|
|
|
2019-12-07 14:00:09 +08:00
|
|
|
// A Client is responsible for scheduling tasks.
|
|
|
|
//
|
2019-12-09 22:52:43 +08:00
|
|
|
// A Client is used to register tasks that should be processed
|
2019-12-07 14:00:09 +08:00
|
|
|
// immediately or some time in the future.
|
|
|
|
//
|
|
|
|
// Clients are safe for concurrent use by multiple goroutines.
|
2019-11-20 13:19:46 +08:00
|
|
|
type Client struct {
|
2020-04-26 22:48:38 +08:00
|
|
|
mu sync.Mutex
|
|
|
|
opts map[string][]Option
|
|
|
|
rdb *rdb.RDB
|
2019-11-20 13:19:46 +08:00
|
|
|
}
|
|
|
|
|
2020-09-27 08:33:29 +08:00
|
|
|
// NewClient returns a new Client instance given a redis connection option.
|
2020-01-15 13:19:06 +08:00
|
|
|
func NewClient(r RedisConnOpt) *Client {
|
2021-01-29 22:37:35 +08:00
|
|
|
c, ok := r.MakeRedisClient().(redis.UniversalClient)
|
|
|
|
if !ok {
|
|
|
|
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
|
|
|
}
|
|
|
|
rdb := rdb.NewRDB(c)
|
2020-04-26 22:48:38 +08:00
|
|
|
return &Client{
|
|
|
|
opts: make(map[string][]Option),
|
|
|
|
rdb: rdb,
|
|
|
|
}
|
2019-11-20 13:19:46 +08:00
|
|
|
}
|
|
|
|
|
2020-10-10 21:46:47 +08:00
|
|
|
// OptionType identifies the kind of an Option.
type OptionType int

// Option kinds. The numeric values follow declaration order starting at zero,
// so new kinds must be appended at the end to keep existing values stable.
const (
	MaxRetryOpt OptionType = iota
	QueueOpt
	TimeoutOpt
	DeadlineOpt
	UniqueOpt
	ProcessAtOpt
	ProcessInOpt
)
|
|
|
|
|
2020-01-17 11:50:45 +08:00
|
|
|
// Option specifies the task processing behavior.
|
2020-10-10 21:46:47 +08:00
|
|
|
type Option interface {
|
|
|
|
// String returns a string representation of the option.
|
|
|
|
String() string
|
|
|
|
|
|
|
|
// Type describes the type of the option.
|
|
|
|
Type() OptionType
|
|
|
|
|
|
|
|
// Value returns a value used to create this option.
|
|
|
|
Value() interface{}
|
|
|
|
}
|
2019-12-21 23:42:32 +08:00
|
|
|
|
2020-01-06 22:53:40 +08:00
|
|
|
// Internal option representations.
//
// Each type wraps the value passed to the corresponding exported
// constructor (MaxRetry, Queue, Timeout, Deadline, Unique, ProcessAt,
// ProcessIn).
type (
	retryOption     int
	queueOption     string
	timeoutOption   time.Duration
	deadlineOption  time.Time
	uniqueOption    time.Duration
	processAtOption time.Time
	processInOption time.Duration
)
|
2019-12-21 23:42:32 +08:00
|
|
|
|
|
|
|
// MaxRetry returns an option to specify the max number of times
|
2020-01-06 23:21:14 +08:00
|
|
|
// the task will be retried.
|
2019-12-21 23:42:32 +08:00
|
|
|
//
|
|
|
|
// Negative retry count is treated as zero retry.
|
|
|
|
func MaxRetry(n int) Option {
|
|
|
|
if n < 0 {
|
|
|
|
n = 0
|
|
|
|
}
|
|
|
|
return retryOption(n)
|
|
|
|
}
|
|
|
|
|
2020-10-10 21:46:47 +08:00
|
|
|
func (n retryOption) String() string { return fmt.Sprintf("MaxRetry(%d)", int(n)) }
|
|
|
|
func (n retryOption) Type() OptionType { return MaxRetryOpt }
|
2020-12-01 22:53:33 +08:00
|
|
|
func (n retryOption) Value() interface{} { return int(n) }
|
2020-10-10 21:46:47 +08:00
|
|
|
|
2020-01-17 11:50:45 +08:00
|
|
|
// Queue returns an option to specify the queue to enqueue the task into.
|
2020-10-10 21:46:47 +08:00
|
|
|
func Queue(qname string) Option {
|
2021-07-15 21:12:13 +08:00
|
|
|
return queueOption(qname)
|
2020-01-06 22:53:40 +08:00
|
|
|
}
|
|
|
|
|
2020-10-10 21:46:47 +08:00
|
|
|
func (qname queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(qname)) }
|
|
|
|
func (qname queueOption) Type() OptionType { return QueueOpt }
|
2020-12-01 22:53:33 +08:00
|
|
|
func (qname queueOption) Value() interface{} { return string(qname) }
|
2020-10-10 21:46:47 +08:00
|
|
|
|
2020-02-12 13:53:59 +08:00
|
|
|
// Timeout returns an option to specify how long a task may run.
|
2020-06-24 20:42:06 +08:00
|
|
|
// If the timeout elapses before the Handler returns, then the task
|
|
|
|
// will be retried.
|
2020-02-12 13:53:59 +08:00
|
|
|
//
|
|
|
|
// Zero duration means no limit.
|
2020-06-24 20:42:06 +08:00
|
|
|
//
|
|
|
|
// If there's a conflicting Deadline option, whichever comes earliest
|
|
|
|
// will be used.
|
2020-02-12 13:53:59 +08:00
|
|
|
func Timeout(d time.Duration) Option {
|
|
|
|
return timeoutOption(d)
|
|
|
|
}
|
|
|
|
|
2020-10-10 21:46:47 +08:00
|
|
|
func (d timeoutOption) String() string { return fmt.Sprintf("Timeout(%v)", time.Duration(d)) }
|
|
|
|
func (d timeoutOption) Type() OptionType { return TimeoutOpt }
|
2020-12-01 22:53:33 +08:00
|
|
|
func (d timeoutOption) Value() interface{} { return time.Duration(d) }
|
2020-10-10 21:46:47 +08:00
|
|
|
|
2020-03-08 12:24:03 +08:00
|
|
|
// Deadline returns an option to specify the deadline for the given task.
|
2020-06-24 20:42:06 +08:00
|
|
|
// If it reaches the deadline before the Handler returns, then the task
|
|
|
|
// will be retried.
|
|
|
|
//
|
|
|
|
// If there's a conflicting Timeout option, whichever comes earliest
|
|
|
|
// will be used.
|
2020-03-08 12:24:03 +08:00
|
|
|
func Deadline(t time.Time) Option {
|
|
|
|
return deadlineOption(t)
|
|
|
|
}
|
|
|
|
|
2020-12-01 22:53:33 +08:00
|
|
|
func (t deadlineOption) String() string {
|
|
|
|
return fmt.Sprintf("Deadline(%v)", time.Time(t).Format(time.UnixDate))
|
|
|
|
}
|
2020-10-10 21:46:47 +08:00
|
|
|
func (t deadlineOption) Type() OptionType { return DeadlineOpt }
|
2020-12-01 22:53:33 +08:00
|
|
|
func (t deadlineOption) Value() interface{} { return time.Time(t) }
|
2020-10-10 21:46:47 +08:00
|
|
|
|
2020-03-18 21:49:39 +08:00
|
|
|
// Unique returns an option to enqueue a task only if the given task is unique.
|
|
|
|
// Task enqueued with this option is guaranteed to be unique within the given ttl.
|
|
|
|
// Once the task gets processed successfully or once the TTL has expired, another task with the same uniqueness may be enqueued.
|
|
|
|
// ErrDuplicateTask error is returned when enqueueing a duplicate task.
|
|
|
|
//
|
|
|
|
// Uniqueness of a task is based on the following properties:
|
|
|
|
// - Task Type
|
|
|
|
// - Task Payload
|
|
|
|
// - Queue Name
|
|
|
|
func Unique(ttl time.Duration) Option {
|
|
|
|
return uniqueOption(ttl)
|
|
|
|
}
|
|
|
|
|
2020-10-10 21:46:47 +08:00
|
|
|
func (ttl uniqueOption) String() string { return fmt.Sprintf("Unique(%v)", time.Duration(ttl)) }
|
|
|
|
func (ttl uniqueOption) Type() OptionType { return UniqueOpt }
|
2020-12-01 22:53:33 +08:00
|
|
|
func (ttl uniqueOption) Value() interface{} { return time.Duration(ttl) }
|
2020-10-10 21:46:47 +08:00
|
|
|
|
2020-09-05 21:29:08 +08:00
|
|
|
// ProcessAt returns an option to specify when to process the given task.
|
|
|
|
//
|
|
|
|
// If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others.
|
|
|
|
func ProcessAt(t time.Time) Option {
|
|
|
|
return processAtOption(t)
|
|
|
|
}
|
|
|
|
|
2020-12-01 22:53:33 +08:00
|
|
|
func (t processAtOption) String() string {
|
|
|
|
return fmt.Sprintf("ProcessAt(%v)", time.Time(t).Format(time.UnixDate))
|
|
|
|
}
|
2020-10-10 21:46:47 +08:00
|
|
|
func (t processAtOption) Type() OptionType { return ProcessAtOpt }
|
2020-12-01 22:53:33 +08:00
|
|
|
func (t processAtOption) Value() interface{} { return time.Time(t) }
|
2020-10-10 21:46:47 +08:00
|
|
|
|
2020-09-05 21:29:08 +08:00
|
|
|
// ProcessIn returns an option to specify when to process the given task relative to the current time.
|
|
|
|
//
|
|
|
|
// If there's a conflicting ProcessAt option, the last option passed to Enqueue overrides the others.
|
|
|
|
func ProcessIn(d time.Duration) Option {
|
|
|
|
return processInOption(d)
|
|
|
|
}
|
|
|
|
|
2020-10-10 21:46:47 +08:00
|
|
|
func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v)", time.Duration(d)) }
|
|
|
|
func (d processInOption) Type() OptionType { return ProcessInOpt }
|
2020-12-01 22:53:33 +08:00
|
|
|
func (d processInOption) Value() interface{} { return time.Duration(d) }
|
|
|
|
|
2020-03-18 21:49:39 +08:00
|
|
|
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
var ErrDuplicateTask = errors.New("task already exists")
|
|
|
|
|
2021-08-01 22:24:45 +08:00
|
|
|
// ErrEmptyTypeTask indicates that task's typename is not specified.
var ErrEmptyTypeTask = errors.New("task typename not specified")
|
|
|
|
|
2019-12-21 23:42:32 +08:00
|
|
|
// option is the resolved set of per-task settings produced by composeOptions.
type option struct {
	retry     int           // max retry count
	queue     string        // name of the queue the task is enqueued into
	timeout   time.Duration // zero when no Timeout option was given
	deadline  time.Time     // zero time when no Deadline option was given
	uniqueTTL time.Duration // uniqueness is requested only when greater than zero
	processAt time.Time     // time at which the task should be processed
}
|
|
|
|
|
2020-08-31 20:39:45 +08:00
|
|
|
// composeOptions merges user provided options into the default options
|
|
|
|
// and returns the composed option.
|
|
|
|
// It also validates the user provided options and returns an error if any of
|
|
|
|
// the user provided options fail the validations.
|
|
|
|
func composeOptions(opts ...Option) (option, error) {
|
2019-12-21 23:42:32 +08:00
|
|
|
res := option{
|
2020-09-05 21:29:08 +08:00
|
|
|
retry: defaultMaxRetry,
|
|
|
|
queue: base.DefaultQueueName,
|
|
|
|
timeout: 0, // do not set to deafultTimeout here
|
|
|
|
deadline: time.Time{},
|
|
|
|
processAt: time.Now(),
|
2019-12-21 23:42:32 +08:00
|
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
|
|
switch opt := opt.(type) {
|
|
|
|
case retryOption:
|
|
|
|
res.retry = int(opt)
|
2020-01-06 22:53:40 +08:00
|
|
|
case queueOption:
|
2021-07-15 21:12:13 +08:00
|
|
|
qname := string(opt)
|
|
|
|
if err := base.ValidateQueueName(qname); err != nil {
|
2020-08-31 20:53:17 +08:00
|
|
|
return option{}, err
|
2020-08-31 20:39:45 +08:00
|
|
|
}
|
2021-07-15 21:12:13 +08:00
|
|
|
res.queue = qname
|
2020-02-12 13:53:59 +08:00
|
|
|
case timeoutOption:
|
|
|
|
res.timeout = time.Duration(opt)
|
2020-03-08 12:24:03 +08:00
|
|
|
case deadlineOption:
|
|
|
|
res.deadline = time.Time(opt)
|
2020-03-18 21:49:39 +08:00
|
|
|
case uniqueOption:
|
|
|
|
res.uniqueTTL = time.Duration(opt)
|
2020-09-05 21:29:08 +08:00
|
|
|
case processAtOption:
|
|
|
|
res.processAt = time.Time(opt)
|
|
|
|
case processInOption:
|
|
|
|
res.processAt = time.Now().Add(time.Duration(opt))
|
2019-12-21 23:42:32 +08:00
|
|
|
default:
|
|
|
|
// ignore unexpected option
|
|
|
|
}
|
|
|
|
}
|
2020-08-31 20:39:45 +08:00
|
|
|
return res, nil
|
2019-12-21 23:42:32 +08:00
|
|
|
}
|
|
|
|
|
2020-06-17 12:12:50 +08:00
|
|
|
const (
	// Default max retry count used if nothing is specified.
	defaultMaxRetry = 25

	// Default timeout used if both timeout and deadline are not specified.
	defaultTimeout = 30 * time.Minute
)
|
|
|
|
|
|
|
|
// Value zero indicates no timeout and no deadline.
//
// noDeadline is the Unix epoch (its Unix() value is 0), which is not the
// same as the zero time.Time; compare against it with Equal rather than
// IsZero.
var (
	noTimeout  time.Duration = 0
	noDeadline time.Time     = time.Unix(0, 0)
)
|
2020-04-26 22:48:38 +08:00
|
|
|
|
|
|
|
// SetDefaultOptions sets options to be used for a given task type.
|
|
|
|
// The argument opts specifies the behavior of task processing.
|
|
|
|
// If there are conflicting Option values the last one overrides others.
|
|
|
|
//
|
|
|
|
// Default options can be overridden by options passed at enqueue time.
|
|
|
|
func (c *Client) SetDefaultOptions(taskType string, opts ...Option) {
|
|
|
|
c.mu.Lock()
|
|
|
|
defer c.mu.Unlock()
|
|
|
|
c.opts[taskType] = opts
|
|
|
|
}
|
2019-12-21 23:42:32 +08:00
|
|
|
|
2020-09-08 21:52:34 +08:00
|
|
|
// Close closes the connection with redis.
|
2020-09-05 21:29:08 +08:00
|
|
|
func (c *Client) Close() error {
|
|
|
|
return c.rdb.Close()
|
2020-04-26 22:48:38 +08:00
|
|
|
}
|
|
|
|
|
2020-09-05 21:29:08 +08:00
|
|
|
// Enqueue enqueues the given task to be processed asynchronously.
|
2020-04-26 22:48:38 +08:00
|
|
|
//
|
2021-05-15 21:43:18 +08:00
|
|
|
// Enqueue returns TaskInfo and nil error if the task is enqueued successfully, otherwise returns a non-nil error.
|
2020-04-26 22:48:38 +08:00
|
|
|
//
|
|
|
|
// The argument opts specifies the behavior of task processing.
|
|
|
|
// If there are conflicting Option values the last one overrides others.
|
2020-06-24 20:42:06 +08:00
|
|
|
// By deafult, max retry is set to 25 and timeout is set to 30 minutes.
|
2021-05-15 21:43:18 +08:00
|
|
|
//
|
|
|
|
// If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
|
|
|
|
func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) {
|
2021-08-01 22:24:45 +08:00
|
|
|
if task.Type() == "" {
|
|
|
|
return nil, fmt.Errorf("%w", ErrEmptyTypeTask)
|
|
|
|
}
|
2020-04-26 22:48:38 +08:00
|
|
|
c.mu.Lock()
|
2021-03-21 04:42:13 +08:00
|
|
|
if defaults, ok := c.opts[task.Type()]; ok {
|
2020-04-26 22:48:38 +08:00
|
|
|
opts = append(defaults, opts...)
|
|
|
|
}
|
2020-09-05 21:29:08 +08:00
|
|
|
c.mu.Unlock()
|
2020-08-31 20:39:45 +08:00
|
|
|
opt, err := composeOptions(opts...)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-06-17 12:12:50 +08:00
|
|
|
deadline := noDeadline
|
|
|
|
if !opt.deadline.IsZero() {
|
|
|
|
deadline = opt.deadline
|
|
|
|
}
|
|
|
|
timeout := noTimeout
|
|
|
|
if opt.timeout != 0 {
|
|
|
|
timeout = opt.timeout
|
|
|
|
}
|
|
|
|
if deadline.Equal(noDeadline) && timeout == noTimeout {
|
|
|
|
// If neither deadline nor timeout are set, use default timeout.
|
|
|
|
timeout = defaultTimeout
|
|
|
|
}
|
2020-08-07 20:36:54 +08:00
|
|
|
var uniqueKey string
|
|
|
|
if opt.uniqueTTL > 0 {
|
2021-03-21 04:42:13 +08:00
|
|
|
uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload())
|
2020-08-07 20:36:54 +08:00
|
|
|
}
|
2019-12-22 23:15:45 +08:00
|
|
|
msg := &base.TaskMessage{
|
2020-07-02 21:21:20 +08:00
|
|
|
ID: uuid.New(),
|
2021-03-21 04:42:13 +08:00
|
|
|
Type: task.Type(),
|
|
|
|
Payload: task.Payload(),
|
2020-03-18 21:49:39 +08:00
|
|
|
Queue: opt.queue,
|
|
|
|
Retry: opt.retry,
|
2020-06-22 23:33:58 +08:00
|
|
|
Deadline: deadline.Unix(),
|
|
|
|
Timeout: int64(timeout.Seconds()),
|
2020-08-07 20:36:54 +08:00
|
|
|
UniqueKey: uniqueKey,
|
2020-03-18 21:49:39 +08:00
|
|
|
}
|
2020-06-14 20:31:24 +08:00
|
|
|
now := time.Now()
|
2021-05-15 21:43:18 +08:00
|
|
|
var state base.TaskState
|
2020-09-05 21:29:08 +08:00
|
|
|
if opt.processAt.Before(now) || opt.processAt.Equal(now) {
|
|
|
|
opt.processAt = now
|
2020-03-18 21:49:39 +08:00
|
|
|
err = c.enqueue(msg, opt.uniqueTTL)
|
2021-05-15 21:43:18 +08:00
|
|
|
state = base.TaskStatePending
|
2020-03-18 21:49:39 +08:00
|
|
|
} else {
|
2020-09-05 21:29:08 +08:00
|
|
|
err = c.schedule(msg, opt.processAt, opt.uniqueTTL)
|
2021-05-15 21:43:18 +08:00
|
|
|
state = base.TaskStateScheduled
|
2019-11-20 13:19:46 +08:00
|
|
|
}
|
2020-07-03 20:49:52 +08:00
|
|
|
switch {
|
2021-05-11 12:19:57 +08:00
|
|
|
case errors.Is(err, errors.ErrDuplicateTask):
|
2020-07-03 20:49:52 +08:00
|
|
|
return nil, fmt.Errorf("%w", ErrDuplicateTask)
|
|
|
|
case err != nil:
|
|
|
|
return nil, err
|
2020-03-18 21:49:39 +08:00
|
|
|
}
|
2021-06-14 22:31:39 +08:00
|
|
|
return newTaskInfo(msg, state, opt.processAt), nil
|
2019-11-20 13:19:46 +08:00
|
|
|
}
|
|
|
|
|
2020-03-18 21:49:39 +08:00
|
|
|
func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {
|
|
|
|
if uniqueTTL > 0 {
|
|
|
|
return c.rdb.EnqueueUnique(msg, uniqueTTL)
|
|
|
|
}
|
|
|
|
return c.rdb.Enqueue(msg)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *Client) schedule(msg *base.TaskMessage, t time.Time, uniqueTTL time.Duration) error {
|
|
|
|
if uniqueTTL > 0 {
|
|
|
|
ttl := t.Add(uniqueTTL).Sub(time.Now())
|
|
|
|
return c.rdb.ScheduleUnique(msg, t, ttl)
|
2019-11-20 13:19:46 +08:00
|
|
|
}
|
2020-02-24 07:40:04 +08:00
|
|
|
return c.rdb.Schedule(msg, t)
|
2019-11-20 13:19:46 +08:00
|
|
|
}
|