From 39459b4412f72e5776b2047f1d61325b7e23d2e6 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 11 Feb 2020 21:53:59 -0800 Subject: [PATCH] Allow user to specify timeout per task --- client.go | 25 +++++++++++++++++++------ client_test.go | 27 +++++++++++++++++++++++++++ internal/base/base.go | 6 ++++++ processor.go | 16 ++++++++++++++-- 4 files changed, 66 insertions(+), 8 deletions(-) diff --git a/client.go b/client.go index afd1f9e..b7116e4 100644 --- a/client.go +++ b/client.go @@ -34,8 +34,9 @@ type Option interface{} // Internal option representations. type ( - retryOption int - queueOption string + retryOption int + queueOption string + timeoutOption time.Duration ) // MaxRetry returns an option to specify the max number of times @@ -56,15 +57,24 @@ func Queue(name string) Option { return queueOption(strings.ToLower(name)) } +// Timeout returns an option to specify how long a task may run. +// +// Zero duration means no limit. +func Timeout(d time.Duration) Option { + return timeoutOption(d) +} + type option struct { - retry int - queue string + retry int + queue string + timeout time.Duration } func composeOptions(opts ...Option) option { res := option{ - retry: defaultMaxRetry, - queue: base.DefaultQueueName, + retry: defaultMaxRetry, + queue: base.DefaultQueueName, + timeout: 0, } for _, opt := range opts { switch opt := opt.(type) { @@ -72,6 +82,8 @@ func composeOptions(opts ...Option) option { res.retry = int(opt) case queueOption: res.queue = string(opt) + case timeoutOption: + res.timeout = time.Duration(opt) default: // ignore unexpected option } @@ -99,6 +111,7 @@ func (c *Client) Schedule(task *Task, processAt time.Time, opts ...Option) error Payload: task.Payload.data, Queue: opt.queue, Retry: opt.retry, + Timeout: opt.timeout.String(), } return c.enqueue(msg, processAt) } diff --git a/client_test.go b/client_test.go index 7024f5d..1bd19ef 100644 --- a/client_test.go +++ b/client_test.go @@ -42,6 +42,7 @@ func TestClient(t *testing.T) { Payload: task.Payload.data, Retry: defaultMaxRetry, Queue: "default", + Timeout: time.Duration(0).String(), }, }, }, @@ -60,6 +61,7 @@ func TestClient(t *testing.T) { Payload: task.Payload.data, Retry: defaultMaxRetry, Queue: "default", + Timeout: time.Duration(0).String(), }, Score: float64(time.Now().Add(2 * time.Hour).Unix()), }, @@ -79,6 +81,7 @@ func TestClient(t *testing.T) { Payload: task.Payload.data, Retry: 3, Queue: "default", + Timeout: time.Duration(0).String(), }, }, }, @@ -98,6 +101,7 @@ func TestClient(t *testing.T) { Payload: task.Payload.data, Retry: 0, // Retry count should be set to zero Queue: "default", + Timeout: time.Duration(0).String(), }, }, }, @@ -118,6 +122,7 @@ func TestClient(t *testing.T) { Payload: task.Payload.data, Retry: 10, // Last option takes precedence Queue: "default", + Timeout: time.Duration(0).String(), }, }, }, @@ -137,6 +142,7 @@ func TestClient(t *testing.T) { Payload: task.Payload.data, Retry: defaultMaxRetry, Queue: "custom", + Timeout: time.Duration(0).String(), }, }, }, @@ -156,6 +162,27 @@ func TestClient(t *testing.T) { Payload: task.Payload.data, Retry: defaultMaxRetry, Queue: "high", + Timeout: time.Duration(0).String(), + }, + }, + }, + wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil + }, + { + desc: "Timeout option sets the timeout duration", + task: task, + processAt: time.Now(), + opts: []Option{ + Timeout(20 * time.Second), + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "default": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "default", + Timeout: (20 * time.Second).String(), }, }, }, diff --git a/internal/base/base.go b/internal/base/base.go index f6956da..71932f7 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -77,6 +77,12 @@ type TaskMessage struct { // ErrorMsg holds the error message from the last failure. ErrorMsg string + + // Timeout specifies how long a task may run. + // The string value should be compatible with time.Duration.ParseDuration. + // + // Zero means no limit. + Timeout string } // ProcessInfo holds information about running background worker process. diff --git a/processor.go b/processor.go index 4235d85..656c3d6 100644 --- a/processor.go +++ b/processor.go @@ -173,8 +173,7 @@ func (p *processor) exec() { resCh := make(chan error, 1) task := NewTask(msg.Type, msg.Payload) - // TODO: Set timeout if provided - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := createContext(msg) p.addCancelFunc(msg.ID, cancel) go func() { resCh <- perform(ctx, task, p.handler) @@ -394,3 +393,16 @@ func gcd(xs ...uint) uint { } return res } + +// createContext returns a context and cancel function for a given task message. +func createContext(msg *base.TaskMessage) (context.Context, context.CancelFunc) { + timeout, err := time.ParseDuration(msg.Timeout) + if err != nil { + logger.error("cannot parse timeout duration for %+v", msg) + return context.WithCancel(context.Background()) + } + if timeout == 0 { + return context.WithCancel(context.Background()) + } + return context.WithTimeout(context.Background(), timeout) +}