mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			808 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			808 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2020 Kentaro Hibino. All rights reserved.
 | |
| // Use of this source code is governed by a MIT license
 | |
| // that can be found in the LICENSE file.
 | |
| 
 | |
| package asynq
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/google/uuid"
 | |
| 	"github.com/hibiken/asynq/internal/rdb"
 | |
| )
 | |
| 
 | |
| // Inspector is a client interface to inspect and mutate the state of
 | |
| // queues and tasks.
 | |
| type Inspector struct {
 | |
| 	rdb *rdb.RDB
 | |
| }
 | |
| 
 | |
| // New returns a new instance of Inspector.
 | |
| func NewInspector(r RedisConnOpt) *Inspector {
 | |
| 	return &Inspector{
 | |
| 		rdb: rdb.NewRDB(createRedisClient(r)),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Close closes the connection with redis.
 | |
| func (i *Inspector) Close() error {
 | |
| 	return i.rdb.Close()
 | |
| }
 | |
| 
 | |
| // Queues returns a list of all queue names.
 | |
| func (i *Inspector) Queues() ([]string, error) {
 | |
| 	return i.rdb.AllQueues()
 | |
| }
 | |
| 
 | |
// QueueStats is a snapshot of a single queue taken at a certain time.
type QueueStats struct {
	// Queue is the name of the queue.
	Queue string
	// Size is the total number of tasks in the queue, i.e. the sum of
	// Pending, Active, Scheduled, Retry, and Dead.
	Size int
	// Pending is the number of pending tasks.
	Pending int
	// Active is the number of active tasks.
	Active int
	// Scheduled is the number of scheduled tasks.
	Scheduled int
	// Retry is the number of retry tasks.
	Retry int
	// Dead is the number of dead tasks.
	Dead int
	// Processed is the total number of tasks processed during the given
	// date. It counts both succeeded and failed tasks.
	Processed int
	// Failed is the total number of tasks that failed to be processed during
	// the given date.
	Failed int
	// Paused reports whether the queue is paused.
	// While it is true, tasks in the queue will not be processed.
	Paused bool
	// Timestamp is the time at which these stats were taken.
	Timestamp time.Time
}
 | |
| 
 | |
| // CurrentStats returns a current stats of the given queue.
 | |
| func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	stats, err := i.rdb.CurrentStats(qname)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return &QueueStats{
 | |
| 		Queue:     stats.Queue,
 | |
| 		Size:      stats.Size,
 | |
| 		Pending:   stats.Pending,
 | |
| 		Active:    stats.Active,
 | |
| 		Scheduled: stats.Scheduled,
 | |
| 		Retry:     stats.Retry,
 | |
| 		Dead:      stats.Dead,
 | |
| 		Processed: stats.Processed,
 | |
| 		Failed:    stats.Failed,
 | |
| 		Paused:    stats.Paused,
 | |
| 		Timestamp: stats.Timestamp,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
// DailyStats holds aggregate data for one queue on one day.
type DailyStats struct {
	// Queue is the name of the queue.
	Queue string
	// Processed is the total number of tasks processed during the given
	// date. It counts both succeeded and failed tasks.
	Processed int
	// Failed is the total number of tasks that failed to be processed during
	// the given date.
	Failed int
	// Date is the date these stats were taken for.
	Date time.Time
}
 | |
| 
 | |
| // History returns a list of stats from the last n days.
 | |
| func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	stats, err := i.rdb.HistoricalStats(qname, n)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	var res []*DailyStats
 | |
| 	for _, s := range stats {
 | |
| 		res = append(res, &DailyStats{
 | |
| 			Queue:     s.Queue,
 | |
| 			Processed: s.Processed,
 | |
| 			Failed:    s.Failed,
 | |
| 			Date:      s.Time,
 | |
| 		})
 | |
| 	}
 | |
| 	return res, nil
 | |
| }
 | |
| 
 | |
// ErrQueueNotFound indicates that the specified queue does not exist.
type ErrQueueNotFound struct {
	// qname is the name of the queue that could not be found.
	qname string
}

// Error implements the error interface. The queue name is rendered quoted.
func (e *ErrQueueNotFound) Error() string {
	const format = "queue %q does not exist"
	msg := fmt.Sprintf(format, e.qname)
	return msg
}
 | |
| 
 | |
// ErrQueueNotEmpty indicates that the specified queue is not empty.
type ErrQueueNotEmpty struct {
	// qname is the name of the queue that still holds tasks.
	qname string
}

// Error implements the error interface. The queue name is rendered quoted.
func (e *ErrQueueNotEmpty) Error() string {
	const format = "queue %q is not empty"
	msg := fmt.Sprintf(format, e.qname)
	return msg
}
 | |
| 
 | |
| // DeleteQueue removes the specified queue.
 | |
| //
 | |
| // If force is set to true, DeleteQueue will remove the queue regardless of
 | |
| // the queue size as long as no tasks are active in the queue.
 | |
| // If force is set to false, DeleteQueue will remove the queue only if
 | |
| // the queue is empty.
 | |
| //
 | |
| // If the specified queue does not exist, DeleteQueue returns ErrQueueNotFound.
 | |
| // If force is set to false and the specified queue is not empty, DeleteQueue
 | |
| // returns ErrQueueNotEmpty.
 | |
| func (i *Inspector) DeleteQueue(qname string, force bool) error {
 | |
| 	err := i.rdb.RemoveQueue(qname, force)
 | |
| 	if _, ok := err.(*rdb.ErrQueueNotFound); ok {
 | |
| 		return &ErrQueueNotFound{qname}
 | |
| 	}
 | |
| 	if _, ok := err.(*rdb.ErrQueueNotEmpty); ok {
 | |
| 		return &ErrQueueNotEmpty{qname}
 | |
| 	}
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // PendingTask is a task in a queue and is ready to be processed.
 | |
| type PendingTask struct {
 | |
| 	*Task
 | |
| 	ID    string
 | |
| 	Queue string
 | |
| }
 | |
| 
 | |
| // ActiveTask is a task that's currently being processed.
 | |
| type ActiveTask struct {
 | |
| 	*Task
 | |
| 	ID    string
 | |
| 	Queue string
 | |
| }
 | |
| 
 | |
| // ScheduledTask is a task scheduled to be processed in the future.
 | |
| type ScheduledTask struct {
 | |
| 	*Task
 | |
| 	ID            string
 | |
| 	Queue         string
 | |
| 	NextProcessAt time.Time
 | |
| 
 | |
| 	score int64
 | |
| }
 | |
| 
 | |
| // RetryTask is a task scheduled to be retried in the future.
 | |
| type RetryTask struct {
 | |
| 	*Task
 | |
| 	ID            string
 | |
| 	Queue         string
 | |
| 	NextProcessAt time.Time
 | |
| 	MaxRetry      int
 | |
| 	Retried       int
 | |
| 	ErrorMsg      string
 | |
| 	// TODO: LastFailedAt  time.Time
 | |
| 
 | |
| 	score int64
 | |
| }
 | |
| 
 | |
| // DeadTask is a task exhausted its retries.
 | |
| // DeadTask won't be retried automatically.
 | |
| type DeadTask struct {
 | |
| 	*Task
 | |
| 	ID           string
 | |
| 	Queue        string
 | |
| 	MaxRetry     int
 | |
| 	Retried      int
 | |
| 	LastFailedAt time.Time
 | |
| 	ErrorMsg     string
 | |
| 
 | |
| 	score int64
 | |
| }
 | |
| 
 | |
| // Key returns a key used to delete, run, and kill the task.
 | |
| func (t *ScheduledTask) Key() string {
 | |
| 	return fmt.Sprintf("s:%v:%v", t.ID, t.score)
 | |
| }
 | |
| 
 | |
| // Key returns a key used to delete, run, and kill the task.
 | |
| func (t *RetryTask) Key() string {
 | |
| 	return fmt.Sprintf("r:%v:%v", t.ID, t.score)
 | |
| }
 | |
| 
 | |
| // Key returns a key used to delete, run, and kill the task.
 | |
| func (t *DeadTask) Key() string {
 | |
| 	return fmt.Sprintf("d:%v:%v", t.ID, t.score)
 | |
| }
 | |
| 
 | |
| // parseTaskKey parses a key string and returns each part of key with proper
 | |
| // type if valid, otherwise it reports an error.
 | |
| func parseTaskKey(key string) (id uuid.UUID, score int64, state string, err error) {
 | |
| 	parts := strings.Split(key, ":")
 | |
| 	if len(parts) != 3 {
 | |
| 		return uuid.Nil, 0, "", fmt.Errorf("invalid id")
 | |
| 	}
 | |
| 	id, err = uuid.Parse(parts[1])
 | |
| 	if err != nil {
 | |
| 		return uuid.Nil, 0, "", fmt.Errorf("invalid id")
 | |
| 	}
 | |
| 	score, err = strconv.ParseInt(parts[2], 10, 64)
 | |
| 	if err != nil {
 | |
| 		return uuid.Nil, 0, "", fmt.Errorf("invalid id")
 | |
| 	}
 | |
| 	state = parts[0]
 | |
| 	if len(state) != 1 || !strings.Contains("srd", state) {
 | |
| 		return uuid.Nil, 0, "", fmt.Errorf("invalid id")
 | |
| 	}
 | |
| 	return id, score, state, nil
 | |
| }
 | |
| 
 | |
// ListOption specifies behavior of list operation.
//
// Values are created with PageSize and Page; any other value passed where a
// ListOption is expected is ignored.
type ListOption interface{}

// Internal list option representations.
type (
	// pageSizeOpt carries the requested number of items per page.
	pageSizeOpt int
	// pageNumOpt carries the requested 1-based page number.
	pageNumOpt int
)

// listOption is the resolved set of list options after defaults have been
// applied.
type listOption struct {
	pageSize int
	pageNum  int
}

const (
	// Page size used by default in list operation.
	defaultPageSize = 30

	// Page number used by default in list operation.
	defaultPageNum = 1
)

// composeListOptions folds opts into a listOption, starting from the
// defaults. When the same kind of option is given more than once, the last
// one wins. Values that are not produced by PageSize or Page are ignored.
func composeListOptions(opts ...ListOption) listOption {
	res := listOption{
		pageSize: defaultPageSize,
		pageNum:  defaultPageNum,
	}
	for _, opt := range opts {
		switch opt := opt.(type) {
		case pageSizeOpt:
			res.pageSize = int(opt)
		case pageNumOpt:
			res.pageNum = int(opt)
		default:
			// ignore unexpected option
		}
	}
	return res
}

// PageSize returns an option to specify the page size for list operation.
//
// Negative page size is treated as zero.
func PageSize(n int) ListOption {
	if n < 0 {
		n = 0
	}
	return pageSizeOpt(n)
}

// Page returns an option to specify the page number for list operation.
// The value 1 fetches the first page.
//
// Zero or negative page number is treated as one.
func Page(n int) ListOption {
	// The list methods subtract one from the page number before handing it
	// to rdb.Pagination, so anything below 1 (including 0) would turn into
	// a negative page there. Clamp to the first page instead.
	if n < 1 {
		n = 1
	}
	return pageNumOpt(n)
}
 | |
| 
 | |
| // ListPendingTasks retrieves pending tasks from the specified queue.
 | |
| //
 | |
| // By default, it retrieves the first 30 tasks.
 | |
| func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	opt := composeListOptions(opts...)
 | |
| 	pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
 | |
| 	msgs, err := i.rdb.ListPending(qname, pgn)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	var tasks []*PendingTask
 | |
| 	for _, m := range msgs {
 | |
| 		tasks = append(tasks, &PendingTask{
 | |
| 			Task:  NewTask(m.Type, m.Payload),
 | |
| 			ID:    m.ID.String(),
 | |
| 			Queue: m.Queue,
 | |
| 		})
 | |
| 	}
 | |
| 	return tasks, err
 | |
| }
 | |
| 
 | |
| // ListActiveTasks retrieves active tasks from the specified queue.
 | |
| //
 | |
| // By default, it retrieves the first 30 tasks.
 | |
| func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	opt := composeListOptions(opts...)
 | |
| 	pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
 | |
| 	msgs, err := i.rdb.ListActive(qname, pgn)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	var tasks []*ActiveTask
 | |
| 	for _, m := range msgs {
 | |
| 		tasks = append(tasks, &ActiveTask{
 | |
| 			Task:  NewTask(m.Type, m.Payload),
 | |
| 			ID:    m.ID.String(),
 | |
| 			Queue: m.Queue,
 | |
| 		})
 | |
| 	}
 | |
| 	return tasks, err
 | |
| }
 | |
| 
 | |
| // ListScheduledTasks retrieves scheduled tasks from the specified queue.
 | |
| // Tasks are sorted by NextProcessAt field in ascending order.
 | |
| //
 | |
| // By default, it retrieves the first 30 tasks.
 | |
| func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	opt := composeListOptions(opts...)
 | |
| 	pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
 | |
| 	zs, err := i.rdb.ListScheduled(qname, pgn)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	var tasks []*ScheduledTask
 | |
| 	for _, z := range zs {
 | |
| 		processAt := time.Unix(z.Score, 0)
 | |
| 		t := NewTask(z.Message.Type, z.Message.Payload)
 | |
| 		tasks = append(tasks, &ScheduledTask{
 | |
| 			Task:          t,
 | |
| 			ID:            z.Message.ID.String(),
 | |
| 			Queue:         z.Message.Queue,
 | |
| 			NextProcessAt: processAt,
 | |
| 			score:         z.Score,
 | |
| 		})
 | |
| 	}
 | |
| 	return tasks, nil
 | |
| }
 | |
