mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-19 23:30:20 +08:00
feat: add finished handler to server
This commit is contained in:
parent
8df0bfa583
commit
1e13014380
37
processor.go
37
processor.go
@ -44,6 +44,8 @@ type processor struct {
|
||||
errHandler ErrorHandler
|
||||
shutdownTimeout time.Duration
|
||||
|
||||
finishedHandler FinishedHandler
|
||||
|
||||
// channel via which to send sync requests to syncer.
|
||||
syncRequestCh chan<- *syncRequest
|
||||
|
||||
@ -85,6 +87,7 @@ type processorParams struct {
|
||||
queues map[string]int
|
||||
strictPriority bool
|
||||
errHandler ErrorHandler
|
||||
finishedHandler FinishedHandler
|
||||
shutdownTimeout time.Duration
|
||||
starting chan<- *workerInfo
|
||||
finished chan<- *base.TaskMessage
|
||||
@ -115,6 +118,7 @@ func newProcessor(params processorParams) *processor {
|
||||
quit: make(chan struct{}),
|
||||
abort: make(chan struct{}),
|
||||
errHandler: params.errHandler,
|
||||
finishedHandler: params.finishedHandler,
|
||||
handler: HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }),
|
||||
shutdownTimeout: params.shutdownTimeout,
|
||||
starting: params.starting,
|
||||
@ -291,12 +295,21 @@ func (p *processor) markAsComplete(l *base.Lease, msg *base.TaskMessage) {
|
||||
p.logger.Warnf("%s; Will retry syncing", errMsg)
|
||||
p.syncRequestCh <- &syncRequest{
|
||||
fn: func() error {
|
||||
return p.broker.MarkAsComplete(ctx, msg)
|
||||
if err := p.broker.MarkAsComplete(ctx, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
if p.finishedHandler != nil {
|
||||
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
|
||||
}
|
||||
return nil
|
||||
},
|
||||
errMsg: errMsg,
|
||||
deadline: l.Deadline(),
|
||||
}
|
||||
}
|
||||
if p.finishedHandler != nil {
|
||||
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
|
||||
}
|
||||
}
|
||||
|
||||
func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
|
||||
@ -311,12 +324,21 @@ func (p *processor) markAsDone(l *base.Lease, msg *base.TaskMessage) {
|
||||
p.logger.Warnf("%s; Will retry syncing", errMsg)
|
||||
p.syncRequestCh <- &syncRequest{
|
||||
fn: func() error {
|
||||
return p.broker.Done(ctx, msg)
|
||||
if err := p.broker.Done(ctx, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
if p.finishedHandler != nil {
|
||||
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
|
||||
}
|
||||
return nil
|
||||
},
|
||||
errMsg: errMsg,
|
||||
deadline: l.Deadline(),
|
||||
}
|
||||
}
|
||||
if p.finishedHandler != nil {
|
||||
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
|
||||
}
|
||||
}
|
||||
|
||||
// SkipRetry is used as a return value from Handler.ProcessTask to indicate that
|
||||
@ -374,12 +396,21 @@ func (p *processor) archive(l *base.Lease, msg *base.TaskMessage, e error) {
|
||||
p.logger.Warnf("%s; Will retry syncing", errMsg)
|
||||
p.syncRequestCh <- &syncRequest{
|
||||
fn: func() error {
|
||||
return p.broker.Archive(ctx, msg, e.Error())
|
||||
if err := p.broker.Archive(ctx, msg, e.Error()); err != nil {
|
||||
return err
|
||||
}
|
||||
if p.finishedHandler != nil {
|
||||
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
|
||||
}
|
||||
return nil
|
||||
},
|
||||
errMsg: errMsg,
|
||||
deadline: l.Deadline(),
|
||||
}
|
||||
}
|
||||
if p.finishedHandler != nil {
|
||||
p.finishedHandler.HandleFinished(newTaskInfo(msg, base.TaskStateArchived, time.Time{}, nil))
|
||||
}
|
||||
}
|
||||
|
||||
// queues returns a list of queues to query.
|
||||
|
22
server.go
22
server.go
@ -103,7 +103,7 @@ type Config struct {
|
||||
// If BaseContext is nil, the default is context.Background().
|
||||
// If this is defined, then it MUST return a non-nil context
|
||||
BaseContext func() context.Context
|
||||
|
||||
|
||||
// TaskCheckInterval specifies the interval between checks for new tasks to process when all queues are empty.
|
||||
//
|
||||
// If unset, zero or a negative value, the interval is set to 1 second.
|
||||
@ -183,6 +183,11 @@ type Config struct {
|
||||
|
||||
ErrorHandler ErrorHandler
|
||||
|
||||
// FinishedHandler handles a task that has been processed.
|
||||
//
|
||||
// FinishedHandler is called when the task status becomes completed or archived.
|
||||
FinishedHandler FinishedHandler
|
||||
|
||||
// Logger specifies the logger used by the server instance.
|
||||
//
|
||||
// If unset, default logger is used.
|
||||
@ -275,6 +280,20 @@ func (fn ErrorHandlerFunc) HandleError(ctx context.Context, task *Task, err erro
|
||||
fn(ctx, task, err)
|
||||
}
|
||||
|
||||
// An FinishedHandler handles a task that has been processed.
|
||||
type FinishedHandler interface {
|
||||
HandleFinished(task *TaskInfo)
|
||||
}
|
||||
|
||||
// The FinishedHandlerFunc type is an adapter to allow the use of ordinary functions as a FinishedHandler.
|
||||
// If f is a function with the appropriate signature, FinishedHandlerFunc(f) is a FinishedHandler that calls f.
|
||||
type FinishedHandlerFunc func(task *TaskInfo)
|
||||
|
||||
// HandleFinished calls fn(ctx, task, err)
|
||||
func (fn FinishedHandlerFunc) HandleFinished(task *TaskInfo) {
|
||||
fn(task)
|
||||
}
|
||||
|
||||
// RetryDelayFunc calculates the retry delay duration for a failed task given
|
||||
// the retry count, error, and the task.
|
||||
//
|
||||
@ -529,6 +548,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
||||
queues: queues,
|
||||
strictPriority: cfg.StrictPriority,
|
||||
errHandler: cfg.ErrorHandler,
|
||||
finishedHandler: cfg.FinishedHandler,
|
||||
shutdownTimeout: shutdownTimeout,
|
||||
starting: starting,
|
||||
finished: finished,
|
||||
|
Loading…
x
Reference in New Issue
Block a user