2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

Validate queue name in Inspector

This commit is contained in:
Ken Hibino 2020-08-31 05:53:17 -07:00
parent 131ac823fd
commit dab8295883
2 changed files with 69 additions and 2 deletions

View File

@ -132,8 +132,8 @@ func composeOptions(opts ...Option) (option, error) {
res.retry = int(opt)
case queueOption:
trimmed := strings.TrimSpace(string(opt))
if len(trimmed) == 0 {
return option{}, fmt.Errorf("queue name must contain one or more characters")
if err := validateQueueName(trimmed); err != nil {
return option{}, err
}
res.queue = trimmed
case timeoutOption:
@ -149,6 +149,13 @@ func composeOptions(opts ...Option) (option, error) {
return res, nil
}
func validateQueueName(qname string) error {
if len(qname) == 0 {
return fmt.Errorf("queue name must contain one or more characters")
}
return nil
}
const (
// Default max retry count used if nothing is specified.
defaultMaxRetry = 25

View File

@ -63,6 +63,9 @@ type QueueStats struct {
// CurrentStats returns a current stats of the given queue.
func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) {
if err := validateQueueName(qname); err != nil {
return nil, err
}
stats, err := i.rdb.CurrentStats(qname)
if err != nil {
return nil, err
@ -97,6 +100,9 @@ type DailyStats struct {
// History returns a list of stats from the last n days.
func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) {
if err := validateQueueName(qname); err != nil {
return nil, err
}
stats, err := i.rdb.HistoricalStats(qname, n)
if err != nil {
return nil, err
@ -267,6 +273,9 @@ func Page(n int) ListOption {
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*EnqueuedTask, error) {
if err := validateQueueName(qname); err != nil {
return nil, err
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListEnqueued(qname, pgn)
@ -288,6 +297,9 @@ func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*Enqu
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListInProgressTasks(qname string, opts ...ListOption) ([]*InProgressTask, error) {
if err := validateQueueName(qname); err != nil {
return nil, err
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListInProgress(qname, pgn)
@ -310,6 +322,9 @@ func (i *Inspector) ListInProgressTasks(qname string, opts ...ListOption) ([]*In
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error) {
if err := validateQueueName(qname); err != nil {
return nil, err
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListScheduled(qname, pgn)
@ -336,6 +351,9 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Sch
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error) {
if err := validateQueueName(qname); err != nil {
return nil, err
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListRetry(qname, pgn)
@ -366,6 +384,9 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListDeadTasks(qname string, opts ...ListOption) ([]*DeadTask, error) {
if err := validateQueueName(qname); err != nil {
return nil, err
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListDead(qname, pgn)
@ -394,6 +415,9 @@ func (i *Inspector) ListDeadTasks(qname string, opts ...ListOption) ([]*DeadTask
// DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllScheduledTasks(qname)
return int(n), err
}
@ -401,6 +425,9 @@ func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) {
// DeleteAllRetryTasks deletes all retry tasks from the specified queue,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllRetryTasks(qname)
return int(n), err
}
@ -408,12 +435,18 @@ func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) {
// DeleteAllDeadTasks deletes all dead tasks from the specified queue,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllDeadTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllDeadTasks(qname)
return int(n), err
}
// DeleteTaskByKey deletes a task with the given key from the given queue.
func (i *Inspector) DeleteTaskByKey(qname, key string) error {
if err := validateQueueName(qname); err != nil {
return err
}
id, score, state, err := parseTaskKey(key)
if err != nil {
return err
@ -434,6 +467,9 @@ func (i *Inspector) DeleteTaskByKey(qname, key string) error {
// EnqueueAllScheduledTasks enqueues all scheduled tasks for immediate processing within the given queue,
// and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllScheduledTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.EnqueueAllScheduledTasks(qname)
return int(n), err
}
@ -441,6 +477,9 @@ func (i *Inspector) EnqueueAllScheduledTasks(qname string) (int, error) {
// EnqueueAllRetryTasks enqueues all retry tasks for immediate processing within the given queue,
// and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllRetryTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.EnqueueAllRetryTasks(qname)
return int(n), err
}
@ -448,12 +487,18 @@ func (i *Inspector) EnqueueAllRetryTasks(qname string) (int, error) {
// EnqueueAllDeadTasks enqueues all dead tasks for immediate processing within the given queue,
// and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllDeadTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.EnqueueAllDeadTasks(qname)
return int(n), err
}
// EnqueueTaskByKey enqueues a task with the given key in the given queue.
func (i *Inspector) EnqueueTaskByKey(qname, key string) error {
if err := validateQueueName(qname); err != nil {
return err
}
id, score, state, err := parseTaskKey(key)
if err != nil {
return err
@ -473,6 +518,9 @@ func (i *Inspector) EnqueueTaskByKey(qname, key string) error {
// KillAllScheduledTasks kills all scheduled tasks within the given queue,
// and reports the number of tasks killed.
func (i *Inspector) KillAllScheduledTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.KillAllScheduledTasks(qname)
return int(n), err
}
@ -480,12 +528,18 @@ func (i *Inspector) KillAllScheduledTasks(qname string) (int, error) {
// KillAllRetryTasks kills all retry tasks within the given queue,
// and reports the number of tasks killed.
func (i *Inspector) KillAllRetryTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil {
return 0, err
}
n, err := i.rdb.KillAllRetryTasks(qname)
return int(n), err
}
// KillTaskByKey kills a task with the given key in the given queue.
func (i *Inspector) KillTaskByKey(qname, key string) error {
if err := validateQueueName(qname); err != nil {
return err
}
id, score, state, err := parseTaskKey(key)
if err != nil {
return err
@ -505,11 +559,17 @@ func (i *Inspector) KillTaskByKey(qname, key string) error {
// PauseQueue pauses task processing on the specified queue.
// If the queue is already paused, it will return a non-nil error.
func (i *Inspector) PauseQueue(qname string) error {
if err := validateQueueName(qname); err != nil {
return err
}
return i.rdb.Pause(qname)
}
// UnpauseQueue resumes task processing on the specified queue.
// If the queue is not paused, it will return a non-nil error.
func (i *Inspector) UnpauseQueue(qname string) error {
if err := validateQueueName(qname); err != nil {
return err
}
return i.rdb.Unpause(qname)
}