2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-18 23:00:20 +08:00

Merge 1e13014380a251e90447d500f29798e90af3f083 into 489e21920b92ae6acfc19c54de91166e56817620

This commit is contained in:
鸿则 2024-12-28 20:37:34 +08:00 committed by GitHub
commit 27c1866084
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 54 additions and 3 deletions

View File

@ -44,6 +44,8 @@ type processor struct {
errHandler ErrorHandler
shutdownTimeout time.Duration
finishedHandler FinishedHandler
// channel via which to send sync requests to syncer.
syncRequestCh chan<- *syncRequest
@ -85,6 +87,7 @@ type processorParams struct {
queues map[string]int
strictPriority bool
errHandler ErrorHandler
finishedHandler FinishedHandler
shutdownTimeout time.Duration
starting chan<- *workerInfo
finished chan<- *base.TaskMessage
@ -115,6 +118,7 @@ func newProcessor(params processorParams) *processor {
quit: make(chan struct{}),
abort: make(chan struct{}),
errHandler: params.errHandler,
finishedHandler: params.finishedHandler,
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
shutdownTimeout: params.shutdownTimeout,
starting: params.starting,
@ -294,12 +298,21 @@ func (p *processor) markAsComplete(l *base.Lease, msg *base.TaskMessage) {
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.broker.MarkAsComplete(ctx, msg)
if err := p.broker.MarkAsComplete(ctx, msg); err != nil {
return err
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
return nil
},
errMsg: errMsg,
deadline: l.Deadline(),
}
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
}
func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
@ -315,12 +328,21 @@ func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.broker.Done(ctx, msg)
if err := p.broker.Done(ctx, msg); err != nil {
return err
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
return nil
},
errMsg: errMsg,
deadline: l.Deadline(),
}
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
}
// SkipRetry is used as a return value from Handler.ProcessTask to indicate that
@ -383,12 +405,21 @@ func (p *processor) archive(l *base.Lease, msg *base.TaskMessage, e error) {
p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.broker.Archive(ctx, msg, e.Error())
if err := p.broker.Archive(ctx, msg, e.Error()); err != nil {
return err
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
return nil
},
errMsg: errMsg,
deadline: l.Deadline(),
}
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
}
// queues returns a list of queues to query.

View File

@ -186,6 +186,11 @@ type Config struct {
ErrorHandler ErrorHandler
// FinishedHandler handles a task that has been processed.
//
// FinishedHandler is called when the task status becomes completed or archived.
FinishedHandler FinishedHandler
// Logger specifies the logger used by the server instance.
//
// If unset, default logger is used.
@ -289,6 +294,20 @@ func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err erro
fn(ctx, task, err)
}
// An FinishedHandler handles a task that has been processed.
type FinishedHandler interface {
HandleFinished(task *TaskInfo)
}
// The FinishedHandlerFunc type is an adapter to allow the use of ordinary functions as a FinishedHandler.
// If f is a function with the appropriate signature, FinishedHandlerFunc(f) is a FinishedHandler that calls f.
type FinishedHandlerFunc func(task *TaskInfo)
// HandleFinished calls fn(ctx, task, err)
func (fn FinishedHandlerFunc) HandleFinished(task *TaskInfo) {
fn(task)
}
// RetryDelayFunc calculates the retry delay duration for a failed task given
// the retry count, error, and the task.
//
@ -555,6 +574,7 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
queues: queues,
strictPriority: cfg.StrictPriority,
errHandler: cfg.ErrorHandler,
finishedHandler: cfg.FinishedHandler,
shutdownTimeout: shutdownTimeout,
starting: starting,
finished: finished,