2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

Update Inspector API

This commit is contained in:
Ken Hibino 2020-08-16 14:51:56 -07:00
parent f59de9ac56
commit 9348a62691
2 changed files with 874 additions and 637 deletions

View File

@ -29,70 +29,62 @@ func NewInspector(r RedisConnOpt) *Inspector {
// Stats represents a state of queues at a certain time. // Stats represents a state of queues at a certain time.
type Stats struct { type Stats struct {
Enqueued int // Name of the queue.
Queue string
// Number of enqueued tasks.
Enqueued int
// Number of in-progress tasks.
InProgress int InProgress int
Scheduled int // Number of scheduled tasks.
Retry int Scheduled int
Dead int // Number of retry tasks.
Processed int Retry int
Failed int // Number of dead tasks.
Queues []*QueueInfo Dead int
Timestamp time.Time // Total number of tasks being processed during the given date.
} // The number includes both succeeded and failed tasks.
Processed int
// QueueInfo holds information about a queue. // Total number of tasks failed to be processed during the given date.
type QueueInfo struct { Failed int
// Name of the queue (e.g. "default", "critical").
// Note: It doesn't include the prefix "asynq:queues:".
Name string
// Paused indicates whether the queue is paused. // Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed. // If true, tasks in the queue will not be processed.
Paused bool Paused bool
// Time when this stats was taken.
// Size is the number of tasks in the queue. Timestamp time.Time
Size int
} }
// CurrentStats returns a current stats of the queues. // CurrentStats returns a current stats of the given queue.
func (i *Inspector) CurrentStats() (*Stats, error) { func (i *Inspector) CurrentStats(qname string) (*Stats, error) {
stats, err := i.rdb.CurrentStats() stats, err := i.rdb.CurrentStats(qname)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var qs []*QueueInfo return &Stats(stats), nil
for _, q := range stats.Queues {
qs = append(qs, (*QueueInfo)(q))
}
return &Stats{
Enqueued: stats.Enqueued,
InProgress: stats.InProgress,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Dead: stats.Dead,
Processed: stats.Processed,
Failed: stats.Failed,
Queues: qs,
Timestamp: stats.Timestamp,
}, nil
} }
// DailyStats holds aggregate data for a given day. // DailyStats holds aggregate data for a given day for a given queue.
type DailyStats struct { type DailyStats struct {
// Name of the queue.
Queue string
// Total number of tasks being processed during the given date.
// The number includes both succeeded and failed tasks.
Processed int Processed int
Failed int // Total number of tasks failed to be processed during the given date.
Date time.Time Failed int
// Date this stats was taken.
Date time.Time
} }
// History returns a list of stats from the last n days. // History returns a list of stats from the last n days.
func (i *Inspector) History(n int) ([]*DailyStats, error) { func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) {
stats, err := i.rdb.HistoricalStats(n) stats, err := i.rdb.HistoricalStats(qname, n)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var res []*DailyStats var res []*DailyStats
for _, s := range stats { for _, s := range stats {
res = append(res, &DailyStats{ res = append(res, &DailyStats{
Queue: s.Queue,
Processed: s.Processed, Processed: s.Processed,
Failed: s.Failed, Failed: s.Failed,
Date: s.Time, Date: s.Time,
@ -111,7 +103,8 @@ type EnqueuedTask struct {
// InProgressTask is a task that's currently being processed. // InProgressTask is a task that's currently being processed.
type InProgressTask struct { type InProgressTask struct {
*Task *Task
ID string ID string
Queue string
} }
// ScheduledTask is a task scheduled to be processed in the future. // ScheduledTask is a task scheduled to be processed in the future.
@ -169,7 +162,7 @@ func (t *DeadTask) Key() string {
// parseTaskKey parses a key string and returns each part of key with proper // parseTaskKey parses a key string and returns each part of key with proper
// type if valid, otherwise it reports an error. // type if valid, otherwise it reports an error.
func parseTaskKey(key string) (id uuid.UUID, score int64, qtype string, err error) { func parseTaskKey(key string) (id uuid.UUID, score int64, state string, err error) {
parts := strings.Split(key, ":") parts := strings.Split(key, ":")
if len(parts) != 3 { if len(parts) != 3 {
return uuid.Nil, 0, "", fmt.Errorf("invalid id") return uuid.Nil, 0, "", fmt.Errorf("invalid id")
@ -182,11 +175,11 @@ func parseTaskKey(key string) (id uuid.UUID, score int64, qtype string, err erro
if err != nil { if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id") return uuid.Nil, 0, "", fmt.Errorf("invalid id")
} }
qtype = parts[0] state = parts[0]
if len(qtype) != 1 || !strings.Contains("srd", qtype) { if len(state) != 1 || !strings.Contains("srd", state) {
return uuid.Nil, 0, "", fmt.Errorf("invalid id") return uuid.Nil, 0, "", fmt.Errorf("invalid id")
} }
return id, score, qtype, nil return id, score, state, nil
} }
// ListOption specifies behavior of list operation. // ListOption specifies behavior of list operation.
@ -250,7 +243,7 @@ func Page(n int) ListOption {
return pageNumOpt(n) return pageNumOpt(n)
} }
// ListScheduledTasks retrieves tasks in the specified queue. // ListEnqueuedTasks retrieves enqueued tasks from the specified queue.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*EnqueuedTask, error) { func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*EnqueuedTask, error) {
@ -271,34 +264,35 @@ func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*Enqu
return tasks, err return tasks, err
} }
// ListScheduledTasks retrieves tasks currently being processed. // ListInProgressTasks retrieves in-progress tasks from the specified queue.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListInProgressTasks(opts ...ListOption) ([]*InProgressTask, error) { func (i *Inspector) ListInProgressTasks(qname string, opts ...ListOption) ([]*InProgressTask, error) {
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListInProgress(pgn) msgs, err := i.rdb.ListInProgress(qname, pgn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var tasks []*InProgressTask var tasks []*InProgressTask
for _, m := range msgs { for _, m := range msgs {
tasks = append(tasks, &InProgressTask{ tasks = append(tasks, &InProgressTask{
Task: NewTask(m.Type, m.Payload), Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(), ID: m.ID.String(),
Queue: m.Queue,
}) })
} }
return tasks, err return tasks, err
} }
// ListScheduledTasks retrieves tasks in scheduled state. // ListScheduledTasks retrieves scheduled tasks from the specified queue.
// Tasks are sorted by NextEnqueueAt field in ascending order. // Tasks are sorted by NextEnqueueAt field in ascending order.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(opts ...ListOption) ([]*ScheduledTask, error) { func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error) {
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListScheduled(pgn) zs, err := i.rdb.ListScheduled(qname, pgn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -317,14 +311,14 @@ func (i *Inspector) ListScheduledTasks(opts ...ListOption) ([]*ScheduledTask, er
return tasks, nil return tasks, nil
} }
// ListScheduledTasks retrieves tasks in retry state. // ListRetryTasks retrieves retry tasks from the specified queue.
// Tasks are sorted by NextEnqueueAt field in ascending order. // Tasks are sorted by NextEnqueueAt field in ascending order.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(opts ...ListOption) ([]*RetryTask, error) { func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error) {
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListRetry(pgn) zs, err := i.rdb.ListRetry(qname, pgn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -347,14 +341,14 @@ func (i *Inspector) ListRetryTasks(opts ...ListOption) ([]*RetryTask, error) {
return tasks, nil return tasks, nil
} }
// ListScheduledTasks retrieves tasks in retry state. // ListDeadTasks retrieves dead tasks from the specified queue.
// Tasks are sorted by LastFailedAt field in descending order. // Tasks are sorted by LastFailedAt field in descending order.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListDeadTasks(opts ...ListOption) ([]*DeadTask, error) { func (i *Inspector) ListDeadTasks(qname string, opts ...ListOption) ([]*DeadTask, error) {
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListDead(pgn) zs, err := i.rdb.ListDead(qname, pgn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -377,109 +371,110 @@ func (i *Inspector) ListDeadTasks(opts ...ListOption) ([]*DeadTask, error) {
return nil, nil return nil, nil
} }
// DeleteAllScheduledTasks deletes all tasks in scheduled state, // DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue,
// and reports the number tasks deleted. // and reports the number tasks deleted.
func (i *Inspector) DeleteAllScheduledTasks() (int, error) { func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) {
n, err := i.rdb.DeleteAllScheduledTasks() n, err := i.rdb.DeleteAllScheduledTasks(qname)
return int(n), err return int(n), err
} }
// DeleteAllRetryTasks deletes all tasks in retry state, // DeleteAllRetryTasks deletes all retry tasks from the specified queue,
// and reports the number tasks deleted. // and reports the number tasks deleted.
func (i *Inspector) DeleteAllRetryTasks() (int, error) { func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) {
n, err := i.rdb.DeleteAllRetryTasks() n, err := i.rdb.DeleteAllRetryTasks(qname)
return int(n), err return int(n), err
} }
// DeleteAllDeadTasks deletes all tasks in dead state, // DeleteAllDeadTasks deletes all dead tasks from the specified queue,
// and reports the number tasks deleted. // and reports the number tasks deleted.
func (i *Inspector) DeleteAllDeadTasks() (int, error) { func (i *Inspector) DeleteAllDeadTasks(qname string) (int, error) {
n, err := i.rdb.DeleteAllDeadTasks() n, err := i.rdb.DeleteAllDeadTasks(qname)
return int(n), err return int(n), err
} }
// DeleteTaskByKey deletes a task with the given key. // DeleteTaskByKey deletes a task with the given key from the given queue.
func (i *Inspector) DeleteTaskByKey(key string) error { func (i *Inspector) DeleteTaskByKey(qname, key string) error {
id, score, qtype, err := parseTaskKey(key) id, score, state, err := parseTaskKey(key)
if err != nil { if err != nil {
return err return err
} }
switch qtype { switch state {
case "s": case "s":
return i.rdb.DeleteScheduledTask(id, score) return i.rdb.DeleteScheduledTask(qname, id, score)
case "r": case "r":
return i.rdb.DeleteRetryTask(id, score) return i.rdb.DeleteRetryTask(qname, id, score)
case "d": case "d":
return i.rdb.DeleteDeadTask(id, score) return i.rdb.DeleteDeadTask(qname, id, score)
default: default:
return fmt.Errorf("invalid key") return fmt.Errorf("invalid key")
} }
} }
// EnqueueAllScheduledTasks enqueues all tasks in the scheduled state, // TODO(hibiken): Use different verb here. Idea: Run or Stage
// EnqueueAllScheduledTasks enqueues all scheduled tasks for immediate processing within the given queue,
// and reports the number of tasks enqueued. // and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllScheduledTasks() (int, error) { func (i *Inspector) EnqueueAllScheduledTasks(qname string) (int, error) {
n, err := i.rdb.EnqueueAllScheduledTasks() n, err := i.rdb.EnqueueAllScheduledTasks(qname)
return int(n), err return int(n), err
} }
// EnqueueAllRetryTasks enqueues all tasks in the retry state, // EnqueueAllRetryTasks enqueues all retry tasks for immediate processing within the given queue,
// and reports the number of tasks enqueued. // and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllRetryTasks() (int, error) { func (i *Inspector) EnqueueAllRetryTasks(qname string) (int, error) {
n, err := i.rdb.EnqueueAllRetryTasks() n, err := i.rdb.EnqueueAllRetryTasks(qname)
return int(n), err return int(n), err
} }
// EnqueueAllDeadTasks enqueues all tasks in the dead state, // EnqueueAllDeadTasks enqueues all dead tasks for immediate processing within the given queue,
// and reports the number of tasks enqueued. // and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllDeadTasks() (int, error) { func (i *Inspector) EnqueueAllDeadTasks(qname string) (int, error) {
n, err := i.rdb.EnqueueAllDeadTasks() n, err := i.rdb.EnqueueAllDeadTasks(qname)
return int(n), err return int(n), err
} }
// EnqueueTaskByKey enqueues a task with the given key. // EnqueueTaskByKey enqueues a task with the given key in the given queue.
func (i *Inspector) EnqueueTaskByKey(key string) error { func (i *Inspector) EnqueueTaskByKey(qname, key string) error {
id, score, qtype, err := parseTaskKey(key) id, score, state, err := parseTaskKey(key)
if err != nil { if err != nil {
return err return err
} }
switch qtype { switch state {
case "s": case "s":
return i.rdb.EnqueueScheduledTask(id, score) return i.rdb.EnqueueScheduledTask(qname, id, score)
case "r": case "r":
return i.rdb.EnqueueRetryTask(id, score) return i.rdb.EnqueueRetryTask(qname, id, score)
case "d": case "d":
return i.rdb.EnqueueDeadTask(id, score) return i.rdb.EnqueueDeadTask(qname, id, score)
default: default:
return fmt.Errorf("invalid key") return fmt.Errorf("invalid key")
} }
} }
// KillAllScheduledTasks kills all tasks in scheduled state, // KillAllScheduledTasks kills all scheduled tasks within the given queue,
// and reports the number of tasks killed. // and reports the number of tasks killed.
func (i *Inspector) KillAllScheduledTasks() (int, error) { func (i *Inspector) KillAllScheduledTasks(qname string) (int, error) {
n, err := i.rdb.KillAllScheduledTasks() n, err := i.rdb.KillAllScheduledTasks(qname)
return int(n), err return int(n), err
} }
// KillAllRetryTasks kills all tasks in retry state, // KillAllRetryTasks kills all retry tasks within the given queue,
// and reports the number of tasks killed. // and reports the number of tasks killed.
func (i *Inspector) KillAllRetryTasks() (int, error) { func (i *Inspector) KillAllRetryTasks(qname string) (int, error) {
n, err := i.rdb.KillAllRetryTasks() n, err := i.rdb.KillAllRetryTasks(qname)
return int(n), err return int(n), err
} }
// KillTaskByKey kills a task with the given key. // KillTaskByKey kills a task with the given key in the given queue.
func (i *Inspector) KillTaskByKey(key string) error { func (i *Inspector) KillTaskByKey(qname, key string) error {
id, score, qtype, err := parseTaskKey(key) id, score, state, err := parseTaskKey(key)
if err != nil { if err != nil {
return err return err
} }
switch qtype { switch state {
case "s": case "s":
return i.rdb.KillScheduledTask(id, score) return i.rdb.KillScheduledTask(qname, id, score)
case "r": case "r":
return i.rdb.KillRetryTask(id, score) return i.rdb.KillRetryTask(qname, id, score)
case "d": case "d":
return fmt.Errorf("task already dead") return fmt.Errorf("task already dead")
default: default:

File diff suppressed because it is too large Load Diff