diff --git a/CHANGELOG.md b/CHANGELOG.md index 185a62d..0c27dfc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `ParseRedisURI` helper function is added to create a `RedisConnOpt` from a URI string. +- `SetDefaultOptions` method is added to `Client`. ## [0.8.0] - 2020-04-19 diff --git a/README.md b/README.md index b588f01..8b557ea 100644 --- a/README.md +++ b/README.md @@ -173,12 +173,25 @@ func main() { // -------------------------------------------------------------------------- - // Example 3: Pass options to tune task processing behavior. + // Example 3: Set options to tune task processing behavior. // Options include MaxRetry, Queue, Timeout, Deadline, Unique etc. // -------------------------------------------------------------------------- + c.SetDefaultOptions(tasks.ImageProcessing, asynq.MaxRetry(10), asynq.Timeout(time.Minute)) + t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url") - err = c.Enqueue(t, asynq.MaxRetry(10), asynq.Queue("critical"), asynq.Timeout(time.Minute)) + err = c.Enqueue(t) + if err != nil { + log.Fatal("could not enqueue task: %v", err) + } + + // -------------------------------------------------------------------------- + // Example 4: Pass options to tune task processing behavior at enqueue time. + // Options passed at enqueue time override default ones, if any. + // -------------------------------------------------------------------------- + + t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url") + err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second)) if err != nil { log.Fatal("could not enqueue task: %v", err) } @@ -194,6 +207,8 @@ You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq? package main import ( + "log" + "github.com/hibiken/asynq" "your/app/package/tasks" ) diff --git a/client.go b/client.go index 79cf38c..3da3a89 100644 --- a/client.go +++ b/client.go @@ -9,6 +9,7 @@ import ( "fmt" "sort" "strings" + "sync" "time" "github.com/hibiken/asynq/internal/base" @@ -23,13 +24,18 @@ import ( // // Clients are safe for concurrent use by multiple goroutines. type Client struct { - rdb *rdb.RDB + mu sync.Mutex + opts map[string][]Option + rdb *rdb.RDB } // NewClient and returns a new Client given a redis connection option. func NewClient(r RedisConnOpt) *Client { rdb := rdb.NewRDB(createRedisClient(r)) - return &Client{rdb} + return &Client{ + opts: make(map[string][]Option), + rdb: rdb, + } } // Option specifies the task processing behavior. @@ -159,10 +165,19 @@ func serializePayload(payload map[string]interface{}) string { return b.String() } -const ( - // Max retry count by default - defaultMaxRetry = 25 -) +// Default max retry count used if nothing is specified. +const defaultMaxRetry = 25 + +// SetDefaultOptions sets options to be used for a given task type. +// The argument opts specifies the behavior of task processing. +// If there are conflicting Option values the last one overrides others. +// +// Default options can be overridden by options passed at enqueue time. +func (c *Client) SetDefaultOptions(taskType string, opts ...Option) { + c.mu.Lock() + defer c.mu.Unlock() + c.opts[taskType] = opts +} // EnqueueAt schedules task to be enqueued at the specified time. // @@ -171,6 +186,35 @@ const ( // The argument opts specifies the behavior of task processing. // If there are conflicting Option values the last one overrides others. func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error { + return c.enqueueAt(t, task, opts...) +} + +// Enqueue enqueues task to be processed immediately. +// +// Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error. +// +// The argument opts specifies the behavior of task processing. +// If there are conflicting Option values the last one overrides others. +func (c *Client) Enqueue(task *Task, opts ...Option) error { + return c.enqueueAt(time.Now(), task, opts...) +} + +// EnqueueIn schedules task to be enqueued after the specified delay. +// +// EnqueueIn returns nil if the task is scheduled successfully, otherwise returns a non-nil error. +// +// The argument opts specifies the behavior of task processing. +// If there are conflicting Option values the last one overrides others. +func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error { + return c.enqueueAt(time.Now().Add(d), task, opts...) +} + +func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error { + c.mu.Lock() + defer c.mu.Unlock() + if defaults, ok := c.opts[task.Type]; ok { + opts = append(defaults, opts...) + } opt := composeOptions(opts...) msg := &base.TaskMessage{ ID: xid.New(), @@ -194,26 +238,6 @@ func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error { return err } -// Enqueue enqueues task to be processed immediately. -// -// Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error. -// -// The argument opts specifies the behavior of task processing. -// If there are conflicting Option values the last one overrides others. -func (c *Client) Enqueue(task *Task, opts ...Option) error { - return c.EnqueueAt(time.Now(), task, opts...) -} - -// EnqueueIn schedules task to be enqueued after the specified delay. -// -// EnqueueIn returns nil if the task is scheduled successfully, otherwise returns a non-nil error. -// -// The argument opts specifies the behavior of task processing. -// If there are conflicting Option values the last one overrides others. -func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error { - return c.EnqueueAt(time.Now().Add(d), task, opts...) -} - func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error { if uniqueTTL > 0 { return c.rdb.EnqueueUnique(msg, uniqueTTL) diff --git a/client_test.go b/client_test.go index 2817b5a..348d056 100644 --- a/client_test.go +++ b/client_test.go @@ -15,6 +15,11 @@ import ( "github.com/hibiken/asynq/internal/base" ) +var ( + noTimeout = time.Duration(0).String() + noDeadline = time.Time{}.Format(time.RFC3339) +) + func TestClientEnqueueAt(t *testing.T) { r := setup(t) client := NewClient(RedisClientOpt{ @@ -27,9 +32,6 @@ func TestClientEnqueueAt(t *testing.T) { var ( now = time.Now() oneHourLater = now.Add(time.Hour) - - noTimeout = time.Duration(0).String() - noDeadline = time.Time{}.Format(time.RFC3339) ) tests := []struct { @@ -113,11 +115,6 @@ func TestClientEnqueue(t *testing.T) { task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) - var ( - noTimeout = time.Duration(0).String() - noDeadline = time.Time{}.Format(time.RFC3339) - ) - tests := []struct { desc string task *Task @@ -287,11 +284,6 @@ func TestClientEnqueueIn(t *testing.T) { task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) - var ( - noTimeout = time.Duration(0).String() - noDeadline = time.Time{}.Format(time.RFC3339) - ) - tests := []struct { desc string task *Task @@ -364,6 +356,86 @@ func TestClientEnqueueIn(t *testing.T) { } } +func TestClientDefaultOptions(t *testing.T) { + r := setup(t) + + tests := []struct { + desc string + defaultOpts []Option // options set at the client level. + opts []Option // options used at enqueue time. + task *Task + queue string // queue that the message should go into. + want *base.TaskMessage + }{ + { + desc: "With queue routing option", + defaultOpts: []Option{Queue("feed")}, + opts: []Option{}, + task: NewTask("feed:import", nil), + queue: "feed", + want: &base.TaskMessage{ + Type: "feed:import", + Payload: nil, + Retry: defaultMaxRetry, + Queue: "feed", + Timeout: noTimeout, + Deadline: noDeadline, + }, + }, + { + desc: "With multiple options", + defaultOpts: []Option{Queue("feed"), MaxRetry(5)}, + opts: []Option{}, + task: NewTask("feed:import", nil), + queue: "feed", + want: &base.TaskMessage{ + Type: "feed:import", + Payload: nil, + Retry: 5, + Queue: "feed", + Timeout: noTimeout, + Deadline: noDeadline, + }, + }, + { + desc: "With overriding options at enqueue time", + defaultOpts: []Option{Queue("feed"), MaxRetry(5)}, + opts: []Option{Queue("critical")}, + task: NewTask("feed:import", nil), + queue: "critical", + want: &base.TaskMessage{ + Type: "feed:import", + Payload: nil, + Retry: 5, + Queue: "critical", + Timeout: noTimeout, + Deadline: noDeadline, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) + c := NewClient(RedisClientOpt{Addr: redisAddr, DB: redisDB}) + c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...) + err := c.Enqueue(tc.task, tc.opts...) + if err != nil { + t.Fatal(err) + } + enqueued := h.GetEnqueuedMessages(t, r, tc.queue) + if len(enqueued) != 1 { + t.Errorf("%s;\nexpected queue %q to have one message; got %d messages in the queue.", + tc.desc, tc.queue, len(enqueued)) + continue + } + got := enqueued[0] + if diff := cmp.Diff(tc.want, got, h.IgnoreIDOpt); diff != "" { + t.Errorf("%s;\nmismatch found in enqueued task message; (-want,+got)\n%s", + tc.desc, diff) + } + } +} + func TestUniqueKey(t *testing.T) { tests := []struct { desc string