| 
 | |
| // ListRetryTasks retrieves retry tasks from the specified queue.
 | |
| // Tasks are sorted by NextProcessAt field in ascending order.
 | |
| //
 | |
| // By default, it retrieves the first 30 tasks.
 | |
| func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	opt := composeListOptions(opts...)
 | |
| 	pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
 | |
| 	zs, err := i.rdb.ListRetry(qname, pgn)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	var tasks []*RetryTask
 | |
| 	for _, z := range zs {
 | |
| 		processAt := time.Unix(z.Score, 0)
 | |
| 		t := NewTask(z.Message.Type, z.Message.Payload)
 | |
| 		tasks = append(tasks, &RetryTask{
 | |
| 			Task:          t,
 | |
| 			ID:            z.Message.ID.String(),
 | |
| 			Queue:         z.Message.Queue,
 | |
| 			NextProcessAt: processAt,
 | |
| 			MaxRetry:      z.Message.Retry,
 | |
| 			Retried:       z.Message.Retried,
 | |
| 			// TODO: LastFailedAt: z.Message.LastFailedAt
 | |
| 			ErrorMsg: z.Message.ErrorMsg,
 | |
| 			score:    z.Score,
 | |
| 		})
 | |
| 	}
 | |
| 	return tasks, nil
 | |
