2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-26 11:16:12 +08:00

Return Result struct to caller of Enqueue

This commit is contained in:
Ken Hibino
2020-07-03 05:49:52 -07:00
parent 8b60e6a268
commit 34b90ecc8a
7 changed files with 201 additions and 48 deletions

View File

@@ -200,6 +200,35 @@ func (c *Client) SetDefaultOptions(taskType string, opts ...Option) {
c.opts[taskType] = opts
}
// A Result holds enqueued task's metadata.
type Result struct {
// ID is a unique identifier for the task.
ID string
// Retry is the maximum number of retry for the task.
Retry int
// Queue is a name of the queue the task is enqueued to.
Queue string
// Timeout is the timeout value for the task.
// Counting for timeout starts when a worker starts processing the task.
// If task processing doesn't complete within the timeout, the task will be retried.
// The value zero means no timeout.
//
// If deadline is set, min(now+timeout, deadline) is used, where the now is the time when
// a worker starts processing the task.
Timeout time.Duration
// Deadline is the deadline value for the task.
// If task processing doesn't complete before the deadline, the task will be retried.
// The value time.Unix(0, 0) means no deadline.
//
// If timeout is set, min(now+timeout, deadline) is used, where the now is the time when
// a worker starts processing the task.
Deadline time.Time
}
// EnqueueAt schedules task to be enqueued at the specified time.
//
// EnqueueAt returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
@@ -207,7 +236,7 @@ func (c *Client) SetDefaultOptions(taskType string, opts ...Option) {
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
// By deafult, max retry is set to 25 and timeout is set to 30 minutes.
func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) (*Result, error) {
return c.enqueueAt(t, task, opts...)
}
@@ -218,7 +247,7 @@ func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
// By deafult, max retry is set to 25 and timeout is set to 30 minutes.
func (c *Client) Enqueue(task *Task, opts ...Option) error {
func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) {
return c.enqueueAt(time.Now(), task, opts...)
}
@@ -229,7 +258,7 @@ func (c *Client) Enqueue(task *Task, opts ...Option) error {
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
// By deafult, max retry is set to 25 and timeout is set to 30 minutes.
func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error {
func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) (*Result, error) {
return c.enqueueAt(time.Now().Add(d), task, opts...)
}
@@ -238,7 +267,7 @@ func (c *Client) Close() error {
return c.rdb.Close()
}
func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error {
func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) (*Result, error) {
c.mu.Lock()
defer c.mu.Unlock()
if defaults, ok := c.opts[task.Type]; ok {
@@ -274,10 +303,19 @@ func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error {
} else {
err = c.schedule(msg, t, opt.uniqueTTL)
}
if err == rdb.ErrDuplicateTask {
return fmt.Errorf("%w", ErrDuplicateTask)
switch {
case err == rdb.ErrDuplicateTask:
return nil, fmt.Errorf("%w", ErrDuplicateTask)
case err != nil:
return nil, err
}
return err
return &Result{
ID: msg.ID.String(),
Queue: msg.Queue,
Retry: msg.Retry,
Timeout: timeout,
Deadline: deadline,
}, nil
}
func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {