2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 02:55:54 +08:00

Update NewTask function to take Option as varargs

This commit is contained in:
Ken Hibino 2021-09-10 05:56:17 -07:00
parent 23c522dc9f
commit 83cae4bb24
4 changed files with 31 additions and 34 deletions

View File

@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Changed
- `NewTask` takes `Option` as variadic argument
### Removed
- `Client.SetDefaultOptions` is removed. Use `NewTask` instead to pass default options for tasks.
## [0.18.6] - 2021-10-03 ## [0.18.6] - 2021-10-03
### Changed ### Changed

View File

@ -23,16 +23,21 @@ type Task struct {
// payload holds data needed to perform the task. // payload holds data needed to perform the task.
payload []byte payload []byte
// opts holds options for the task.
opts []Option
} }
func (t *Task) Type() string { return t.typename } func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload } func (t *Task) Payload() []byte { return t.payload }
// NewTask returns a new Task given a type name and payload data. // NewTask returns a new Task given a type name and payload data.
func NewTask(typename string, payload []byte) *Task { // Options can be passed to configure task processing behavior.
func NewTask(typename string, payload []byte, opts ...Option) *Task {
return &Task{ return &Task{
typename: typename, typename: typename,
payload: payload, payload: payload,
opts: opts,
} }
} }

View File

@ -7,7 +7,6 @@ package asynq
import ( import (
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
@ -24,9 +23,7 @@ import (
// //
// Clients are safe for concurrent use by multiple goroutines. // Clients are safe for concurrent use by multiple goroutines.
type Client struct { type Client struct {
mu sync.Mutex rdb *rdb.RDB
opts map[string][]Option
rdb *rdb.RDB
} }
// NewClient returns a new Client instance given a redis connection option. // NewClient returns a new Client instance given a redis connection option.
@ -35,11 +32,7 @@ func NewClient(r RedisConnOpt) *Client {
if !ok { if !ok {
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
} }
rdb := rdb.NewRDB(c) return &Client{rdb: rdb.NewRDB(c)}
return &Client{
opts: make(map[string][]Option),
rdb: rdb,
}
} }
type OptionType int type OptionType int
@ -241,17 +234,6 @@ var (
noDeadline time.Time = time.Unix(0, 0) noDeadline time.Time = time.Unix(0, 0)
) )
// SetDefaultOptions sets options to be used for a given task type.
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
//
// Default options can be overridden by options passed at enqueue time.
func (c *Client) SetDefaultOptions(taskType string, opts ...Option) {
c.mu.Lock()
defer c.mu.Unlock()
c.opts[taskType] = opts
}
// Close closes the connection with redis. // Close closes the connection with redis.
func (c *Client) Close() error { func (c *Client) Close() error {
return c.rdb.Close() return c.rdb.Close()
@ -263,6 +245,7 @@ func (c *Client) Close() error {
// //
// The argument opts specifies the behavior of task processing. // The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others. // If there are conflicting Option values the last one overrides others.
// Any options provided to NewTask can be overridden by options passed to Enqueue.
// By deafult, max retry is set to 25 and timeout is set to 30 minutes. // By deafult, max retry is set to 25 and timeout is set to 30 minutes.
// //
// If no ProcessAt or ProcessIn options are provided, the task will be pending immediately. // If no ProcessAt or ProcessIn options are provided, the task will be pending immediately.
@ -270,11 +253,8 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) {
if strings.TrimSpace(task.Type()) == "" { if strings.TrimSpace(task.Type()) == "" {
return nil, fmt.Errorf("task typename cannot be empty") return nil, fmt.Errorf("task typename cannot be empty")
} }
c.mu.Lock() // merge task options with the options provided at enqueue time.
if defaults, ok := c.opts[task.Type()]; ok { opts = append(task.opts, opts...)
opts = append(defaults, opts...)
}
c.mu.Unlock()
opt, err := composeOptions(opts...) opt, err := composeOptions(opts...)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -608,16 +608,17 @@ func TestClientEnqueueError(t *testing.T) {
} }
} }
func TestClientDefaultOptions(t *testing.T) { func TestClientWithDefaultOptions(t *testing.T) {
r := setup(t) r := setup(t)
now := time.Now() now := time.Now()
tests := []struct { tests := []struct {
desc string desc string
defaultOpts []Option // options set at the client level. defaultOpts []Option // options set at task initialization time
opts []Option // options used at enqueue time. opts []Option // options used at enqueue time.
task *Task tasktype string
payload []byte
wantInfo *TaskInfo wantInfo *TaskInfo
queue string // queue that the message should go into. queue string // queue that the message should go into.
want *base.TaskMessage want *base.TaskMessage
@ -626,7 +627,8 @@ func TestClientDefaultOptions(t *testing.T) {
desc: "With queue routing option", desc: "With queue routing option",
defaultOpts: []Option{Queue("feed")}, defaultOpts: []Option{Queue("feed")},
opts: []Option{}, opts: []Option{},
task: NewTask("feed:import", nil), tasktype: "feed:import",
payload: nil,
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
Queue: "feed", Queue: "feed",
Type: "feed:import", Type: "feed:import",
@ -654,7 +656,8 @@ func TestClientDefaultOptions(t *testing.T) {
desc: "With multiple options", desc: "With multiple options",
defaultOpts: []Option{Queue("feed"), MaxRetry(5)}, defaultOpts: []Option{Queue("feed"), MaxRetry(5)},
opts: []Option{}, opts: []Option{},
task: NewTask("feed:import", nil), tasktype: "feed:import",
payload: nil,
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
Queue: "feed", Queue: "feed",
Type: "feed:import", Type: "feed:import",
@ -682,7 +685,8 @@ func TestClientDefaultOptions(t *testing.T) {
desc: "With overriding options at enqueue time", desc: "With overriding options at enqueue time",
defaultOpts: []Option{Queue("feed"), MaxRetry(5)}, defaultOpts: []Option{Queue("feed"), MaxRetry(5)},
opts: []Option{Queue("critical")}, opts: []Option{Queue("critical")},
task: NewTask("feed:import", nil), tasktype: "feed:import",
payload: nil,
wantInfo: &TaskInfo{ wantInfo: &TaskInfo{
Queue: "critical", Queue: "critical",
Type: "feed:import", Type: "feed:import",
@ -711,8 +715,8 @@ func TestClientDefaultOptions(t *testing.T) {
h.FlushDB(t, r) h.FlushDB(t, r)
c := NewClient(getRedisConnOpt(t)) c := NewClient(getRedisConnOpt(t))
defer c.Close() defer c.Close()
c.SetDefaultOptions(tc.task.Type(), tc.defaultOpts...) task := NewTask(tc.tasktype, tc.payload, tc.defaultOpts...)
gotInfo, err := c.Enqueue(tc.task, tc.opts...) gotInfo, err := c.Enqueue(task, tc.opts...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }