mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-26 15:52:18 +08:00
parent
123d560a44
commit
551b0c7119
@ -256,6 +256,21 @@ func IsRedisCommandError(err error) bool {
|
|||||||
return As(err, &target)
|
return As(err, &target)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PanicError defines an error when occurred a panic error.
|
||||||
|
type PanicError struct {
|
||||||
|
ErrMsg string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *PanicError) Error() string {
|
||||||
|
return fmt.Sprintf("panic error cause by: %s", e.ErrMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsPanicError reports whether any error in err's chain is of type PanicError.
|
||||||
|
func IsPanicError(err error) bool {
|
||||||
|
var target *PanicError
|
||||||
|
return As(err, &target)
|
||||||
|
}
|
||||||
|
|
||||||
/*************************************************
|
/*************************************************
|
||||||
Standard Library errors package functions
|
Standard Library errors package functions
|
||||||
*************************************************/
|
*************************************************/
|
||||||
|
@ -131,6 +131,12 @@ func TestErrorPredicates(t *testing.T) {
|
|||||||
err: E(Op("rdb.ArchiveTask"), NotFound, &QueueNotFoundError{Queue: "default"}),
|
err: E(Op("rdb.ArchiveTask"), NotFound, &QueueNotFoundError{Queue: "default"}),
|
||||||
want: true,
|
want: true,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
desc: "IsPanicError should detect presence of PanicError in err's chain",
|
||||||
|
fn: IsPanicError,
|
||||||
|
err: E(Op("unknown"), Unknown, &PanicError{ErrMsg: "Something went wrong"}),
|
||||||
|
want: true,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
|
12
processor.go
12
processor.go
@ -41,7 +41,6 @@ type processor struct {
|
|||||||
isFailureFunc func(error) bool
|
isFailureFunc func(error) bool
|
||||||
|
|
||||||
errHandler ErrorHandler
|
errHandler ErrorHandler
|
||||||
|
|
||||||
shutdownTimeout time.Duration
|
shutdownTimeout time.Duration
|
||||||
|
|
||||||
// channel via which to send sync requests to syncer.
|
// channel via which to send sync requests to syncer.
|
||||||
@ -413,7 +412,9 @@ func (p *processor) queues() []string {
|
|||||||
func (p *processor) perform(ctx context.Context, task *Task) (err error) {
|
func (p *processor) perform(ctx context.Context, task *Task) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if x := recover(); x != nil {
|
if x := recover(); x != nil {
|
||||||
p.logger.Errorf("recovering from panic. See the stack trace below for details:\n%s", string(debug.Stack()))
|
errMsg := string(debug.Stack())
|
||||||
|
|
||||||
|
p.logger.Errorf("recovering from panic. See the stack trace below for details:\n%s", errMsg)
|
||||||
_, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself)
|
_, file, line, ok := runtime.Caller(1) // skip the first frame (panic itself)
|
||||||
if ok && strings.Contains(file, "runtime/") {
|
if ok && strings.Contains(file, "runtime/") {
|
||||||
// The panic came from the runtime, most likely due to incorrect
|
// The panic came from the runtime, most likely due to incorrect
|
||||||
@ -427,6 +428,9 @@ func (p *processor) perform(ctx context.Context, task *Task) (err error) {
|
|||||||
} else {
|
} else {
|
||||||
err = fmt.Errorf("panic: %v", x)
|
err = fmt.Errorf("panic: %v", x)
|
||||||
}
|
}
|
||||||
|
err = &errors.PanicError{
|
||||||
|
ErrMsg: errMsg,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return p.handler.ProcessTask(ctx, task)
|
return p.handler.ProcessTask(ctx, task)
|
||||||
@ -521,3 +525,7 @@ func (p *processor) computeDeadline(msg *base.TaskMessage) time.Time {
|
|||||||
}
|
}
|
||||||
return time.Unix(msg.Deadline, 0)
|
return time.Unix(msg.Deadline, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func IsPanicError(err error) bool {
|
||||||
|
return errors.IsPanicError(err)
|
||||||
|
}
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -921,3 +922,45 @@ func TestProcessorComputeDeadline(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReturnPanicError(t *testing.T) {
|
||||||
|
|
||||||
|
task := NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"}))
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
handler HandlerFunc
|
||||||
|
IsPanicError bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "should return panic error when occurred panic recovery",
|
||||||
|
handler: func(ctx context.Context, t *Task) error {
|
||||||
|
panic("something went terribly wrong")
|
||||||
|
},
|
||||||
|
IsPanicError: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "should return normal error when don't occur panic recovery",
|
||||||
|
handler: func(ctx context.Context, t *Task) error {
|
||||||
|
return fmt.Errorf("something went terribly wrong")
|
||||||
|
},
|
||||||
|
IsPanicError: false,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
p := processor{
|
||||||
|
logger: log.NewLogger(nil),
|
||||||
|
handler: tc.handler,
|
||||||
|
}
|
||||||
|
got := p.perform(context.Background(), task)
|
||||||
|
if tc.IsPanicError != IsPanicError(got) {
|
||||||
|
t.Errorf("%s: got=%t, want=%t", tc.name, IsPanicError(got), tc.IsPanicError)
|
||||||
|
}
|
||||||
|
if tc.IsPanicError && !strings.HasPrefix(got.Error(), "panic error cause by:") {
|
||||||
|
t.Error("wrong text msg for panic error")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
10
server.go
10
server.go
@ -162,6 +162,16 @@ type Config struct {
|
|||||||
// })
|
// })
|
||||||
//
|
//
|
||||||
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
|
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
|
||||||
|
|
||||||
|
// we can also handle panic error like:
|
||||||
|
// func reportError(ctx context, task *asynq.Task, err error) {
|
||||||
|
// if asynq.IsPanic(err) {
|
||||||
|
// errorReportingService.Notify(err)
|
||||||
|
// }
|
||||||
|
// })
|
||||||
|
//
|
||||||
|
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
|
||||||
|
|
||||||
ErrorHandler ErrorHandler
|
ErrorHandler ErrorHandler
|
||||||
|
|
||||||
// Logger specifies the logger used by the server instance.
|
// Logger specifies the logger used by the server instance.
|
||||||
|
Loading…
Reference in New Issue
Block a user