2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00
asynq/client.go
2020-05-08 06:15:14 -07:00

260 lines
7.0 KiB
Go

// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
"github.com/rs/xid"
)
// A Client is responsible for scheduling tasks.
//
// A Client is used to register tasks that should be processed
// immediately or some time in the future.
//
// Clients are safe for concurrent use by multiple goroutines.
type Client struct {
mu sync.Mutex
opts map[string][]Option
rdb *rdb.RDB
}
// NewClient and returns a new Client given a redis connection option.
func NewClient(r RedisConnOpt) *Client {
rdb := rdb.NewRDB(createRedisClient(r))
return &Client{
opts: make(map[string][]Option),
rdb: rdb,
}
}
// Option specifies the task processing behavior.
type Option interface{}
// Internal option representations.
type (
retryOption int
queueOption string
timeoutOption time.Duration
deadlineOption time.Time
uniqueOption time.Duration
)
// MaxRetry returns an option to specify the max number of times
// the task will be retried.
//
// Negative retry count is treated as zero retry.
func MaxRetry(n int) Option {
if n < 0 {
n = 0
}
return retryOption(n)
}
// Queue returns an option to specify the queue to enqueue the task into.
//
// Queue name is case-insensitive and the lowercased version is used.
func Queue(name string) Option {
return queueOption(strings.ToLower(name))
}
// Timeout returns an option to specify how long a task may run.
//
// Zero duration means no limit.
func Timeout(d time.Duration) Option {
return timeoutOption(d)
}
// Deadline returns an option to specify the deadline for the given task.
func Deadline(t time.Time) Option {
return deadlineOption(t)
}
// Unique returns an option to enqueue a task only if the given task is unique.
// Task enqueued with this option is guaranteed to be unique within the given ttl.
// Once the task gets processed successfully or once the TTL has expired, another task with the same uniqueness may be enqueued.
// ErrDuplicateTask error is returned when enqueueing a duplicate task.
//
// Uniqueness of a task is based on the following properties:
// - Task Type
// - Task Payload
// - Queue Name
func Unique(ttl time.Duration) Option {
return uniqueOption(ttl)
}
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
var ErrDuplicateTask = errors.New("task already exists")
type option struct {
retry int
queue string
timeout time.Duration
deadline time.Time
uniqueTTL time.Duration
}
func composeOptions(opts ...Option) option {
res := option{
retry: defaultMaxRetry,
queue: base.DefaultQueueName,
timeout: 0,
deadline: time.Time{},
}
for _, opt := range opts {
switch opt := opt.(type) {
case retryOption:
res.retry = int(opt)
case queueOption:
res.queue = string(opt)
case timeoutOption:
res.timeout = time.Duration(opt)
case deadlineOption:
res.deadline = time.Time(opt)
case uniqueOption:
res.uniqueTTL = time.Duration(opt)
default:
// ignore unexpected option
}
}
return res
}
// uniqueKey computes the redis key used for the given task.
// It returns an empty string if ttl is zero.
func uniqueKey(t *Task, ttl time.Duration, qname string) string {
if ttl == 0 {
return ""
}
return fmt.Sprintf("%s:%s:%s", t.Type, serializePayload(t.Payload.data), qname)
}
func serializePayload(payload map[string]interface{}) string {
if payload == nil {
return "nil"
}
type entry struct {
k string
v interface{}
}
var es []entry
for k, v := range payload {
es = append(es, entry{k, v})
}
// sort entries by key
sort.Slice(es, func(i, j int) bool { return es[i].k < es[j].k })
var b strings.Builder
for _, e := range es {
if b.Len() > 0 {
b.WriteString(",")
}
b.WriteString(fmt.Sprintf("%s=%v", e.k, e.v))
}
return b.String()
}
// Default max retry count used if nothing is specified.
const defaultMaxRetry = 25
// SetDefaultOptions sets options to be used for a given task type.
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
//
// Default options can be overridden by options passed at enqueue time.
func (c *Client) SetDefaultOptions(taskType string, opts ...Option) {
c.mu.Lock()
defer c.mu.Unlock()
c.opts[taskType] = opts
}
// EnqueueAt schedules task to be enqueued at the specified time.
//
// EnqueueAt returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
return c.enqueueAt(t, task, opts...)
}
// Enqueue enqueues task to be processed immediately.
//
// Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
func (c *Client) Enqueue(task *Task, opts ...Option) error {
return c.enqueueAt(time.Now(), task, opts...)
}
// EnqueueIn schedules task to be enqueued after the specified delay.
//
// EnqueueIn returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error {
return c.enqueueAt(time.Now().Add(d), task, opts...)
}
// Close closes the connection with redis server.
func (c *Client) Close() error {
return c.rdb.Close()
}
func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error {
c.mu.Lock()
defer c.mu.Unlock()
if defaults, ok := c.opts[task.Type]; ok {
opts = append(defaults, opts...)
}
opt := composeOptions(opts...)
msg := &base.TaskMessage{
ID: xid.New(),
Type: task.Type,
Payload: task.Payload.data,
Queue: opt.queue,
Retry: opt.retry,
Timeout: opt.timeout.String(),
Deadline: opt.deadline.Format(time.RFC3339),
UniqueKey: uniqueKey(task, opt.uniqueTTL, opt.queue),
}
var err error
if time.Now().After(t) {
err = c.enqueue(msg, opt.uniqueTTL)
} else {
err = c.schedule(msg, t, opt.uniqueTTL)
}
if err == rdb.ErrDuplicateTask {
return fmt.Errorf("%w", ErrDuplicateTask)
}
return err
}
func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {
if uniqueTTL > 0 {
return c.rdb.EnqueueUnique(msg, uniqueTTL)
}
return c.rdb.Enqueue(msg)
}
func (c *Client) schedule(msg *base.TaskMessage, t time.Time, uniqueTTL time.Duration) error {
if uniqueTTL > 0 {
ttl := t.Add(uniqueTTL).Sub(time.Now())
return c.rdb.ScheduleUnique(msg, t, ttl)
}
return c.rdb.Schedule(msg, t)
}