diff --git a/client.go b/client.go index 28a53e0..bc53876 100644 --- a/client.go +++ b/client.go @@ -7,6 +7,7 @@ package asynq import ( "errors" "fmt" + "strconv" "strings" "sync" "time" @@ -85,7 +86,7 @@ func MaxRetry(n int) Option { func (n retryOption) String() string { return fmt.Sprintf("MaxRetry(%d)", int(n)) } func (n retryOption) Type() OptionType { return MaxRetryOpt } -func (n retryOption) Value() interface{} { return n } +func (n retryOption) Value() interface{} { return int(n) } // Queue returns an option to specify the queue to enqueue the task into. // @@ -96,7 +97,7 @@ func Queue(qname string) Option { func (qname queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(qname)) } func (qname queueOption) Type() OptionType { return QueueOpt } -func (qname queueOption) Value() interface{} { return qname } +func (qname queueOption) Value() interface{} { return string(qname) } // Timeout returns an option to specify how long a task may run. // If the timeout elapses before the Handler returns, then the task @@ -112,7 +113,7 @@ func Timeout(d time.Duration) Option { func (d timeoutOption) String() string { return fmt.Sprintf("Timeout(%v)", time.Duration(d)) } func (d timeoutOption) Type() OptionType { return TimeoutOpt } -func (d timeoutOption) Value() interface{} { return d } +func (d timeoutOption) Value() interface{} { return time.Duration(d) } // Deadline returns an option to specify the deadline for the given task. // If it reaches the deadline before the Handler returns, then the task @@ -124,9 +125,11 @@ func Deadline(t time.Time) Option { return deadlineOption(t) } -func (t deadlineOption) String() string { return fmt.Sprintf("Deadline(%v)", time.Time(t)) } +func (t deadlineOption) String() string { + return fmt.Sprintf("Deadline(%v)", time.Time(t).Format(time.UnixDate)) +} func (t deadlineOption) Type() OptionType { return DeadlineOpt } -func (t deadlineOption) Value() interface{} { return t } +func (t deadlineOption) Value() interface{} { return time.Time(t) } // Unique returns an option to enqueue a task only if the given task is unique. // Task enqueued with this option is guaranteed to be unique within the given ttl. @@ -143,7 +146,7 @@ func Unique(ttl time.Duration) Option { func (ttl uniqueOption) String() string { return fmt.Sprintf("Unique(%v)", time.Duration(ttl)) } func (ttl uniqueOption) Type() OptionType { return UniqueOpt } -func (ttl uniqueOption) Value() interface{} { return ttl } +func (ttl uniqueOption) Value() interface{} { return time.Duration(ttl) } // ProcessAt returns an option to specify when to process the given task. // @@ -152,9 +155,11 @@ func ProcessAt(t time.Time) Option { return processAtOption(t) } -func (t processAtOption) String() string { return fmt.Sprintf("ProcessAt(%v)", time.Time(t)) } +func (t processAtOption) String() string { + return fmt.Sprintf("ProcessAt(%v)", time.Time(t).Format(time.UnixDate)) +} func (t processAtOption) Type() OptionType { return ProcessAtOpt } -func (t processAtOption) Value() interface{} { return t } +func (t processAtOption) Value() interface{} { return time.Time(t) } // ProcessIn returns an option to specify when to process the given task relative to the current time. // @@ -165,7 +170,75 @@ func ProcessIn(d time.Duration) Option { func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v)", time.Duration(d)) } func (d processInOption) Type() OptionType { return ProcessInOpt } -func (d processInOption) Value() interface{} { return d } +func (d processInOption) Value() interface{} { return time.Duration(d) } + +// parseOption interprets a string s as an Option and returns the Option if parsing is successful, +// otherwise returns non-nil error. +func parseOption(s string) (Option, error) { + fn, arg := parseOptionFunc(s), parseOptionArg(s) + switch fn { + case "Queue": + qname, err := strconv.Unquote(arg) + if err != nil { + return nil, err + } + return Queue(qname), nil + case "MaxRetry": + n, err := strconv.Atoi(arg) + if err != nil { + return nil, err + } + return MaxRetry(n), nil + case "Timeout": + d, err := time.ParseDuration(arg) + if err != nil { + return nil, err + } + return Timeout(d), nil + case "Deadline": + t, err := time.Parse(time.UnixDate, arg) + if err != nil { + return nil, err + } + return Deadline(t), nil + case "Unique": + d, err := time.ParseDuration(arg) + if err != nil { + return nil, err + } + return Unique(d), nil + case "ProcessAt": + t, err := time.Parse(time.UnixDate, arg) + if err != nil { + return nil, err + } + return ProcessAt(t), nil + case "ProcessIn": + d, err := time.ParseDuration(arg) + if err != nil { + return nil, err + } + return ProcessIn(d), nil + default: + return nil, fmt.Errorf("cannot not parse option string %q", s) + } +} + +func parseOptionFunc(s string) string { + i := strings.Index(s, "(") + return s[:i] +} + +func parseOptionArg(s string) string { + i := strings.Index(s, "(") + if i >= 0 { + j := strings.Index(s, ")") + if j > i { + return s[i+1 : j] + } + } + return "" +} // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task. // diff --git a/client_test.go b/client_test.go index fa0dd3d..de59b99 100644 --- a/client_test.go +++ b/client_test.go @@ -774,3 +774,71 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { } } } + +func TestParseOption(t *testing.T) { + oneHourFromNow := time.Now().Add(1 * time.Hour) + tests := []struct { + s string + wantType OptionType + wantVal interface{} + }{ + {`MaxRetry(10)`, MaxRetryOpt, 10}, + {`Queue("email")`, QueueOpt, "email"}, + {`Timeout(3m)`, TimeoutOpt, 3 * time.Minute}, + {Deadline(oneHourFromNow).String(), DeadlineOpt, oneHourFromNow}, + {`Unique(1h)`, UniqueOpt, 1 * time.Hour}, + {ProcessAt(oneHourFromNow).String(), ProcessAtOpt, oneHourFromNow}, + {`ProcessIn(10m)`, ProcessInOpt, 10 * time.Minute}, + } + + for _, tc := range tests { + t.Run(tc.s, func(t *testing.T) { + got, err := parseOption(tc.s) + if err != nil { + t.Fatalf("returned error: %v", err) + } + if got == nil { + t.Fatal("returned nil") + } + if got.Type() != tc.wantType { + t.Fatalf("got type %v, want type %v ", got.Type(), tc.wantType) + } + switch tc.wantType { + case QueueOpt: + gotVal, ok := got.Value().(string) + if !ok { + t.Fatal("returned Option with non-string value") + } + if gotVal != tc.wantVal.(string) { + t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) + } + case MaxRetryOpt: + gotVal, ok := got.Value().(int) + if !ok { + t.Fatal("returned Option with non-int value") + } + if gotVal != tc.wantVal.(int) { + t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) + } + case TimeoutOpt, UniqueOpt, ProcessInOpt: + gotVal, ok := got.Value().(time.Duration) + if !ok { + t.Fatal("returned Option with non duration value") + } + if gotVal != tc.wantVal.(time.Duration) { + t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) + } + case DeadlineOpt, ProcessAtOpt: + gotVal, ok := got.Value().(time.Time) + if !ok { + t.Fatal("returned Option with non time value") + } + if cmp.Equal(gotVal, tc.wantVal.(time.Time)) { + t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) + } + default: + t.Fatalf("returned Option with unexpected type: %v", got.Type()) + } + }) + } +}