mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			261 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			261 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2020 Kentaro Hibino. All rights reserved.
 | |
| // Use of this source code is governed by a MIT license
 | |
| // that can be found in the LICENSE file.
 | |
| 
 | |
| package asynq
 | |
| 
 | |
| import (
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"sort"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/hibiken/asynq/internal/base"
 | |
| 	"github.com/hibiken/asynq/internal/rdb"
 | |
| 	"github.com/rs/xid"
 | |
| )
 | |
| 
 | |
| // A Client is responsible for scheduling tasks.
 | |
| //
 | |
| // A Client is used to register tasks that should be processed
 | |
| // immediately or some time in the future.
 | |
| //
 | |
| // Clients are safe for concurrent use by multiple goroutines.
 | |
| type Client struct {
 | |
| 	mu   sync.Mutex
 | |
| 	opts map[string][]Option
 | |
| 	rdb  *rdb.RDB
 | |
| }
 | |
| 
 | |
| // NewClient and returns a new Client given a redis connection option.
 | |
| func NewClient(r RedisConnOpt) *Client {
 | |
| 	rdb := rdb.NewRDB(createRedisClient(r))
 | |
| 	return &Client{
 | |
| 		opts: make(map[string][]Option),
 | |
| 		rdb:  rdb,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Option specifies the task processing behavior.
 | |
| type Option interface{}
 | |
| 
 | |
| // Internal option representations.
 | |
| type (
 | |
| 	retryOption    int
 | |
| 	queueOption    string
 | |
| 	timeoutOption  time.Duration
 | |
| 	deadlineOption time.Time
 | |
| 	uniqueOption   time.Duration
 | |
| )
 | |
| 
 | |
| // MaxRetry returns an option to specify the max number of times
 | |
| // the task will be retried.
 | |
| //
 | |
| // Negative retry count is treated as zero retry.
 | |
| func MaxRetry(n int) Option {
 | |
| 	if n < 0 {
 | |
| 		n = 0
 | |
| 	}
 | |
| 	return retryOption(n)
 | |
| }
 | |
| 
 | |
| // Queue returns an option to specify the queue to enqueue the task into.
 | |
| //
 | |
| // Queue name is case-insensitive and the lowercased version is used.
 | |
| func Queue(name string) Option {
 | |
| 	return queueOption(strings.ToLower(name))
 | |
| }
 | |
| 
 | |
| // Timeout returns an option to specify how long a task may run.
 | |
| //
 | |
| // Zero duration means no limit.
 | |
| func Timeout(d time.Duration) Option {
 | |
| 	return timeoutOption(d)
 | |
| }
 | |
| 
 | |
| // Deadline returns an option to specify the deadline for the given task.
 | |
| func Deadline(t time.Time) Option {
 | |
| 	return deadlineOption(t)
 | |
| }
 | |
| 
 | |
| // Unique returns an option to enqueue a task only if the given task is unique.
 | |
| // Task enqueued with this option is guaranteed to be unique within the given ttl.
 | |
| // Once the task gets processed successfully or once the TTL has expired, another task with the same uniqueness may be enqueued.
 | |
| // ErrDuplicateTask error is returned when enqueueing a duplicate task.
 | |
| //
 | |
| // Uniqueness of a task is based on the following properties:
 | |
| //     - Task Type
 | |
| //     - Task Payload
 | |
| //     - Queue Name
 | |
| func Unique(ttl time.Duration) Option {
 | |
| 	return uniqueOption(ttl)
 | |
| }
 | |
| 
 | |
| // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
 | |
| //
 | |
| // ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
 | |
| var ErrDuplicateTask = errors.New("task already exists")
 | |
| 
 | |
| type option struct {
 | |
| 	retry     int
 | |
| 	queue     string
 | |
| 	timeout   time.Duration
 | |
| 	deadline  time.Time
 | |
| 	uniqueTTL time.Duration
 | |
| }
 | |
| 
 | |
| func composeOptions(opts ...Option) option {
 | |
| 	res := option{
 | |
| 		retry:    defaultMaxRetry,
 | |
| 		queue:    base.DefaultQueueName,
 | |
| 		timeout:  0,
 | |
| 		deadline: time.Time{},
 | |
| 	}
 | |
| 	for _, opt := range opts {
 | |
| 		switch opt := opt.(type) {
 | |
| 		case retryOption:
 | |
| 			res.retry = int(opt)
 | |
| 		case queueOption:
 | |
| 			res.queue = string(opt)
 | |
| 		case timeoutOption:
 | |
| 			res.timeout = time.Duration(opt)
 | |
| 		case deadlineOption:
 | |
| 			res.deadline = time.Time(opt)
 | |
| 		case uniqueOption:
 | |
| 			res.uniqueTTL = time.Duration(opt)
 | |
| 		default:
 | |
| 			// ignore unexpected option
 | |
| 		}
 | |
| 	}
 | |
| 	return res
 | |
| }
 | |
| 
 | |
| // uniqueKey computes the redis key used for the given task.
 | |
| // It returns an empty string if ttl is zero.
 | |
| func uniqueKey(t *Task, ttl time.Duration, qname string) string {
 | |
| 	if ttl == 0 {
 | |
| 		return ""
 | |
| 	}
 | |
| 	return fmt.Sprintf("%s:%s:%s", t.Type, serializePayload(t.Payload.data), qname)
 | |
| }
 | |
| 
 | |
| func serializePayload(payload map[string]interface{}) string {
 | |
| 	if payload == nil {
 | |
| 		return "nil"
 | |
| 	}
 | |
| 	type entry struct {
 | |
| 		k string
 | |
| 		v interface{}
 | |
| 	}
 | |
| 	var es []entry
 | |
| 	for k, v := range payload {
 | |
| 		es = append(es, entry{k, v})
 | |
| 	}
 | |
| 	// sort entries by key
 | |
| 	sort.Slice(es, func(i, j int) bool { return es[i].k < es[j].k })
 | |
| 	var b strings.Builder
 | |
| 	for _, e := range es {
 | |
| 		if b.Len() > 0 {
 | |
| 			b.WriteString(",")
 | |
| 		}
 | |
| 		b.WriteString(fmt.Sprintf("%s=%v", e.k, e.v))
 | |
| 	}
 | |
| 	return b.String()
 | |
| }
 | |
| 
 | |
| // Default max retry count used if nothing is specified.
 | |
| const defaultMaxRetry = 25
 | |
| 
 | |
| // SetDefaultOptions sets options to be used for a given task type.
 | |
| // The argument opts specifies the behavior of task processing.
 | |
| // If there are conflicting Option values the last one overrides others.
 | |
| //
 | |
| // Default options can be overridden by options passed at enqueue time.
 | |
| func (c *Client) SetDefaultOptions(taskType string, opts ...Option) {
 | |
| 	c.mu.Lock()
 | |
| 	defer c.mu.Unlock()
 | |
| 	c.opts[taskType] = opts
 | |
| }
 | |
| 
 | |
| // EnqueueAt schedules task to be enqueued at the specified time.
 | |
| //
 | |
| // EnqueueAt returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
 | |
| //
 | |
| // The argument opts specifies the behavior of task processing.
 | |
| // If there are conflicting Option values the last one overrides others.
 | |
| func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
 | |
| 	return c.enqueueAt(t, task, opts...)
 | |
| }
 | |
| 
 | |
| // Enqueue enqueues task to be processed immediately.
 | |
| //
 | |
| // Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error.
 | |
| //
 | |
| // The argument opts specifies the behavior of task processing.
 | |
| // If there are conflicting Option values the last one overrides others.
 | |
| func (c *Client) Enqueue(task *Task, opts ...Option) error {
 | |
| 	return c.enqueueAt(time.Now(), task, opts...)
 | |
| }
 | |
| 
 | |
| // EnqueueIn schedules task to be enqueued after the specified delay.
 | |
| //
 | |
| // EnqueueIn returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
 | |
| //
 | |
| // The argument opts specifies the behavior of task processing.
 | |
| // If there are conflicting Option values the last one overrides others.
 | |
| func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error {
 | |
| 	return c.enqueueAt(time.Now().Add(d), task, opts...)
 | |
| }
 | |
| 
 | |
| // Close closes the connection with redis server.
 | |
| func (c *Client) Close() error {
 | |
| 	return c.rdb.Close()
 | |
| }
 | |
| 
 | |
| func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error {
 | |
| 	c.mu.Lock()
 | |
| 	defer c.mu.Unlock()
 | |
| 	if defaults, ok := c.opts[task.Type]; ok {
 | |
| 		opts = append(defaults, opts...)
 | |
| 	}
 | |
| 	opt := composeOptions(opts...)
 | |
| 	msg := &base.TaskMessage{
 | |
| 		ID:        xid.New(),
 | |
| 		Type:      task.Type,
 | |
| 		Payload:   task.Payload.data,
 | |
| 		Queue:     opt.queue,
 | |
| 		Retry:     opt.retry,
 | |
| 		Timeout:   opt.timeout.String(),
 | |
| 		Deadline:  opt.deadline.Format(time.RFC3339),
 | |
| 		UniqueKey: uniqueKey(task, opt.uniqueTTL, opt.queue),
 | |
| 	}
 | |
| 	var err error
 | |
| 	now := time.Now()
 | |
| 	if t.Before(now) || t.Equal(now) {
 | |
| 		err = c.enqueue(msg, opt.uniqueTTL)
 | |
| 	} else {
 | |
| 		err = c.schedule(msg, t, opt.uniqueTTL)
 | |
| 	}
 | |
| 	if err == rdb.ErrDuplicateTask {
 | |
| 		return fmt.Errorf("%w", ErrDuplicateTask)
 | |
| 	}
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {
 | |
| 	if uniqueTTL > 0 {
 | |
| 		return c.rdb.EnqueueUnique(msg, uniqueTTL)
 | |
| 	}
 | |
| 	return c.rdb.Enqueue(msg)
 | |
| }
 | |
| 
 | |
| func (c *Client) schedule(msg *base.TaskMessage, t time.Time, uniqueTTL time.Duration) error {
 | |
| 	if uniqueTTL > 0 {
 | |
| 		ttl := t.Add(uniqueTTL).Sub(time.Now())
 | |
| 		return c.rdb.ScheduleUnique(msg, t, ttl)
 | |
| 	}
 | |
| 	return c.rdb.Schedule(msg, t)
 | |
| }
 |