diff --git a/CHANGELOG.md b/CHANGELOG.md index 77f15a8..892195e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- `NewTask` takes `Option` as variadic argument + +### Removed + +- `Client.SetDefaultOptions` is removed. Use `NewTask` instead to pass default options for tasks. + ## [0.18.6] - 2021-10-03 ### Changed diff --git a/asynq.go b/asynq.go index 4963781..6b5bb55 100644 --- a/asynq.go +++ b/asynq.go @@ -23,16 +23,21 @@ type Task struct { // payload holds data needed to perform the task. payload []byte + + // opts holds options for the task. + opts []Option } func (t *Task) Type() string { return t.typename } func (t *Task) Payload() []byte { return t.payload } // NewTask returns a new Task given a type name and payload data. -func NewTask(typename string, payload []byte) *Task { +// Options can be passed to configure task processing behavior. +func NewTask(typename string, payload []byte, opts ...Option) *Task { return &Task{ typename: typename, payload: payload, + opts: opts, } } diff --git a/client.go b/client.go index f7c9e79..624f146 100644 --- a/client.go +++ b/client.go @@ -7,7 +7,6 @@ package asynq import ( "fmt" "strings" - "sync" "time" "github.com/go-redis/redis/v8" @@ -24,9 +23,7 @@ import ( // // Clients are safe for concurrent use by multiple goroutines. type Client struct { - mu sync.Mutex - opts map[string][]Option - rdb *rdb.RDB + rdb *rdb.RDB } // NewClient returns a new Client instance given a redis connection option. @@ -35,11 +32,7 @@ func NewClient(r RedisConnOpt) *Client { if !ok { panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) } - rdb := rdb.NewRDB(c) - return &Client{ - opts: make(map[string][]Option), - rdb: rdb, - } + return &Client{rdb: rdb.NewRDB(c)} } type OptionType int @@ -241,17 +234,6 @@ var ( noDeadline time.Time = time.Unix(0, 0) ) -// SetDefaultOptions sets options to be used for a given task type. -// The argument opts specifies the behavior of task processing. -// If there are conflicting Option values the last one overrides others. -// -// Default options can be overridden by options passed at enqueue time. -func (c *Client) SetDefaultOptions(taskType string, opts ...Option) { - c.mu.Lock() - defer c.mu.Unlock() - c.opts[taskType] = opts -} - // Close closes the connection with redis. func (c *Client) Close() error { return c.rdb.Close() @@ -263,6 +245,7 @@ func (c *Client) Close() error { // // The argument opts specifies the behavior of task processing. // If there are conflicting Option values the last one overrides others. +// Any options provided to NewTask can be overridden by options passed to Enqueue. // By deafult, max retry is set to 25 and timeout is set to 30 minutes. // // If no ProcessAt or ProcessIn options are provided, the task will be pending immediately. @@ -270,11 +253,8 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) { if strings.TrimSpace(task.Type()) == "" { return nil, fmt.Errorf("task typename cannot be empty") } - c.mu.Lock() - if defaults, ok := c.opts[task.Type()]; ok { - opts = append(defaults, opts...) - } - c.mu.Unlock() + // merge task options with the options provided at enqueue time. + opts = append(task.opts, opts...) opt, err := composeOptions(opts...) if err != nil { return nil, err diff --git a/client_test.go b/client_test.go index 0ef3211..bfe205b 100644 --- a/client_test.go +++ b/client_test.go @@ -608,16 +608,17 @@ func TestClientEnqueueError(t *testing.T) { } } -func TestClientDefaultOptions(t *testing.T) { +func TestClientWithDefaultOptions(t *testing.T) { r := setup(t) now := time.Now() tests := []struct { desc string - defaultOpts []Option // options set at the client level. + defaultOpts []Option // options set at task initialization time opts []Option // options used at enqueue time. - task *Task + tasktype string + payload []byte wantInfo *TaskInfo queue string // queue that the message should go into. want *base.TaskMessage @@ -626,7 +627,8 @@ func TestClientDefaultOptions(t *testing.T) { desc: "With queue routing option", defaultOpts: []Option{Queue("feed")}, opts: []Option{}, - task: NewTask("feed:import", nil), + tasktype: "feed:import", + payload: nil, wantInfo: &TaskInfo{ Queue: "feed", Type: "feed:import", @@ -654,7 +656,8 @@ func TestClientDefaultOptions(t *testing.T) { desc: "With multiple options", defaultOpts: []Option{Queue("feed"), MaxRetry(5)}, opts: []Option{}, - task: NewTask("feed:import", nil), + tasktype: "feed:import", + payload: nil, wantInfo: &TaskInfo{ Queue: "feed", Type: "feed:import", @@ -682,7 +685,8 @@ func TestClientDefaultOptions(t *testing.T) { desc: "With overriding options at enqueue time", defaultOpts: []Option{Queue("feed"), MaxRetry(5)}, opts: []Option{Queue("critical")}, - task: NewTask("feed:import", nil), + tasktype: "feed:import", + payload: nil, wantInfo: &TaskInfo{ Queue: "critical", Type: "feed:import", @@ -711,8 +715,8 @@ func TestClientDefaultOptions(t *testing.T) { h.FlushDB(t, r) c := NewClient(getRedisConnOpt(t)) defer c.Close() - c.SetDefaultOptions(tc.task.Type(), tc.defaultOpts...) - gotInfo, err := c.Enqueue(tc.task, tc.opts...) + task := NewTask(tc.tasktype, tc.payload, tc.defaultOpts...) + gotInfo, err := c.Enqueue(task, tc.opts...) if err != nil { t.Fatal(err) }