mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-26 11:16:12 +08:00
Allow user to specify timeout per task
This commit is contained in:
16
processor.go
16
processor.go
@@ -173,8 +173,7 @@ func (p *processor) exec() {
|
||||
|
||||
resCh := make(chan error, 1)
|
||||
task := NewTask(msg.Type, msg.Payload)
|
||||
// TODO: Set timeout if provided
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx, cancel := createContext(msg)
|
||||
p.addCancelFunc(msg.ID, cancel)
|
||||
go func() {
|
||||
resCh <- perform(ctx, task, p.handler)
|
||||
@@ -394,3 +393,16 @@ func gcd(xs ...uint) uint {
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// createContext returns a context and cancel function for a given task message.
|
||||
func createContext(msg *base.TaskMessage) (context.Context, context.CancelFunc) {
|
||||
timeout, err := time.ParseDuration(msg.Timeout)
|
||||
if err != nil {
|
||||
logger.error("cannot parse timeout duration for %+v", msg)
|
||||
return context.WithCancel(context.Background())
|
||||
}
|
||||
if timeout == 0 {
|
||||
return context.WithCancel(context.Background())
|
||||
}
|
||||
return context.WithTimeout(context.Background(), timeout)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user