| }
 | |
| 
 | |
| // ListDeadTasks retrieves dead tasks from the specified queue.
 | |
| // Tasks are sorted by LastFailedAt field in descending order.
 | |
| //
 | |
| // By default, it retrieves the first 30 tasks.
 | |
| func (i *Inspector) ListDeadTasks(qname string, opts ...ListOption) ([]*DeadTask, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	opt := composeListOptions(opts...)
 | |
| 	pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
 | |
| 	zs, err := i.rdb.ListDead(qname, pgn)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	var tasks []*DeadTask
 | |
| 	for _, z := range zs {
 | |
| 		failedAt := time.Unix(z.Score, 0)
 | |
| 		t := NewTask(z.Message.Type, z.Message.Payload)
 | |
| 		tasks = append(tasks, &DeadTask{
 | |
| 			Task:         t,
 | |
| 			ID:           z.Message.ID.String(),
 | |
| 			Queue:        z.Message.Queue,
 | |
| 			MaxRetry:     z.Message.Retry,
 | |
| 			Retried:      z.Message.Retried,
 | |
| 			LastFailedAt: failedAt,
 | |
| 			ErrorMsg:     z.Message.ErrorMsg,
 | |
| 			score:        z.Score,
 | |
| 		})
 | |
| 	}
 | |
| 	return tasks, nil
 | |
| }
 | |
