diff --git a/client.go b/client.go index 91ee83f..1c1d63b 100644 --- a/client.go +++ b/client.go @@ -132,8 +132,8 @@ func composeOptions(opts ...Option) (option, error) { res.retry = int(opt) case queueOption: trimmed := strings.TrimSpace(string(opt)) - if len(trimmed) == 0 { - return option{}, fmt.Errorf("queue name must contain one or more characters") + if err := validateQueueName(trimmed); err != nil { + return option{}, err } res.queue = trimmed case timeoutOption: @@ -149,6 +149,13 @@ func composeOptions(opts ...Option) (option, error) { return res, nil } +func validateQueueName(qname string) error { + if len(qname) == 0 { + return fmt.Errorf("queue name must contain one or more characters") + } + return nil +} + const ( // Default max retry count used if nothing is specified. defaultMaxRetry = 25 diff --git a/inspector.go b/inspector.go index 76aabc5..2d5c7d6 100644 --- a/inspector.go +++ b/inspector.go @@ -63,6 +63,9 @@ type QueueStats struct { // CurrentStats returns a current stats of the given queue. func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) { + if err := validateQueueName(qname); err != nil { + return nil, err + } stats, err := i.rdb.CurrentStats(qname) if err != nil { return nil, err @@ -97,6 +100,9 @@ type DailyStats struct { // History returns a list of stats from the last n days. func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) { + if err := validateQueueName(qname); err != nil { + return nil, err + } stats, err := i.rdb.HistoricalStats(qname, n) if err != nil { return nil, err @@ -267,6 +273,9 @@ func Page(n int) ListOption { // // By default, it retrieves the first 30 tasks. func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*EnqueuedTask, error) { + if err := validateQueueName(qname); err != nil { + return nil, err + } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} msgs, err := i.rdb.ListEnqueued(qname, pgn) @@ -288,6 +297,9 @@ func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*Enqu // // By default, it retrieves the first 30 tasks. func (i *Inspector) ListInProgressTasks(qname string, opts ...ListOption) ([]*InProgressTask, error) { + if err := validateQueueName(qname); err != nil { + return nil, err + } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} msgs, err := i.rdb.ListInProgress(qname, pgn) @@ -310,6 +322,9 @@ func (i *Inspector) ListInProgressTasks(qname string, opts ...ListOption) ([]*In // // By default, it retrieves the first 30 tasks. func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error) { + if err := validateQueueName(qname); err != nil { + return nil, err + } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} zs, err := i.rdb.ListScheduled(qname, pgn) @@ -336,6 +351,9 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Sch // // By default, it retrieves the first 30 tasks. func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error) { + if err := validateQueueName(qname); err != nil { + return nil, err + } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} zs, err := i.rdb.ListRetry(qname, pgn) @@ -366,6 +384,9 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa // // By default, it retrieves the first 30 tasks. func (i *Inspector) ListDeadTasks(qname string, opts ...ListOption) ([]*DeadTask, error) { + if err := validateQueueName(qname); err != nil { + return nil, err + } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} zs, err := i.rdb.ListDead(qname, pgn) @@ -394,6 +415,9 @@ func (i *Inspector) ListDeadTasks(qname string, opts ...ListOption) ([]*DeadTask // DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue, // and reports the number tasks deleted. func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) { + if err := validateQueueName(qname); err != nil { + return 0, err + } n, err := i.rdb.DeleteAllScheduledTasks(qname) return int(n), err } @@ -401,6 +425,9 @@ func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) { // DeleteAllRetryTasks deletes all retry tasks from the specified queue, // and reports the number tasks deleted. func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) { + if err := validateQueueName(qname); err != nil { + return 0, err + } n, err := i.rdb.DeleteAllRetryTasks(qname) return int(n), err } @@ -408,12 +435,18 @@ func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) { // DeleteAllDeadTasks deletes all dead tasks from the specified queue, // and reports the number tasks deleted. func (i *Inspector) DeleteAllDeadTasks(qname string) (int, error) { + if err := validateQueueName(qname); err != nil { + return 0, err + } n, err := i.rdb.DeleteAllDeadTasks(qname) return int(n), err } // DeleteTaskByKey deletes a task with the given key from the given queue. func (i *Inspector) DeleteTaskByKey(qname, key string) error { + if err := validateQueueName(qname); err != nil { + return err + } id, score, state, err := parseTaskKey(key) if err != nil { return err @@ -434,6 +467,9 @@ func (i *Inspector) DeleteTaskByKey(qname, key string) error { // EnqueueAllScheduledTasks enqueues all scheduled tasks for immediate processing within the given queue, // and reports the number of tasks enqueued. func (i *Inspector) EnqueueAllScheduledTasks(qname string) (int, error) { + if err := validateQueueName(qname); err != nil { + return 0, err + } n, err := i.rdb.EnqueueAllScheduledTasks(qname) return int(n), err } @@ -441,6 +477,9 @@ func (i *Inspector) EnqueueAllScheduledTasks(qname string) (int, error) { // EnqueueAllRetryTasks enqueues all retry tasks for immediate processing within the given queue, // and reports the number of tasks enqueued. func (i *Inspector) EnqueueAllRetryTasks(qname string) (int, error) { + if err := validateQueueName(qname); err != nil { + return 0, err + } n, err := i.rdb.EnqueueAllRetryTasks(qname) return int(n), err } @@ -448,12 +487,18 @@ func (i *Inspector) EnqueueAllRetryTasks(qname string) (int, error) { // EnqueueAllDeadTasks enqueues all dead tasks for immediate processing within the given queue, // and reports the number of tasks enqueued. func (i *Inspector) EnqueueAllDeadTasks(qname string) (int, error) { + if err := validateQueueName(qname); err != nil { + return 0, err + } n, err := i.rdb.EnqueueAllDeadTasks(qname) return int(n), err } // EnqueueTaskByKey enqueues a task with the given key in the given queue. func (i *Inspector) EnqueueTaskByKey(qname, key string) error { + if err := validateQueueName(qname); err != nil { + return err + } id, score, state, err := parseTaskKey(key) if err != nil { return err @@ -473,6 +518,9 @@ func (i *Inspector) EnqueueTaskByKey(qname, key string) error { // KillAllScheduledTasks kills all scheduled tasks within the given queue, // and reports the number of tasks killed. func (i *Inspector) KillAllScheduledTasks(qname string) (int, error) { + if err := validateQueueName(qname); err != nil { + return 0, err + } n, err := i.rdb.KillAllScheduledTasks(qname) return int(n), err } @@ -480,12 +528,18 @@ func (i *Inspector) KillAllScheduledTasks(qname string) (int, error) { // KillAllRetryTasks kills all retry tasks within the given queue, // and reports the number of tasks killed. func (i *Inspector) KillAllRetryTasks(qname string) (int, error) { + if err := validateQueueName(qname); err != nil { + return 0, err + } n, err := i.rdb.KillAllRetryTasks(qname) return int(n), err } // KillTaskByKey kills a task with the given key in the given queue. func (i *Inspector) KillTaskByKey(qname, key string) error { + if err := validateQueueName(qname); err != nil { + return err + } id, score, state, err := parseTaskKey(key) if err != nil { return err @@ -505,11 +559,17 @@ func (i *Inspector) KillTaskByKey(qname, key string) error { // PauseQueue pauses task processing on the specified queue. // If the queue is already paused, it will return a non-nil error. func (i *Inspector) PauseQueue(qname string) error { + if err := validateQueueName(qname); err != nil { + return err + } return i.rdb.Pause(qname) } // UnpauseQueue resumes task processing on the specified queue. // If the queue is not paused, it will return a non-nil error. func (i *Inspector) UnpauseQueue(qname string) error { + if err := validateQueueName(qname); err != nil { + return err + } return i.rdb.Unpause(qname) }