2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-20 07:40:19 +08:00

Merge 1e13014380a251e90447d500f29798e90af3f083 into c327bc40a28e4db45195cfe082d88faa808ce87d

This commit is contained in:
鸿则 2025-04-13 16:33:29 +00:00 committed by GitHub
commit f35cf796d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 54 additions and 3 deletions

View File

@ -44,6 +44,8 @@ type processor struct {
errHandler ErrorHandler errHandler ErrorHandler
shutdownTimeout time.Duration shutdownTimeout time.Duration
finishedHandler FinishedHandler
// channel via which to send sync requests to syncer. // channel via which to send sync requests to syncer.
syncRequestCh chan<- *syncRequest syncRequestCh chan<- *syncRequest
@ -85,6 +87,7 @@ type processorParams struct {
queues map[string]int queues map[string]int
strictPriority bool strictPriority bool
errHandler ErrorHandler errHandler ErrorHandler
finishedHandler FinishedHandler
shutdownTimeout time.Duration shutdownTimeout time.Duration
starting chan<- *workerInfo starting chan<- *workerInfo
finished chan<- *base.TaskMessage finished chan<- *base.TaskMessage
@ -115,6 +118,7 @@ func newProcessor(params processorParams) *processor {
quit: make(chan struct{}), quit: make(chan struct{}),
abort: make(chan struct{}), abort: make(chan struct{}),
errHandler: params.errHandler, errHandler: params.errHandler,
finishedHandler: params.finishedHandler,
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
shutdownTimeout: params.shutdownTimeout, shutdownTimeout: params.shutdownTimeout,
starting: params.starting, starting: params.starting,
@ -294,12 +298,21 @@ func (p *processor) markAsComplete(l *base.Lease, msg *base.TaskMessage) {
p.logger.Warnf("%s; Will retry syncing", errMsg) p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.broker.MarkAsComplete(ctx, msg) if err := p.broker.MarkAsComplete(ctx, msg); err != nil {
return err
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
return nil
}, },
errMsg: errMsg, errMsg: errMsg,
deadline: l.Deadline(), deadline: l.Deadline(),
} }
} }
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
} }
func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) { func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
@ -315,12 +328,21 @@ func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
p.logger.Warnf("%s; Will retry syncing", errMsg) p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.broker.Done(ctx, msg) if err := p.broker.Done(ctx, msg); err != nil {
return err
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
return nil
}, },
errMsg: errMsg, errMsg: errMsg,
deadline: l.Deadline(), deadline: l.Deadline(),
} }
} }
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
} }
// SkipRetry is used as a return value from Handler.ProcessTask to indicate that // SkipRetry is used as a return value from Handler.ProcessTask to indicate that
@ -383,12 +405,21 @@ func (p *processor) archive(l *base.Lease, msg *base.TaskMessage, e error) {
p.logger.Warnf("%s; Will retry syncing", errMsg) p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.broker.Archive(ctx, msg, e.Error()) if err := p.broker.Archive(ctx, msg, e.Error()); err != nil {
return err
}
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
return nil
}, },
errMsg: errMsg, errMsg: errMsg,
deadline: l.Deadline(), deadline: l.Deadline(),
} }
} }
if p.finishedHandler != nil {
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
}
} }
// queues returns a list of queues to query. // queues returns a list of queues to query.

View File

@ -185,6 +185,11 @@ type Config struct {
// ErrorHandler: asynq.ErrorHandlerFunc(reportError) // ErrorHandler: asynq.ErrorHandlerFunc(reportError)
ErrorHandler ErrorHandler ErrorHandler ErrorHandler
// FinishedHandler handles a task that has been processed.
//
// FinishedHandler is called when the task status becomes completed or archived.
FinishedHandler FinishedHandler
// Logger specifies the logger used by the server instance. // Logger specifies the logger used by the server instance.
// //
// If unset, default logger is used. // If unset, default logger is used.
@ -288,6 +293,20 @@ func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err erro
fn(ctx, task, err) fn(ctx, task, err)
} }
// An FinishedHandler handles a task that has been processed.
type FinishedHandler interface {
HandleFinished(task *TaskInfo)
}
// The FinishedHandlerFunc type is an adapter to allow the use of ordinary functions as a FinishedHandler.
// If f is a function with the appropriate signature, FinishedHandlerFunc(f) is a FinishedHandler that calls f.
type FinishedHandlerFunc func(task *TaskInfo)
// HandleFinished calls fn(ctx, task, err)
func (fn FinishedHandlerFunc) HandleFinished(task *TaskInfo) {
fn(task)
}
// RetryDelayFunc calculates the retry delay duration for a failed task given // RetryDelayFunc calculates the retry delay duration for a failed task given
// the retry count, error, and the task. // the retry count, error, and the task.
// //
@ -554,6 +573,7 @@ func NewServerFromRedisClient(c redis.UniversalClient, cfg Config) *Server {
queues: queues, queues: queues,
strictPriority: cfg.StrictPriority, strictPriority: cfg.StrictPriority,
errHandler: cfg.ErrorHandler, errHandler: cfg.ErrorHandler,
finishedHandler: cfg.FinishedHandler,
shutdownTimeout: shutdownTimeout, shutdownTimeout: shutdownTimeout,
starting: starting, starting: starting,
finished: finished, finished: finished,