mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-19 05:17:30 +08:00
Add SetDefaultOptions method to Client
This commit is contained in:
76
client.go
76
client.go
@@ -9,6 +9,7 @@ import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
@@ -23,13 +24,18 @@ import (
|
||||
//
|
||||
// Clients are safe for concurrent use by multiple goroutines.
|
||||
type Client struct {
|
||||
rdb *rdb.RDB
|
||||
mu sync.Mutex
|
||||
opts map[string][]Option
|
||||
rdb *rdb.RDB
|
||||
}
|
||||
|
||||
// NewClient and returns a new Client given a redis connection option.
|
||||
func NewClient(r RedisConnOpt) *Client {
|
||||
rdb := rdb.NewRDB(createRedisClient(r))
|
||||
return &Client{rdb}
|
||||
return &Client{
|
||||
opts: make(map[string][]Option),
|
||||
rdb: rdb,
|
||||
}
|
||||
}
|
||||
|
||||
// Option specifies the task processing behavior.
|
||||
@@ -159,10 +165,19 @@ func serializePayload(payload map[string]interface{}) string {
|
||||
return b.String()
|
||||
}
|
||||
|
||||
const (
|
||||
// Max retry count by default
|
||||
defaultMaxRetry = 25
|
||||
)
|
||||
// Default max retry count used if nothing is specified.
|
||||
const defaultMaxRetry = 25
|
||||
|
||||
// SetDefaultOptions sets options to be used for a given task type.
|
||||
// The argument opts specifies the behavior of task processing.
|
||||
// If there are conflicting Option values the last one overrides others.
|
||||
//
|
||||
// Default options can be overridden by options passed at enqueue time.
|
||||
func (c *Client) SetDefaultOptions(taskType string, opts ...Option) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.opts[taskType] = opts
|
||||
}
|
||||
|
||||
// EnqueueAt schedules task to be enqueued at the specified time.
|
||||
//
|
||||
@@ -171,6 +186,35 @@ const (
|
||||
// The argument opts specifies the behavior of task processing.
|
||||
// If there are conflicting Option values the last one overrides others.
|
||||
func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
|
||||
return c.enqueueAt(t, task, opts...)
|
||||
}
|
||||
|
||||
// Enqueue enqueues task to be processed immediately.
|
||||
//
|
||||
// Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error.
|
||||
//
|
||||
// The argument opts specifies the behavior of task processing.
|
||||
// If there are conflicting Option values the last one overrides others.
|
||||
func (c *Client) Enqueue(task *Task, opts ...Option) error {
|
||||
return c.enqueueAt(time.Now(), task, opts...)
|
||||
}
|
||||
|
||||
// EnqueueIn schedules task to be enqueued after the specified delay.
|
||||
//
|
||||
// EnqueueIn returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
|
||||
//
|
||||
// The argument opts specifies the behavior of task processing.
|
||||
// If there are conflicting Option values the last one overrides others.
|
||||
func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error {
|
||||
return c.enqueueAt(time.Now().Add(d), task, opts...)
|
||||
}
|
||||
|
||||
func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if defaults, ok := c.opts[task.Type]; ok {
|
||||
opts = append(defaults, opts...)
|
||||
}
|
||||
opt := composeOptions(opts...)
|
||||
msg := &base.TaskMessage{
|
||||
ID: xid.New(),
|
||||
@@ -194,26 +238,6 @@ func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Enqueue enqueues task to be processed immediately.
|
||||
//
|
||||
// Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error.
|
||||
//
|
||||
// The argument opts specifies the behavior of task processing.
|
||||
// If there are conflicting Option values the last one overrides others.
|
||||
func (c *Client) Enqueue(task *Task, opts ...Option) error {
|
||||
return c.EnqueueAt(time.Now(), task, opts...)
|
||||
}
|
||||
|
||||
// EnqueueIn schedules task to be enqueued after the specified delay.
|
||||
//
|
||||
// EnqueueIn returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
|
||||
//
|
||||
// The argument opts specifies the behavior of task processing.
|
||||
// If there are conflicting Option values the last one overrides others.
|
||||
func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error {
|
||||
return c.EnqueueAt(time.Now().Add(d), task, opts...)
|
||||
}
|
||||
|
||||
func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {
|
||||
if uniqueTTL > 0 {
|
||||
return c.rdb.EnqueueUnique(msg, uniqueTTL)
|
||||
|
Reference in New Issue
Block a user