2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Update Option interface

- Added `String()`, `Type()`, and `Value()` methods to the interface to
  aid with debugging and error handling.
This commit is contained in:
Ken Hibino 2020-10-10 06:46:47 -07:00
parent 50e7f38365
commit 8312515e64
5 changed files with 61 additions and 60 deletions

View File

@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Changed
- interface `Option` has changed. See the godoc for the new interface.
This change would have no impact as long as you are using exported functions (e.g. `MaxRetry`, `Queue`, etc)
to create `Option`s.
### Added
- `Payload.String() string` method is added

View File

@ -37,8 +37,29 @@ func NewClient(r RedisConnOpt) *Client {
}
}
type OptionType int
const (
MaxRetryOpt OptionType = iota
QueueOpt
TimeoutOpt
DeadlineOpt
UniqueOpt
ProcessAtOpt
ProcessInOpt
)
// Option specifies the task processing behavior.
type Option interface{}
type Option interface {
// String returns a string representation of the option.
String() string
// Type describes the type of the option.
Type() OptionType
// Value returns a value used to create this option.
Value() interface{}
}
// Internal option representations.
type (
@ -62,13 +83,21 @@ func MaxRetry(n int) Option {
return retryOption(n)
}
func (n retryOption) String() string { return fmt.Sprintf("MaxRetry(%d)", int(n)) }
func (n retryOption) Type() OptionType { return MaxRetryOpt }
func (n retryOption) Value() interface{} { return n }
// Queue returns an option to specify the queue to enqueue the task into.
//
// Queue name is case-insensitive and the lowercased version is used.
func Queue(name string) Option {
return queueOption(strings.ToLower(name))
func Queue(qname string) Option {
return queueOption(strings.ToLower(qname))
}
func (qname queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(qname)) }
func (qname queueOption) Type() OptionType { return QueueOpt }
func (qname queueOption) Value() interface{} { return qname }
// Timeout returns an option to specify how long a task may run.
// If the timeout elapses before the Handler returns, then the task
// will be retried.
@ -81,6 +110,10 @@ func Timeout(d time.Duration) Option {
return timeoutOption(d)
}
func (d timeoutOption) String() string { return fmt.Sprintf("Timeout(%v)", time.Duration(d)) }
func (d timeoutOption) Type() OptionType { return TimeoutOpt }
func (d timeoutOption) Value() interface{} { return d }
// Deadline returns an option to specify the deadline for the given task.
// If it reaches the deadline before the Handler returns, then the task
// will be retried.
@ -91,6 +124,10 @@ func Deadline(t time.Time) Option {
return deadlineOption(t)
}
func (t deadlineOption) String() string { return fmt.Sprintf("Deadline(%v)", time.Time(t)) }
func (t deadlineOption) Type() OptionType { return DeadlineOpt }
func (t deadlineOption) Value() interface{} { return t }
// Unique returns an option to enqueue a task only if the given task is unique.
// Task enqueued with this option is guaranteed to be unique within the given ttl.
// Once the task gets processed successfully or once the TTL has expired, another task with the same uniqueness may be enqueued.
@ -104,6 +141,10 @@ func Unique(ttl time.Duration) Option {
return uniqueOption(ttl)
}
func (ttl uniqueOption) String() string { return fmt.Sprintf("Unique(%v)", time.Duration(ttl)) }
func (ttl uniqueOption) Type() OptionType { return UniqueOpt }
func (ttl uniqueOption) Value() interface{} { return ttl }
// ProcessAt returns an option to specify when to process the given task.
//
// If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others.
@ -111,6 +152,10 @@ func ProcessAt(t time.Time) Option {
return processAtOption(t)
}
func (t processAtOption) String() string { return fmt.Sprintf("ProcessAt(%v)", time.Time(t)) }
func (t processAtOption) Type() OptionType { return ProcessAtOpt }
func (t processAtOption) Value() interface{} { return t }
// ProcessIn returns an option to specify when to process the given task relative to the current time.
//
// If there's a conflicting ProcessAt option, the last option passed to Enqueue overrides the others.
@ -118,6 +163,10 @@ func ProcessIn(d time.Duration) Option {
return processInOption(d)
}
func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v)", time.Duration(d)) }
func (d processInOption) Type() OptionType { return ProcessInOpt }
func (d processInOption) Value() interface{} { return d }
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.

View File

@ -299,7 +299,7 @@ type SchedulerEntry struct {
Payload map[string]interface{}
// Opts is the options for the periodic task.
Opts string
Opts []string
// Next shows the next time the task will be enqueued.
Next time.Time

View File

@ -7,7 +7,6 @@ package asynq
import (
"fmt"
"os"
"strings"
"sync"
"time"
@ -222,27 +221,10 @@ func (s *Scheduler) beat() {
}
}
func stringifyOptions(opts []Option) string {
func stringifyOptions(opts []Option) []string {
var res []string
for _, opt := range opts {
switch opt := opt.(type) {
case retryOption:
res = append(res, fmt.Sprintf("MaxRetry(%d)", int(opt)))
case queueOption:
res = append(res, fmt.Sprintf("Queue(%q)", string(opt)))
case timeoutOption:
res = append(res, fmt.Sprintf("Timeout(%v)", time.Duration(opt)))
case deadlineOption:
res = append(res, fmt.Sprintf("Deadline(%v)", time.Time(opt)))
case uniqueOption:
res = append(res, fmt.Sprintf("Unique(%v)", time.Duration(opt)))
case processAtOption:
res = append(res, fmt.Sprintf("ProcessAt(%v)", time.Time(opt)))
case processInOption:
res = append(res, fmt.Sprintf("ProcessIn(%v)", time.Duration(opt)))
default:
// ignore unexpected option
}
res = append(res, opt.String())
}
return strings.Join(res, ", ")
return res
}

View File

@ -5,7 +5,6 @@
package asynq
import (
"fmt"
"testing"
"time"
@ -77,37 +76,3 @@ func TestScheduler(t *testing.T) {
}
}
}
func TestStringifyOptions(t *testing.T) {
now := time.Now()
oneHourFromNow := now.Add(1 * time.Hour)
twoHoursFromNow := now.Add(2 * time.Hour)
tests := []struct {
opts []Option
want string
}{
{
opts: []Option{MaxRetry(10)},
want: "MaxRetry(10)",
},
{
opts: []Option{Queue("custom"), Timeout(1 * time.Minute)},
want: `Queue("custom"), Timeout(1m0s)`,
},
{
opts: []Option{ProcessAt(oneHourFromNow), Deadline(twoHoursFromNow)},
want: fmt.Sprintf("ProcessAt(%v), Deadline(%v)", oneHourFromNow, twoHoursFromNow),
},
{
opts: []Option{ProcessIn(30 * time.Minute), Unique(1 * time.Hour)},
want: "ProcessIn(30m0s), Unique(1h0m0s)",
},
}
for _, tc := range tests {
got := stringifyOptions(tc.opts)
if got != tc.want {
t.Errorf("got %v, want %v", got, tc.want)
}
}
}