2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00

Allow user to specify timeout per task

This commit is contained in:
Ken Hibino 2020-02-11 21:53:59 -08:00
parent 6e14062325
commit 39459b4412
4 changed files with 66 additions and 8 deletions

View File

@ -36,6 +36,7 @@ type Option interface{}
type ( type (
retryOption int retryOption int
queueOption string queueOption string
timeoutOption time.Duration
) )
// MaxRetry returns an option to specify the max number of times // MaxRetry returns an option to specify the max number of times
@ -56,15 +57,24 @@ func Queue(name string) Option {
return queueOption(strings.ToLower(name)) return queueOption(strings.ToLower(name))
} }
// Timeout returns an option to specify how long a task may run.
//
// Zero duration means no limit.
func Timeout(d time.Duration) Option {
return timeoutOption(d)
}
type option struct { type option struct {
retry int retry int
queue string queue string
timeout time.Duration
} }
func composeOptions(opts ...Option) option { func composeOptions(opts ...Option) option {
res := option{ res := option{
retry: defaultMaxRetry, retry: defaultMaxRetry,
queue: base.DefaultQueueName, queue: base.DefaultQueueName,
timeout: 0,
} }
for _, opt := range opts { for _, opt := range opts {
switch opt := opt.(type) { switch opt := opt.(type) {
@ -72,6 +82,8 @@ func composeOptions(opts ...Option) option {
res.retry = int(opt) res.retry = int(opt)
case queueOption: case queueOption:
res.queue = string(opt) res.queue = string(opt)
case timeoutOption:
res.timeout = time.Duration(opt)
default: default:
// ignore unexpected option // ignore unexpected option
} }
@ -99,6 +111,7 @@ func (c *Client) Schedule(task *Task, processAt time.Time, opts ...Option) error
Payload: task.Payload.data, Payload: task.Payload.data,
Queue: opt.queue, Queue: opt.queue,
Retry: opt.retry, Retry: opt.retry,
Timeout: opt.timeout.String(),
} }
return c.enqueue(msg, processAt) return c.enqueue(msg, processAt)
} }

View File

@ -42,6 +42,7 @@ func TestClient(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "default", Queue: "default",
Timeout: time.Duration(0).String(),
}, },
}, },
}, },
@ -60,6 +61,7 @@ func TestClient(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "default", Queue: "default",
Timeout: time.Duration(0).String(),
}, },
Score: float64(time.Now().Add(2 * time.Hour).Unix()), Score: float64(time.Now().Add(2 * time.Hour).Unix()),
}, },
@ -79,6 +81,7 @@ func TestClient(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: 3, Retry: 3,
Queue: "default", Queue: "default",
Timeout: time.Duration(0).String(),
}, },
}, },
}, },
@ -98,6 +101,7 @@ func TestClient(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: 0, // Retry count should be set to zero Retry: 0, // Retry count should be set to zero
Queue: "default", Queue: "default",
Timeout: time.Duration(0).String(),
}, },
}, },
}, },
@ -118,6 +122,7 @@ func TestClient(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: 10, // Last option takes precedence Retry: 10, // Last option takes precedence
Queue: "default", Queue: "default",
Timeout: time.Duration(0).String(),
}, },
}, },
}, },
@ -137,6 +142,7 @@ func TestClient(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "custom", Queue: "custom",
Timeout: time.Duration(0).String(),
}, },
}, },
}, },
@ -156,6 +162,27 @@ func TestClient(t *testing.T) {
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "high", Queue: "high",
Timeout: time.Duration(0).String(),
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
{
desc: "Timeout option sets the timeout duration",
task: task,
processAt: time.Now(),
opts: []Option{
Timeout(20 * time.Second),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: (20 * time.Second).String(),
}, },
}, },
}, },

View File

@ -77,6 +77,12 @@ type TaskMessage struct {
// ErrorMsg holds the error message from the last failure. // ErrorMsg holds the error message from the last failure.
ErrorMsg string ErrorMsg string
// Timeout specifies how long a task may run.
// The string value should be compatible with time.Duration.ParseDuration.
//
// Zero means no limit.
Timeout string
} }
// ProcessInfo holds information about running background worker process. // ProcessInfo holds information about running background worker process.

View File

@ -173,8 +173,7 @@ func (p *processor) exec() {
resCh := make(chan error, 1) resCh := make(chan error, 1)
task := NewTask(msg.Type, msg.Payload) task := NewTask(msg.Type, msg.Payload)
// TODO: Set timeout if provided ctx, cancel := createContext(msg)
ctx, cancel := context.WithCancel(context.Background())
p.addCancelFunc(msg.ID, cancel) p.addCancelFunc(msg.ID, cancel)
go func() { go func() {
resCh <- perform(ctx, task, p.handler) resCh <- perform(ctx, task, p.handler)
@ -394,3 +393,16 @@ func gcd(xs ...uint) uint {
} }
return res return res
} }
// createContext returns a context and cancel function for a given task message.
func createContext(msg *base.TaskMessage) (context.Context, context.CancelFunc) {
timeout, err := time.ParseDuration(msg.Timeout)
if err != nil {
logger.error("cannot parse timeout duration for %+v", msg)
return context.WithCancel(context.Background())
}
if timeout == 0 {
return context.WithCancel(context.Background())
}
return context.WithTimeout(context.Background(), timeout)
}