From 8312515e6457bc5aa92ce905a8c2b36257fb043e Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 10 Oct 2020 06:46:47 -0700 Subject: [PATCH] Update Option interface - Added `String()`, `Type()`, and `Value()` methods to the interface to aid with debugging and error handling. --- CHANGELOG.md | 5 ++++ client.go | 55 ++++++++++++++++++++++++++++++++++++++++--- internal/base/base.go | 2 +- scheduler.go | 24 +++---------------- scheduler_test.go | 35 --------------------------- 5 files changed, 61 insertions(+), 60 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1aa9652..6438706 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed +- interface `Option` has changed. See the godoc for the new interface. + This change would have no impact as long as you are using exported functions (e.g. `MaxRetry`, `Queue`, etc) + to create `Option`s. + ### Added - `Payload.String() string` method is added diff --git a/client.go b/client.go index 6d6b80b..28a53e0 100644 --- a/client.go +++ b/client.go @@ -37,8 +37,29 @@ func NewClient(r RedisConnOpt) *Client { } } +type OptionType int + +const ( + MaxRetryOpt OptionType = iota + QueueOpt + TimeoutOpt + DeadlineOpt + UniqueOpt + ProcessAtOpt + ProcessInOpt +) + // Option specifies the task processing behavior. -type Option interface{} +type Option interface { + // String returns a string representation of the option. + String() string + + // Type describes the type of the option. + Type() OptionType + + // Value returns a value used to create this option. + Value() interface{} +} // Internal option representations. type ( @@ -62,13 +83,21 @@ func MaxRetry(n int) Option { return retryOption(n) } +func (n retryOption) String() string { return fmt.Sprintf("MaxRetry(%d)", int(n)) } +func (n retryOption) Type() OptionType { return MaxRetryOpt } +func (n retryOption) Value() interface{} { return n } + // Queue returns an option to specify the queue to enqueue the task into. // // Queue name is case-insensitive and the lowercased version is used. -func Queue(name string) Option { - return queueOption(strings.ToLower(name)) +func Queue(qname string) Option { + return queueOption(strings.ToLower(qname)) } +func (qname queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(qname)) } +func (qname queueOption) Type() OptionType { return QueueOpt } +func (qname queueOption) Value() interface{} { return qname } + // Timeout returns an option to specify how long a task may run. // If the timeout elapses before the Handler returns, then the task // will be retried. @@ -81,6 +110,10 @@ func Timeout(d time.Duration) Option { return timeoutOption(d) } +func (d timeoutOption) String() string { return fmt.Sprintf("Timeout(%v)", time.Duration(d)) } +func (d timeoutOption) Type() OptionType { return TimeoutOpt } +func (d timeoutOption) Value() interface{} { return d } + // Deadline returns an option to specify the deadline for the given task. // If it reaches the deadline before the Handler returns, then the task // will be retried. @@ -91,6 +124,10 @@ func Deadline(t time.Time) Option { return deadlineOption(t) } +func (t deadlineOption) String() string { return fmt.Sprintf("Deadline(%v)", time.Time(t)) } +func (t deadlineOption) Type() OptionType { return DeadlineOpt } +func (t deadlineOption) Value() interface{} { return t } + // Unique returns an option to enqueue a task only if the given task is unique. // Task enqueued with this option is guaranteed to be unique within the given ttl. // Once the task gets processed successfully or once the TTL has expired, another task with the same uniqueness may be enqueued. @@ -104,6 +141,10 @@ func Unique(ttl time.Duration) Option { return uniqueOption(ttl) } +func (ttl uniqueOption) String() string { return fmt.Sprintf("Unique(%v)", time.Duration(ttl)) } +func (ttl uniqueOption) Type() OptionType { return UniqueOpt } +func (ttl uniqueOption) Value() interface{} { return ttl } + // ProcessAt returns an option to specify when to process the given task. // // If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others. @@ -111,6 +152,10 @@ func ProcessAt(t time.Time) Option { return processAtOption(t) } +func (t processAtOption) String() string { return fmt.Sprintf("ProcessAt(%v)", time.Time(t)) } +func (t processAtOption) Type() OptionType { return ProcessAtOpt } +func (t processAtOption) Value() interface{} { return t } + // ProcessIn returns an option to specify when to process the given task relative to the current time. // // If there's a conflicting ProcessAt option, the last option passed to Enqueue overrides the others. @@ -118,6 +163,10 @@ func ProcessIn(d time.Duration) Option { return processInOption(d) } +func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v)", time.Duration(d)) } +func (d processInOption) Type() OptionType { return ProcessInOpt } +func (d processInOption) Value() interface{} { return d } + // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task. // // ErrDuplicateTask error only applies to tasks enqueued with a Unique option. diff --git a/internal/base/base.go b/internal/base/base.go index 15a6b2d..db94299 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -299,7 +299,7 @@ type SchedulerEntry struct { Payload map[string]interface{} // Opts is the options for the periodic task. - Opts string + Opts []string // Next shows the next time the task will be enqueued. Next time.Time diff --git a/scheduler.go b/scheduler.go index 9654232..55b2c0a 100644 --- a/scheduler.go +++ b/scheduler.go @@ -7,7 +7,6 @@ package asynq import ( "fmt" "os" - "strings" "sync" "time" @@ -222,27 +221,10 @@ func (s *Scheduler) beat() { } } -func stringifyOptions(opts []Option) string { +func stringifyOptions(opts []Option) []string { var res []string for _, opt := range opts { - switch opt := opt.(type) { - case retryOption: - res = append(res, fmt.Sprintf("MaxRetry(%d)", int(opt))) - case queueOption: - res = append(res, fmt.Sprintf("Queue(%q)", string(opt))) - case timeoutOption: - res = append(res, fmt.Sprintf("Timeout(%v)", time.Duration(opt))) - case deadlineOption: - res = append(res, fmt.Sprintf("Deadline(%v)", time.Time(opt))) - case uniqueOption: - res = append(res, fmt.Sprintf("Unique(%v)", time.Duration(opt))) - case processAtOption: - res = append(res, fmt.Sprintf("ProcessAt(%v)", time.Time(opt))) - case processInOption: - res = append(res, fmt.Sprintf("ProcessIn(%v)", time.Duration(opt))) - default: - // ignore unexpected option - } + res = append(res, opt.String()) } - return strings.Join(res, ", ") + return res } diff --git a/scheduler_test.go b/scheduler_test.go index 0cf4bc4..4fe4c80 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -5,7 +5,6 @@ package asynq import ( - "fmt" "testing" "time" @@ -77,37 +76,3 @@ func TestScheduler(t *testing.T) { } } } - -func TestStringifyOptions(t *testing.T) { - now := time.Now() - oneHourFromNow := now.Add(1 * time.Hour) - twoHoursFromNow := now.Add(2 * time.Hour) - tests := []struct { - opts []Option - want string - }{ - { - opts: []Option{MaxRetry(10)}, - want: "MaxRetry(10)", - }, - { - opts: []Option{Queue("custom"), Timeout(1 * time.Minute)}, - want: `Queue("custom"), Timeout(1m0s)`, - }, - { - opts: []Option{ProcessAt(oneHourFromNow), Deadline(twoHoursFromNow)}, - want: fmt.Sprintf("ProcessAt(%v), Deadline(%v)", oneHourFromNow, twoHoursFromNow), - }, - { - opts: []Option{ProcessIn(30 * time.Minute), Unique(1 * time.Hour)}, - want: "ProcessIn(30m0s), Unique(1h0m0s)", - }, - } - - for _, tc := range tests { - got := stringifyOptions(tc.opts) - if got != tc.want { - t.Errorf("got %v, want %v", got, tc.want) - } - } -}