| 
 | |
| // DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue,
 | |
| // and reports the number tasks deleted.
 | |
| func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	n, err := i.rdb.DeleteAllScheduledTasks(qname)
 | |
| 	return int(n), err
 | |
| }
 | |
| 
 | |
| // DeleteAllRetryTasks deletes all retry tasks from the specified queue,
 | |
| // and reports the number tasks deleted.
 | |
| func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	n, err := i.rdb.DeleteAllRetryTasks(qname)
 | |
| 	return int(n), err
 | |
| }
 | |
| 
 | |
| // DeleteAllDeadTasks deletes all dead tasks from the specified queue,
 | |
| // and reports the number tasks deleted.
 | |
| func (i *Inspector) DeleteAllDeadTasks(qname string) (int, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	n, err := i.rdb.DeleteAllDeadTasks(qname)
 | |
| 	return int(n), err
 | |
| }
 | |
| 
 | |
| // DeleteTaskByKey deletes a task with the given key from the given queue.
 | |
| func (i *Inspector) DeleteTaskByKey(qname, key string) error {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	id, score, state, err := parseTaskKey(key)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	switch state {
 | |
| 	case "s":
 | |
| 		return i.rdb.DeleteScheduledTask(qname, id, score)
 | |
| 	case "r":
 | |
| 		return i.rdb.DeleteRetryTask(qname, id, score)
 | |
| 	case "d":
 | |
| 		return i.rdb.DeleteDeadTask(qname, id, score)
 | |
| 	default:
 | |
| 		return fmt.Errorf("invalid key")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // RunAllScheduledTasks transition all scheduled tasks to pending state within the given queue,
 | |
| // and reports the number of tasks transitioned.
 | |
| func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	n, err := i.rdb.RunAllScheduledTasks(qname)
 | |
| 	return int(n), err
 | |
| }
 | |
| 
 | |
| // RunAllRetryTasks transition all retry tasks to pending state within the given queue,
 | |
| // and reports the number of tasks transitioned.
 | |
| func (i *Inspector) RunAllRetryTasks(qname string) (int, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	n, err := i.rdb.RunAllRetryTasks(qname)
 | |
| 	return int(n), err
 | |
| }
 | |
| 
 | |
| // RunAllDeadTasks transition all dead tasks to pending state within the given queue,
 | |
| // and reports the number of tasks transitioned.
 | |
| func (i *Inspector) RunAllDeadTasks(qname string) (int, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	n, err := i.rdb.RunAllDeadTasks(qname)
 | |
| 	return int(n), err
 | |
| }
 | |
| 
 | |
| // RunTaskByKey transition a task to pending state given task key and queue name.
 | |
| func (i *Inspector) RunTaskByKey(qname, key string) error {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	id, score, state, err := parseTaskKey(key)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	switch state {
 | |
| 	case "s":
 | |
| 		return i.rdb.RunScheduledTask(qname, id, score)
 | |
| 	case "r":
 | |
| 		return i.rdb.RunRetryTask(qname, id, score)
 | |
| 	case "d":
 | |
| 		return i.rdb.RunDeadTask(qname, id, score)
 | |
| 	default:
 | |
| 		return fmt.Errorf("invalid key")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // KillAllScheduledTasks kills all scheduled tasks within the given queue,
 | |
| // and reports the number of tasks killed.
 | |
| func (i *Inspector) KillAllScheduledTasks(qname string) (int, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	n, err := i.rdb.KillAllScheduledTasks(qname)
 | |
| 	return int(n), err
 | |
| }
 | |
| 
 | |
| // KillAllRetryTasks kills all retry tasks within the given queue,
 | |
| // and reports the number of tasks killed.
 | |
| func (i *Inspector) KillAllRetryTasks(qname string) (int, error) {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	n, err := i.rdb.KillAllRetryTasks(qname)
 | |
| 	return int(n), err
 | |
| }
 | |
| 
 | |
| // KillTaskByKey kills a task with the given key in the given queue.
 | |
| func (i *Inspector) KillTaskByKey(qname, key string) error {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	id, score, state, err := parseTaskKey(key)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	switch state {
 | |
| 	case "s":
 | |
| 		return i.rdb.KillScheduledTask(qname, id, score)
 | |
| 	case "r":
 | |
| 		return i.rdb.KillRetryTask(qname, id, score)
 | |
| 	case "d":
 | |
| 		return fmt.Errorf("task already dead")
 | |
| 	default:
 | |
| 		return fmt.Errorf("invalid key")
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // CancelActiveTask sends a signal to cancel processing of the task with
 | |
| // the given id. CancelActiveTask is best-effort, which means that it does not
 | |
| // guarantee that the task with the given id will be canceled. The return
 | |
| // value only indicates whether the cancelation signal has been sent.
 | |
| func (i *Inspector) CancelActiveTask(id string) error {
 | |
| 	return i.rdb.PublishCancelation(id)
 | |
| }
 | |
| 
 | |
| // PauseQueue pauses task processing on the specified queue.
 | |
| // If the queue is already paused, it will return a non-nil error.
 | |
| func (i *Inspector) PauseQueue(qname string) error {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	return i.rdb.Pause(qname)
 | |
| }
 | |
| 
 | |
| // UnpauseQueue resumes task processing on the specified queue.
 | |
| // If the queue is not paused, it will return a non-nil error.
 | |
| func (i *Inspector) UnpauseQueue(qname string) error {
 | |
| 	if err := validateQueueName(qname); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	return i.rdb.Unpause(qname)
 | |
| }
 | |
| 
 | |
| // Servers return a list of running servers' information.
 | |
| func (i *Inspector) Servers() ([]*ServerInfo, error) {
 | |
| 	servers, err := i.rdb.ListServers()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	workers, err := i.rdb.ListWorkers()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	m := make(map[string]*ServerInfo) // ServerInfo keyed by serverID
 | |
| 	for _, s := range servers {
 | |
| 		m[s.ServerID] = &ServerInfo{
 | |
| 			ID:             s.ServerID,
 | |
| 			Host:           s.Host,
 | |
| 			PID:            s.PID,
 | |
| 			Concurrency:    s.Concurrency,
 | |
| 			Queues:         s.Queues,
 | |
| 			StrictPriority: s.StrictPriority,
 | |
| 			Started:        s.Started,
 | |
| 			Status:         s.Status,
 | |
| 			ActiveWorkers:  make([]*WorkerInfo, 0),
 | |
| 		}
 | |
| 	}
 | |
| 	for _, w := range workers {
 | |
| 		srvInfo, ok := m[w.ServerID]
 | |
| 		if !ok {
 | |
| 			continue
 | |
| 		}
 | |
| 		wrkInfo := &WorkerInfo{
 | |
| 			Started: w.Started,
 | |
| 			Task: &ActiveTask{
 | |
| 				Task:  NewTask(w.Type, w.Payload),
 | |
| 				ID:    w.ID,
 | |
| 				Queue: w.Queue,
 | |
| 			},
 | |
| 		}
 | |
| 		srvInfo.ActiveWorkers = append(srvInfo.ActiveWorkers, wrkInfo)
 | |
| 	}
 | |
| 	var out []*ServerInfo
 | |
| 	for _, srvInfo := range m {
 | |
| 		out = append(out, srvInfo)
 | |
| 	}
 | |
| 	return out, nil
 | |
| }
 | |
| 
 | |
| // ServerInfo describes a running Server instance.
 | |
| type ServerInfo struct {
 | |
| 	// Unique Identifier for the server.
 | |
| 	ID string
 | |
| 	// Host machine on which the server is running.
 | |
| 	Host string
 | |
| 	// PID of the process in which the server is running.
 | |
| 	PID int
 | |
| 
 | |
| 	// Server configuration details.
 | |
| 	// See Config doc for field descriptions.
 | |
| 	Concurrency    int
 | |
| 	Queues         map[string]int
 | |
| 	StrictPriority bool
 | |
| 
 | |
| 	// Time the server started.
 | |
| 	Started time.Time
 | |
| 	// Status indicates the status of the server.
 | |
| 	// TODO: Update comment with more details.
 | |
| 	Status string
 | |
| 	// A List of active workers currently processing tasks.
 | |
| 	ActiveWorkers []*WorkerInfo
 | |
| }
 | |
| 
 | |
| // WorkerInfo describes a running worker processing a task.
 | |
| type WorkerInfo struct {
 | |
| 	// The task the worker is processing.
 | |
| 	Task *ActiveTask
 | |
| 	// Time the worker started processing the task.
 | |
| 	Started time.Time
 | |
| }
 | |
| 
 | |
| // ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
 | |
| func (i *Inspector) ClusterKeySlot(qname string) (int64, error) {
 | |
| 	return i.rdb.ClusterKeySlot(qname)
 | |
| }
 | |
| 
 | |
// ClusterNode describes a node in a redis cluster.
type ClusterNode struct {
	// ID is the node's ID in the cluster.
	ID string

	// Addr is the address of the node.
	Addr string
}
 | |
| 
 | |
| // ClusterNode returns a list of nodes the given queue belongs to.
 | |
| func (i *Inspector) ClusterNodes(qname string) ([]ClusterNode, error) {
 | |
| 	nodes, err := i.rdb.ClusterNodes(qname)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	var res []ClusterNode
 | |
| 	for _, node := range nodes {
 | |
| 		res = append(res, ClusterNode{ID: node.ID, Addr: node.Addr})
 | |
| 	}
 | |
| 	return res, nil
 | |
| }
 | |
| 
 | |
| // SchedulerEntry holds information about a periodic task registered with a scheduler.
 | |
| type SchedulerEntry struct {
 | |
| 	// Identifier of this entry.
 | |
| 	ID string
 | |
| 
 | |
| 	// Spec describes the schedule of this entry.
 | |
| 	Spec string
 | |
| 
 | |
| 	// Periodic Task registered for this entry.
 | |
| 	Task *Task
 | |
| 
 | |
| 	// Opts is the options for the periodic task.
 | |
| 	Opts []Option
 | |
| 
 | |
| 	// Next shows the next time the task will be enqueued.
 | |
| 	Next time.Time
 | |
| 
 | |
| 	// Prev shows the last time the task was enqueued.
 | |
| 	// Zero time if task was never enqueued.
 | |
| 	Prev time.Time
 | |
| }
 | |
| 
 | |
| // SchedulerEntries returns a list of all entries registered with
 | |
| // currently running schedulers.
 | |
| func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
 | |
| 	var entries []*SchedulerEntry
 | |
| 	res, err := i.rdb.ListSchedulerEntries()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	for _, e := range res {
 | |
| 		task := NewTask(e.Type, e.Payload)
 | |
| 		var opts []Option
 | |
| 		for _, s := range e.Opts {
 | |
| 			if o, err := parseOption(s); err == nil {
 | |
| 				// ignore bad data
 | |
| 				opts = append(opts, o)
 | |
| 			}
 | |
| 		}
 | |
| 		entries = append(entries, &SchedulerEntry{
 | |
| 			ID:   e.ID,
 | |
| 			Spec: e.Spec,
 | |
| 			Task: task,
 | |
| 			Opts: opts,
 | |
| 			Next: e.Next,
 | |
| 			Prev: e.Prev,
 | |
| 		})
 | |
| 	}
 | |
| 	return entries, nil
 | |
| }
 | |
| 
 | |
// SchedulerEnqueueEvent holds information about a single enqueue event
// performed by a scheduler.
type SchedulerEnqueueEvent struct {
	// TaskID is the ID of the task that was enqueued.
	TaskID string

	// EnqueuedAt is the time the task was enqueued.
	EnqueuedAt time.Time
}
 | |
| 
 | |
| // ListSchedulerEnqueueEvents retrieves a list of enqueue events from the specified scheduler entry.
 | |
| //
 | |
| // By default, it retrieves the first 30 tasks.
 | |
| func (i *Inspector) ListSchedulerEnqueueEvents(entryID string, opts ...ListOption) ([]*SchedulerEnqueueEvent, error) {
 | |
| 	opt := composeListOptions(opts...)
 | |
| 	pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
 | |
| 	data, err := i.rdb.ListSchedulerEnqueueEvents(entryID, pgn)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	var events []*SchedulerEnqueueEvent
 | |
| 	for _, e := range data {
 | |
| 		events = append(events, &SchedulerEnqueueEvent{TaskID: e.TaskID, EnqueuedAt: e.EnqueuedAt})
 | |
| 	}
 | |
| 	return events, nil
 | |
| }
 |