mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-14 19:38:49 +08:00
[RFC] Adds Ping() to client/scheduler/server (#585)
* [RFC] Adds Ping() to client/scheduler/server * Checks for scheduler state closed
This commit is contained in:
parent
0dc670d7d8
commit
b1e13893ff
@ -419,6 +419,11 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
|
|||||||
return newTaskInfo(msg, state, opt.processAt, nil), nil
|
return newTaskInfo(msg, state, opt.processAt, nil), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ping performs a ping against the redis connection.
|
||||||
|
func (c *Client) Ping() error {
|
||||||
|
return c.broker.Ping()
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) enqueue(ctx context.Context, msg *base.TaskMessage, uniqueTTL time.Duration) error {
|
func (c *Client) enqueue(ctx context.Context, msg *base.TaskMessage, uniqueTTL time.Duration) error {
|
||||||
if uniqueTTL > 0 {
|
if uniqueTTL > 0 {
|
||||||
return c.broker.EnqueueUnique(ctx, msg, uniqueTTL)
|
return c.broker.EnqueueUnique(ctx, msg, uniqueTTL)
|
||||||
|
11
scheduler.go
11
scheduler.go
@ -336,3 +336,14 @@ func (s *Scheduler) clearHistory() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ping performs a ping against the redis connection.
|
||||||
|
func (s *Scheduler) Ping() error {
|
||||||
|
s.state.mu.Lock()
|
||||||
|
defer s.state.mu.Unlock()
|
||||||
|
if s.state.value == srvStateClosed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.rdb.Ping()
|
||||||
|
}
|
||||||
|
15
server.go
15
server.go
@ -759,7 +759,7 @@ func (srv *Server) Shutdown() {
|
|||||||
func (srv *Server) Stop() {
|
func (srv *Server) Stop() {
|
||||||
srv.state.mu.Lock()
|
srv.state.mu.Lock()
|
||||||
if srv.state.value != srvStateActive {
|
if srv.state.value != srvStateActive {
|
||||||
// Invalid calll to Stop, server can only go from Active state to Stopped state.
|
// Invalid call to Stop, server can only go from Active state to Stopped state.
|
||||||
srv.state.mu.Unlock()
|
srv.state.mu.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -770,3 +770,16 @@ func (srv *Server) Stop() {
|
|||||||
srv.processor.stop()
|
srv.processor.stop()
|
||||||
srv.logger.Info("Processor stopped")
|
srv.logger.Info("Processor stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ping performs a ping against the redis connection.
|
||||||
|
//
|
||||||
|
// This is an alternative to the HealthCheckFunc available in the Config object.
|
||||||
|
func (srv *Server) Ping() error {
|
||||||
|
srv.state.mu.Lock()
|
||||||
|
defer srv.state.mu.Unlock()
|
||||||
|
if srv.state.value == srvStateClosed {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return srv.broker.Ping()
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user