2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-25 23:06:12 +08:00

Rename variables for public API documentation

This commit is contained in:
Ken Hibino
2022-04-10 17:55:21 -07:00
parent 0877f41015
commit e603189e06
4 changed files with 129 additions and 131 deletions

View File

@@ -93,13 +93,13 @@ func (n retryOption) Type() OptionType { return MaxRetryOpt }
func (n retryOption) Value() interface{} { return int(n) } func (n retryOption) Value() interface{} { return int(n) }
// Queue returns an option to specify the queue to enqueue the task into. // Queue returns an option to specify the queue to enqueue the task into.
func Queue(qname string) Option { func Queue(name string) Option {
return queueOption(qname) return queueOption(name)
} }
func (qname queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(qname)) } func (name queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(name)) }
func (qname queueOption) Type() OptionType { return QueueOpt } func (name queueOption) Type() OptionType { return QueueOpt }
func (qname queueOption) Value() interface{} { return string(qname) } func (name queueOption) Value() interface{} { return string(name) }
// TaskID returns an option to specify the task ID. // TaskID returns an option to specify the task ID.
func TaskID(id string) Option { func TaskID(id string) Option {
@@ -196,17 +196,15 @@ func (ttl retentionOption) String() string { return fmt.Sprintf("Retention(%
func (ttl retentionOption) Type() OptionType { return RetentionOpt } func (ttl retentionOption) Type() OptionType { return RetentionOpt }
func (ttl retentionOption) Value() interface{} { return time.Duration(ttl) } func (ttl retentionOption) Value() interface{} { return time.Duration(ttl) }
// Group returns an option to specify the group key used for the task. // Group returns an option to specify the group used for the task.
// Tasks in a given queue with the same group key will be aggregated into one task before passed to Handler. // Tasks in a given queue with the same group will be aggregated into one task before passed to Handler.
// func Group(name string) Option {
// To customize the aggregation and grouping policy, specify the Group* fields in Config. return groupOption(name)
func Group(key string) Option {
return groupOption(key)
} }
func (key groupOption) String() string { return fmt.Sprintf("Group(%q)", string(key)) } func (name groupOption) String() string { return fmt.Sprintf("Group(%q)", string(name)) }
func (key groupOption) Type() OptionType { return GroupOpt } func (name groupOption) Type() OptionType { return GroupOpt }
func (key groupOption) Value() interface{} { return string(key) } func (name groupOption) Value() interface{} { return string(name) }
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task. // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
// //
@@ -227,7 +225,7 @@ type option struct {
uniqueTTL time.Duration uniqueTTL time.Duration
processAt time.Time processAt time.Time
retention time.Duration retention time.Duration
groupKey string group string
} }
// composeOptions merges user provided options into the default options // composeOptions merges user provided options into the default options
@@ -280,7 +278,7 @@ func composeOptions(opts ...Option) (option, error) {
if isBlank(key) { if isBlank(key) {
return option{}, errors.New("group key cannot be empty") return option{}, errors.New("group key cannot be empty")
} }
res.groupKey = key res.group = key
default: default:
// ignore unexpected option // ignore unexpected option
} }
@@ -378,7 +376,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
Deadline: deadline.Unix(), Deadline: deadline.Unix(),
Timeout: int64(timeout.Seconds()), Timeout: int64(timeout.Seconds()),
UniqueKey: uniqueKey, UniqueKey: uniqueKey,
GroupKey: opt.groupKey, GroupKey: opt.group,
Retention: int64(opt.retention.Seconds()), Retention: int64(opt.retention.Seconds()),
} }
now := time.Now() now := time.Now()
@@ -386,10 +384,10 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
if opt.processAt.After(now) { if opt.processAt.After(now) {
err = c.schedule(ctx, msg, opt.processAt, opt.uniqueTTL) err = c.schedule(ctx, msg, opt.processAt, opt.uniqueTTL)
state = base.TaskStateScheduled state = base.TaskStateScheduled
} else if opt.groupKey != "" { } else if opt.group != "" {
// Use zero value for processAt since we don't know when the task will be aggregated and processed. // Use zero value for processAt since we don't know when the task will be aggregated and processed.
opt.processAt = time.Time{} opt.processAt = time.Time{}
err = c.addToGroup(ctx, msg, opt.groupKey, opt.uniqueTTL) err = c.addToGroup(ctx, msg, opt.group, opt.uniqueTTL)
state = base.TaskStateAggregating state = base.TaskStateAggregating
} else { } else {
opt.processAt = now opt.processAt = now
@@ -422,9 +420,9 @@ func (c *Client) schedule(ctx context.Context, msg *base.TaskMessage, t time.Tim
return c.broker.Schedule(ctx, msg, t) return c.broker.Schedule(ctx, msg, t)
} }
func (c *Client) addToGroup(ctx context.Context, msg *base.TaskMessage, groupKey string, uniqueTTL time.Duration) error { func (c *Client) addToGroup(ctx context.Context, msg *base.TaskMessage, group string, uniqueTTL time.Duration) error {
if uniqueTTL > 0 { if uniqueTTL > 0 {
return c.broker.AddToGroupUnique(ctx, msg, groupKey, uniqueTTL) return c.broker.AddToGroupUnique(ctx, msg, group, uniqueTTL)
} }
return c.broker.AddToGroup(ctx, msg, groupKey) return c.broker.AddToGroup(ctx, msg, group)
} }

View File

@@ -36,7 +36,7 @@ func GetMaxRetry(ctx context.Context) (n int, ok bool) {
// GetQueueName extracts queue name from a context, if any. // GetQueueName extracts queue name from a context, if any.
// //
// Return value qname indicates which queue the task was pulled from. // Return value queue indicates which queue the task was pulled from.
func GetQueueName(ctx context.Context) (qname string, ok bool) { func GetQueueName(ctx context.Context) (queue string, ok bool) {
return asynqcontext.GetQueueName(ctx) return asynqcontext.GetQueueName(ctx)
} }

View File

@@ -44,8 +44,8 @@ func (i *Inspector) Queues() ([]string, error) {
} }
// Groups returns a list of all groups within the given queue. // Groups returns a list of all groups within the given queue.
func (i *Inspector) Groups(qname string) ([]*GroupInfo, error) { func (i *Inspector) Groups(queue string) ([]*GroupInfo, error) {
stats, err := i.rdb.GroupStats(qname) stats, err := i.rdb.GroupStats(queue)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -122,11 +122,11 @@ type QueueInfo struct {
} }
// GetQueueInfo returns current information of the given queue. // GetQueueInfo returns current information of the given queue.
func (i *Inspector) GetQueueInfo(qname string) (*QueueInfo, error) { func (i *Inspector) GetQueueInfo(queue string) (*QueueInfo, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, err return nil, err
} }
stats, err := i.rdb.CurrentStats(qname) stats, err := i.rdb.CurrentStats(queue)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -166,11 +166,11 @@ type DailyStats struct {
} }
// History returns a list of stats from the last n days. // History returns a list of stats from the last n days.
func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) { func (i *Inspector) History(queue string, n int) ([]*DailyStats, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, err return nil, err
} }
stats, err := i.rdb.HistoricalStats(qname, n) stats, err := i.rdb.HistoricalStats(queue, n)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -207,13 +207,13 @@ var (
// If the specified queue does not exist, DeleteQueue returns ErrQueueNotFound. // If the specified queue does not exist, DeleteQueue returns ErrQueueNotFound.
// If force is set to false and the specified queue is not empty, DeleteQueue // If force is set to false and the specified queue is not empty, DeleteQueue
// returns ErrQueueNotEmpty. // returns ErrQueueNotEmpty.
func (i *Inspector) DeleteQueue(qname string, force bool) error { func (i *Inspector) DeleteQueue(queue string, force bool) error {
err := i.rdb.RemoveQueue(qname, force) err := i.rdb.RemoveQueue(queue, force)
if errors.IsQueueNotFound(err) { if errors.IsQueueNotFound(err) {
return fmt.Errorf("%w: queue=%q", ErrQueueNotFound, qname) return fmt.Errorf("%w: queue=%q", ErrQueueNotFound, queue)
} }
if errors.IsQueueNotEmpty(err) { if errors.IsQueueNotEmpty(err) {
return fmt.Errorf("%w: queue=%q", ErrQueueNotEmpty, qname) return fmt.Errorf("%w: queue=%q", ErrQueueNotEmpty, queue)
} }
return err return err
} }
@@ -222,8 +222,8 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error {
// //
// Returns an error wrapping ErrQueueNotFound if a queue with the given name doesn't exist. // Returns an error wrapping ErrQueueNotFound if a queue with the given name doesn't exist.
// Returns an error wrapping ErrTaskNotFound if a task with the given id doesn't exist in the queue. // Returns an error wrapping ErrTaskNotFound if a task with the given id doesn't exist in the queue.
func (i *Inspector) GetTaskInfo(qname, id string) (*TaskInfo, error) { func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) {
info, err := i.rdb.GetTaskInfo(qname, id) info, err := i.rdb.GetTaskInfo(queue, id)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@@ -299,13 +299,13 @@ func Page(n int) ListOption {
// ListPendingTasks retrieves pending tasks from the specified queue. // ListPendingTasks retrieves pending tasks from the specified queue.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListPending(qname, pgn) infos, err := i.rdb.ListPending(queue, pgn)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@@ -327,20 +327,20 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskI
// ListActiveTasks retrieves active tasks from the specified queue. // ListActiveTasks retrieves active tasks from the specified queue.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListActive(qname, pgn) infos, err := i.rdb.ListActive(queue, pgn)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil: case err != nil:
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
expired, err := i.rdb.ListLeaseExpired(time.Now(), qname) expired, err := i.rdb.ListLeaseExpired(time.Now(), queue)
if err != nil { if err != nil {
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
@@ -367,13 +367,13 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn
// ListAggregatingTasks retrieves scheduled tasks from the specified group. // ListAggregatingTasks retrieves scheduled tasks from the specified group.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListAggregatingTasks(qname, gname string, opts ...ListOption) ([]*TaskInfo, error) { func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListAggregating(qname, gname, pgn) infos, err := i.rdb.ListAggregating(queue, group, pgn)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@@ -396,13 +396,13 @@ func (i *Inspector) ListAggregatingTasks(qname, gname string, opts ...ListOption
// Tasks are sorted by NextProcessAt in ascending order. // Tasks are sorted by NextProcessAt in ascending order.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListScheduled(qname, pgn) infos, err := i.rdb.ListScheduled(queue, pgn)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@@ -425,13 +425,13 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Tas
// Tasks are sorted by NextProcessAt in ascending order. // Tasks are sorted by NextProcessAt in ascending order.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListRetry(qname, pgn) infos, err := i.rdb.ListRetry(queue, pgn)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@@ -454,13 +454,13 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInf
// Tasks are sorted by LastFailedAt in descending order. // Tasks are sorted by LastFailedAt in descending order.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListArchived(qname, pgn) infos, err := i.rdb.ListArchived(queue, pgn)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@@ -483,13 +483,13 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Task
// Tasks are sorted by expiration time (i.e. CompletedAt + Retention) in descending order. // Tasks are sorted by expiration time (i.e. CompletedAt + Retention) in descending order.
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListCompletedTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err) return nil, fmt.Errorf("asynq: %v", err)
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListCompleted(qname, pgn) infos, err := i.rdb.ListCompleted(queue, pgn)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@@ -510,61 +510,61 @@ func (i *Inspector) ListCompletedTasks(qname string, opts ...ListOption) ([]*Tas
// DeleteAllPendingTasks deletes all pending tasks from the specified queue, // DeleteAllPendingTasks deletes all pending tasks from the specified queue,
// and reports the number tasks deleted. // and reports the number tasks deleted.
func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error) { func (i *Inspector) DeleteAllPendingTasks(queue string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.DeleteAllPendingTasks(qname) n, err := i.rdb.DeleteAllPendingTasks(queue)
return int(n), err return int(n), err
} }
// DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue, // DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue,
// and reports the number tasks deleted. // and reports the number tasks deleted.
func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) { func (i *Inspector) DeleteAllScheduledTasks(queue string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.DeleteAllScheduledTasks(qname) n, err := i.rdb.DeleteAllScheduledTasks(queue)
return int(n), err return int(n), err
} }
// DeleteAllRetryTasks deletes all retry tasks from the specified queue, // DeleteAllRetryTasks deletes all retry tasks from the specified queue,
// and reports the number tasks deleted. // and reports the number tasks deleted.
func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) { func (i *Inspector) DeleteAllRetryTasks(queue string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.DeleteAllRetryTasks(qname) n, err := i.rdb.DeleteAllRetryTasks(queue)
return int(n), err return int(n), err
} }
// DeleteAllArchivedTasks deletes all archived tasks from the specified queue, // DeleteAllArchivedTasks deletes all archived tasks from the specified queue,
// and reports the number tasks deleted. // and reports the number tasks deleted.
func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) { func (i *Inspector) DeleteAllArchivedTasks(queue string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.DeleteAllArchivedTasks(qname) n, err := i.rdb.DeleteAllArchivedTasks(queue)
return int(n), err return int(n), err
} }
// DeleteAllCompletedTasks deletes all completed tasks from the specified queue, // DeleteAllCompletedTasks deletes all completed tasks from the specified queue,
// and reports the number tasks deleted. // and reports the number tasks deleted.
func (i *Inspector) DeleteAllCompletedTasks(qname string) (int, error) { func (i *Inspector) DeleteAllCompletedTasks(queue string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.DeleteAllCompletedTasks(qname) n, err := i.rdb.DeleteAllCompletedTasks(queue)
return int(n), err return int(n), err
} }
// DeleteAllAggregatingTasks deletes all tasks from the specified group, // DeleteAllAggregatingTasks deletes all tasks from the specified group,
// and reports the number of tasks deleted. // and reports the number of tasks deleted.
func (i *Inspector) DeleteAllAggregatingTasks(qname, gname string) (int, error) { func (i *Inspector) DeleteAllAggregatingTasks(queue, group string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.DeleteAllAggregatingTasks(qname, gname) n, err := i.rdb.DeleteAllAggregatingTasks(queue, group)
return int(n), err return int(n), err
} }
@@ -575,11 +575,11 @@ func (i *Inspector) DeleteAllAggregatingTasks(qname, gname string) (int, error)
// If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound. // If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
// If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound. // If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
// If the task is in active state, it returns a non-nil error. // If the task is in active state, it returns a non-nil error.
func (i *Inspector) DeleteTask(qname, id string) error { func (i *Inspector) DeleteTask(queue, id string) error {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %v", err) return fmt.Errorf("asynq: %v", err)
} }
err := i.rdb.DeleteTask(qname, id) err := i.rdb.DeleteTask(queue, id)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return fmt.Errorf("asynq: %w", ErrQueueNotFound) return fmt.Errorf("asynq: %w", ErrQueueNotFound)
@@ -594,41 +594,41 @@ func (i *Inspector) DeleteTask(qname, id string) error {
// RunAllScheduledTasks schedules all scheduled tasks from the given queue to run, // RunAllScheduledTasks schedules all scheduled tasks from the given queue to run,
// and reports the number of tasks scheduled to run. // and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) { func (i *Inspector) RunAllScheduledTasks(queue string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.RunAllScheduledTasks(qname) n, err := i.rdb.RunAllScheduledTasks(queue)
return int(n), err return int(n), err
} }
// RunAllRetryTasks schedules all retry tasks from the given queue to run, // RunAllRetryTasks schedules all retry tasks from the given queue to run,
// and reports the number of tasks scheduled to run. // and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllRetryTasks(qname string) (int, error) { func (i *Inspector) RunAllRetryTasks(queue string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.RunAllRetryTasks(qname) n, err := i.rdb.RunAllRetryTasks(queue)
return int(n), err return int(n), err
} }
// RunAllArchivedTasks schedules all archived tasks from the given queue to run, // RunAllArchivedTasks schedules all archived tasks from the given queue to run,
// and reports the number of tasks scheduled to run. // and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) { func (i *Inspector) RunAllArchivedTasks(queue string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.RunAllArchivedTasks(qname) n, err := i.rdb.RunAllArchivedTasks(queue)
return int(n), err return int(n), err
} }
// RunAllAggregatingTasks schedules all tasks from the given grou to run. // RunAllAggregatingTasks schedules all tasks from the given grou to run.
// and reports the number of tasks scheduled to run. // and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllAggregatingTasks(qname, gname string) (int, error) { func (i *Inspector) RunAllAggregatingTasks(queue, group string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.RunAllAggregatingTasks(qname, gname) n, err := i.rdb.RunAllAggregatingTasks(queue, group)
return int(n), err return int(n), err
} }
@@ -639,11 +639,11 @@ func (i *Inspector) RunAllAggregatingTasks(qname, gname string) (int, error) {
// If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound. // If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
// If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound. // If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
// If the task is in pending or active state, it returns a non-nil error. // If the task is in pending or active state, it returns a non-nil error.
func (i *Inspector) RunTask(qname, id string) error { func (i *Inspector) RunTask(queue, id string) error {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %v", err) return fmt.Errorf("asynq: %v", err)
} }
err := i.rdb.RunTask(qname, id) err := i.rdb.RunTask(queue, id)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return fmt.Errorf("asynq: %w", ErrQueueNotFound) return fmt.Errorf("asynq: %w", ErrQueueNotFound)
@@ -657,41 +657,41 @@ func (i *Inspector) RunTask(qname, id string) error {
// ArchiveAllPendingTasks archives all pending tasks from the given queue, // ArchiveAllPendingTasks archives all pending tasks from the given queue,
// and reports the number of tasks archived. // and reports the number of tasks archived.
func (i *Inspector) ArchiveAllPendingTasks(qname string) (int, error) { func (i *Inspector) ArchiveAllPendingTasks(queue string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.ArchiveAllPendingTasks(qname) n, err := i.rdb.ArchiveAllPendingTasks(queue)
return int(n), err return int(n), err
} }
// ArchiveAllScheduledTasks archives all scheduled tasks from the given queue, // ArchiveAllScheduledTasks archives all scheduled tasks from the given queue,
// and reports the number of tasks archiveed. // and reports the number of tasks archiveed.
func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) { func (i *Inspector) ArchiveAllScheduledTasks(queue string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.ArchiveAllScheduledTasks(qname) n, err := i.rdb.ArchiveAllScheduledTasks(queue)
return int(n), err return int(n), err
} }
// ArchiveAllRetryTasks archives all retry tasks from the given queue, // ArchiveAllRetryTasks archives all retry tasks from the given queue,
// and reports the number of tasks archiveed. // and reports the number of tasks archiveed.
func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) { func (i *Inspector) ArchiveAllRetryTasks(queue string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.ArchiveAllRetryTasks(qname) n, err := i.rdb.ArchiveAllRetryTasks(queue)
return int(n), err return int(n), err
} }
// ArchiveAllAggregatingTasks archives all tasks from the given group, // ArchiveAllAggregatingTasks archives all tasks from the given group,
// and reports the number of tasks archived. // and reports the number of tasks archived.
func (i *Inspector) ArchiveAllAggregatingTasks(qname, gname string) (int, error) { func (i *Inspector) ArchiveAllAggregatingTasks(queue, group string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.ArchiveAllAggregatingTasks(qname, gname) n, err := i.rdb.ArchiveAllAggregatingTasks(queue, group)
return int(n), err return int(n), err
} }
@@ -702,11 +702,11 @@ func (i *Inspector) ArchiveAllAggregatingTasks(qname, gname string) (int, error)
// If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound. // If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
// If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound. // If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
// If the task is in already archived, it returns a non-nil error. // If the task is in already archived, it returns a non-nil error.
func (i *Inspector) ArchiveTask(qname, id string) error { func (i *Inspector) ArchiveTask(queue, id string) error {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: err") return fmt.Errorf("asynq: err")
} }
err := i.rdb.ArchiveTask(qname, id) err := i.rdb.ArchiveTask(queue, id)
switch { switch {
case errors.IsQueueNotFound(err): case errors.IsQueueNotFound(err):
return fmt.Errorf("asynq: %w", ErrQueueNotFound) return fmt.Errorf("asynq: %w", ErrQueueNotFound)
@@ -728,20 +728,20 @@ func (i *Inspector) CancelProcessing(id string) error {
// PauseQueue pauses task processing on the specified queue. // PauseQueue pauses task processing on the specified queue.
// If the queue is already paused, it will return a non-nil error. // If the queue is already paused, it will return a non-nil error.
func (i *Inspector) PauseQueue(qname string) error { func (i *Inspector) PauseQueue(queue string) error {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return err return err
} }
return i.rdb.Pause(qname) return i.rdb.Pause(queue)
} }
// UnpauseQueue resumes task processing on the specified queue. // UnpauseQueue resumes task processing on the specified queue.
// If the queue is not paused, it will return a non-nil error. // If the queue is not paused, it will return a non-nil error.
func (i *Inspector) UnpauseQueue(qname string) error { func (i *Inspector) UnpauseQueue(queue string) error {
if err := base.ValidateQueueName(qname); err != nil { if err := base.ValidateQueueName(queue); err != nil {
return err return err
} }
return i.rdb.Unpause(qname) return i.rdb.Unpause(queue)
} }
// Servers return a list of running servers' information. // Servers return a list of running servers' information.
@@ -831,8 +831,8 @@ type WorkerInfo struct {
} }
// ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to. // ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
func (i *Inspector) ClusterKeySlot(qname string) (int64, error) { func (i *Inspector) ClusterKeySlot(queue string) (int64, error) {
return i.rdb.ClusterKeySlot(qname) return i.rdb.ClusterKeySlot(queue)
} }
// ClusterNode describes a node in redis cluster. // ClusterNode describes a node in redis cluster.
@@ -847,8 +847,8 @@ type ClusterNode struct {
// ClusterNodes returns a list of nodes the given queue belongs to. // ClusterNodes returns a list of nodes the given queue belongs to.
// //
// Only relevant if task queues are stored in redis cluster. // Only relevant if task queues are stored in redis cluster.
func (i *Inspector) ClusterNodes(qname string) ([]*ClusterNode, error) { func (i *Inspector) ClusterNodes(queue string) ([]*ClusterNode, error) {
nodes, err := i.rdb.ClusterNodes(qname) nodes, err := i.rdb.ClusterNodes(queue)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -916,11 +916,11 @@ func parseOption(s string) (Option, error) {
fn, arg := parseOptionFunc(s), parseOptionArg(s) fn, arg := parseOptionFunc(s), parseOptionArg(s)
switch fn { switch fn {
case "Queue": case "Queue":
qname, err := strconv.Unquote(arg) queue, err := strconv.Unquote(arg)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return Queue(qname), nil return Queue(queue), nil
case "MaxRetry": case "MaxRetry":
n, err := strconv.Atoi(arg) n, err := strconv.Atoi(arg)
if err != nil { if err != nil {

View File

@@ -224,22 +224,22 @@ type Config struct {
// GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler. // GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
type GroupAggregator interface { type GroupAggregator interface {
// Aggregate aggregates the given tasks which belong to a same group with the given groupKey // Aggregate aggregates the given tasks in a group with the given group name,
// and returns a new task which is the aggregation of those tasks. // and returns a new task which is the aggregation of those tasks.
// //
// Use NewTask(typename, payload, opts...) to set any options for the aggregated task. // Use NewTask(typename, payload, opts...) to set any options for the aggregated task.
// Queue option will be ignored and the aggregated task will always be enqueued to the same queue // The Queue option, if provided, will be ignored and the aggregated task will always be enqueued
// the group belonged. // to the same queue the group belonged.
Aggregate(groupKey string, tasks []*Task) *Task Aggregate(group string, tasks []*Task) *Task
} }
// The GroupAggregatorFunc type is an adapter to allow the use of ordinary functions as a GroupAggregator. // The GroupAggregatorFunc type is an adapter to allow the use of ordinary functions as a GroupAggregator.
// If f is a function with the appropriate signature, GroupAggregatorFunc(f) is a GroupAggregator that calls f. // If f is a function with the appropriate signature, GroupAggregatorFunc(f) is a GroupAggregator that calls f.
type GroupAggregatorFunc func(groupKey string, tasks []*Task) *Task type GroupAggregatorFunc func(group string, tasks []*Task) *Task
// Aggregate calls fn(groupKey, tasks) // Aggregate calls fn(group, tasks)
func (fn GroupAggregatorFunc) Aggregate(groupKey string, tasks []*Task) *Task { func (fn GroupAggregatorFunc) Aggregate(group string, tasks []*Task) *Task {
return fn(groupKey, tasks) return fn(group, tasks)
} }
// An ErrorHandler handles an error occured during task processing. // An ErrorHandler handles an error occured during task processing.