2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

Rename variables for public API documentation

This commit is contained in:
Ken Hibino 2022-04-10 17:55:21 -07:00
parent 451be7e50f
commit 8b2a787759
4 changed files with 129 additions and 131 deletions

View File

@ -93,13 +93,13 @@ func (n retryOption) Type() OptionType { return MaxRetryOpt }
func (n retryOption) Value() interface{} { return int(n) }
// Queue returns an option to specify the queue to enqueue the task into.
func Queue(qname string) Option {
return queueOption(qname)
func Queue(name string) Option {
return queueOption(name)
}
func (qname queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(qname)) }
func (qname queueOption) Type() OptionType { return QueueOpt }
func (qname queueOption) Value() interface{} { return string(qname) }
func (name queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(name)) }
func (name queueOption) Type() OptionType { return QueueOpt }
func (name queueOption) Value() interface{} { return string(name) }
// TaskID returns an option to specify the task ID.
func TaskID(id string) Option {
@ -196,17 +196,15 @@ func (ttl retentionOption) String() string { return fmt.Sprintf("Retention(%
func (ttl retentionOption) Type() OptionType { return RetentionOpt }
func (ttl retentionOption) Value() interface{} { return time.Duration(ttl) }
// Group returns an option to specify the group key used for the task.
// Tasks in a given queue with the same group key will be aggregated into one task before passed to Handler.
//
// To customize the aggregation and grouping policy, specify the Group* fields in Config.
func Group(key string) Option {
return groupOption(key)
// Group returns an option to specify the group used for the task.
// Tasks in a given queue with the same group will be aggregated into one task before passed to Handler.
func Group(name string) Option {
return groupOption(name)
}
func (key groupOption) String() string { return fmt.Sprintf("Group(%q)", string(key)) }
func (key groupOption) Type() OptionType { return GroupOpt }
func (key groupOption) Value() interface{} { return string(key) }
func (name groupOption) String() string { return fmt.Sprintf("Group(%q)", string(name)) }
func (name groupOption) Type() OptionType { return GroupOpt }
func (name groupOption) Value() interface{} { return string(name) }
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
@ -227,7 +225,7 @@ type option struct {
uniqueTTL time.Duration
processAt time.Time
retention time.Duration
groupKey string
group string
}
// composeOptions merges user provided options into the default options
@ -280,7 +278,7 @@ func composeOptions(opts ...Option) (option, error) {
if isBlank(key) {
return option{}, errors.New("group key cannot be empty")
}
res.groupKey = key
res.group = key
default:
// ignore unexpected option
}
@ -378,7 +376,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
Deadline: deadline.Unix(),
Timeout: int64(timeout.Seconds()),
UniqueKey: uniqueKey,
GroupKey: opt.groupKey,
GroupKey: opt.group,
Retention: int64(opt.retention.Seconds()),
}
now := time.Now()
@ -386,10 +384,10 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
if opt.processAt.After(now) {
err = c.schedule(ctx, msg, opt.processAt, opt.uniqueTTL)
state = base.TaskStateScheduled
} else if opt.groupKey != "" {
} else if opt.group != "" {
// Use zero value for processAt since we don't know when the task will be aggregated and processed.
opt.processAt = time.Time{}
err = c.addToGroup(ctx, msg, opt.groupKey, opt.uniqueTTL)
err = c.addToGroup(ctx, msg, opt.group, opt.uniqueTTL)
state = base.TaskStateAggregating
} else {
opt.processAt = now
@ -422,9 +420,9 @@ func (c *Client) schedule(ctx context.Context, msg *base.TaskMessage, t time.Tim
return c.broker.Schedule(ctx, msg, t)
}
func (c *Client) addToGroup(ctx context.Context, msg *base.TaskMessage, groupKey string, uniqueTTL time.Duration) error {
func (c *Client) addToGroup(ctx context.Context, msg *base.TaskMessage, group string, uniqueTTL time.Duration) error {
if uniqueTTL > 0 {
return c.broker.AddToGroupUnique(ctx, msg, groupKey, uniqueTTL)
return c.broker.AddToGroupUnique(ctx, msg, group, uniqueTTL)
}
return c.broker.AddToGroup(ctx, msg, groupKey)
return c.broker.AddToGroup(ctx, msg, group)
}

View File

@ -36,7 +36,7 @@ func GetMaxRetry(ctx context.Context) (n int, ok bool) {
// GetQueueName extracts queue name from a context, if any.
//
// Return value qname indicates which queue the task was pulled from.
func GetQueueName(ctx context.Context) (qname string, ok bool) {
// Return value queue indicates which queue the task was pulled from.
func GetQueueName(ctx context.Context) (queue string, ok bool) {
return asynqcontext.GetQueueName(ctx)
}

View File

@ -44,8 +44,8 @@ func (i *Inspector) Queues() ([]string, error) {
}
// Groups returns a list of all groups within the given queue.
func (i *Inspector) Groups(qname string) ([]*GroupInfo, error) {
stats, err := i.rdb.GroupStats(qname)
func (i *Inspector) Groups(queue string) ([]*GroupInfo, error) {
stats, err := i.rdb.GroupStats(queue)
if err != nil {
return nil, err
}
@ -122,11 +122,11 @@ type QueueInfo struct {
}
// GetQueueInfo returns current information of the given queue.
func (i *Inspector) GetQueueInfo(qname string) (*QueueInfo, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) GetQueueInfo(queue string) (*QueueInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, err
}
stats, err := i.rdb.CurrentStats(qname)
stats, err := i.rdb.CurrentStats(queue)
if err != nil {
return nil, err
}
@ -166,11 +166,11 @@ type DailyStats struct {
}
// History returns a list of stats from the last n days.
func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) History(queue string, n int) ([]*DailyStats, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, err
}
stats, err := i.rdb.HistoricalStats(qname, n)
stats, err := i.rdb.HistoricalStats(queue, n)
if err != nil {
return nil, err
}
@ -207,13 +207,13 @@ var (
// If the specified queue does not exist, DeleteQueue returns ErrQueueNotFound.
// If force is set to false and the specified queue is not empty, DeleteQueue
// returns ErrQueueNotEmpty.
func (i *Inspector) DeleteQueue(qname string, force bool) error {
err := i.rdb.RemoveQueue(qname, force)
func (i *Inspector) DeleteQueue(queue string, force bool) error {
err := i.rdb.RemoveQueue(queue, force)
if errors.IsQueueNotFound(err) {
return fmt.Errorf("%w: queue=%q", ErrQueueNotFound, qname)
return fmt.Errorf("%w: queue=%q", ErrQueueNotFound, queue)
}
if errors.IsQueueNotEmpty(err) {
return fmt.Errorf("%w: queue=%q", ErrQueueNotEmpty, qname)
return fmt.Errorf("%w: queue=%q", ErrQueueNotEmpty, queue)
}
return err
}
@ -222,8 +222,8 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error {
//
// Returns an error wrapping ErrQueueNotFound if a queue with the given name doesn't exist.
// Returns an error wrapping ErrTaskNotFound if a task with the given id doesn't exist in the queue.
func (i *Inspector) GetTaskInfo(qname, id string) (*TaskInfo, error) {
info, err := i.rdb.GetTaskInfo(qname, id)
func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) {
info, err := i.rdb.GetTaskInfo(queue, id)
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -299,13 +299,13 @@ func Page(n int) ListOption {
// ListPendingTasks retrieves pending tasks from the specified queue.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListPending(qname, pgn)
infos, err := i.rdb.ListPending(queue, pgn)
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -327,20 +327,20 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskI
// ListActiveTasks retrieves active tasks from the specified queue.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListActive(qname, pgn)
infos, err := i.rdb.ListActive(queue, pgn)
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
}
expired, err := i.rdb.ListLeaseExpired(time.Now(), qname)
expired, err := i.rdb.ListLeaseExpired(time.Now(), queue)
if err != nil {
return nil, fmt.Errorf("asynq: %v", err)
}
@ -367,13 +367,13 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn
// ListAggregatingTasks retrieves scheduled tasks from the specified group.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListAggregatingTasks(qname, gname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListAggregating(qname, gname, pgn)
infos, err := i.rdb.ListAggregating(queue, group, pgn)
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -396,13 +396,13 @@ func (i *Inspector) ListAggregatingTasks(qname, gname string, opts ...ListOption
// Tasks are sorted by NextProcessAt in ascending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListScheduled(qname, pgn)
infos, err := i.rdb.ListScheduled(queue, pgn)
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -425,13 +425,13 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Tas
// Tasks are sorted by NextProcessAt in ascending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListRetry(qname, pgn)
infos, err := i.rdb.ListRetry(queue, pgn)
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -454,13 +454,13 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInf
// Tasks are sorted by LastFailedAt in descending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListArchived(qname, pgn)
infos, err := i.rdb.ListArchived(queue, pgn)
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -483,13 +483,13 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Task
// Tasks are sorted by expiration time (i.e. CompletedAt + Retention) in descending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListCompletedTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) {
if err := base.ValidateQueueName(queue); err != nil {
return nil, fmt.Errorf("asynq: %v", err)
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
infos, err := i.rdb.ListCompleted(qname, pgn)
infos, err := i.rdb.ListCompleted(queue, pgn)
switch {
case errors.IsQueueNotFound(err):
return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -510,61 +510,61 @@ func (i *Inspector) ListCompletedTasks(qname string, opts ...ListOption) ([]*Tas
// DeleteAllPendingTasks deletes all pending tasks from the specified queue,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) DeleteAllPendingTasks(queue string) (int, error) {
if err := base.ValidateQueueName(queue); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllPendingTasks(qname)
n, err := i.rdb.DeleteAllPendingTasks(queue)
return int(n), err
}
// DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) DeleteAllScheduledTasks(queue string) (int, error) {
if err := base.ValidateQueueName(queue); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllScheduledTasks(qname)
n, err := i.rdb.DeleteAllScheduledTasks(queue)
return int(n), err
}
// DeleteAllRetryTasks deletes all retry tasks from the specified queue,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) DeleteAllRetryTasks(queue string) (int, error) {
if err := base.ValidateQueueName(queue); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllRetryTasks(qname)
n, err := i.rdb.DeleteAllRetryTasks(queue)
return int(n), err
}
// DeleteAllArchivedTasks deletes all archived tasks from the specified queue,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) DeleteAllArchivedTasks(queue string) (int, error) {
if err := base.ValidateQueueName(queue); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllArchivedTasks(qname)
n, err := i.rdb.DeleteAllArchivedTasks(queue)
return int(n), err
}
// DeleteAllCompletedTasks deletes all completed tasks from the specified queue,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllCompletedTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) DeleteAllCompletedTasks(queue string) (int, error) {
if err := base.ValidateQueueName(queue); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllCompletedTasks(qname)
n, err := i.rdb.DeleteAllCompletedTasks(queue)
return int(n), err
}
// DeleteAllAggregatingTasks deletes all tasks from the specified group,
// and reports the number of tasks deleted.
func (i *Inspector) DeleteAllAggregatingTasks(qname, gname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) DeleteAllAggregatingTasks(queue, group string) (int, error) {
if err := base.ValidateQueueName(queue); err != nil {
return 0, err
}
n, err := i.rdb.DeleteAllAggregatingTasks(qname, gname)
n, err := i.rdb.DeleteAllAggregatingTasks(queue, group)
return int(n), err
}
@ -575,11 +575,11 @@ func (i *Inspector) DeleteAllAggregatingTasks(qname, gname string) (int, error)
// If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
// If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
// If the task is in active state, it returns a non-nil error.
func (i *Inspector) DeleteTask(qname, id string) error {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) DeleteTask(queue, id string) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %v", err)
}
err := i.rdb.DeleteTask(qname, id)
err := i.rdb.DeleteTask(queue, id)
switch {
case errors.IsQueueNotFound(err):
return fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -594,41 +594,41 @@ func (i *Inspector) DeleteTask(qname, id string) error {
// RunAllScheduledTasks schedules all scheduled tasks from the given queue to run,
// and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) RunAllScheduledTasks(queue string) (int, error) {
if err := base.ValidateQueueName(queue); err != nil {
return 0, err
}
n, err := i.rdb.RunAllScheduledTasks(qname)
n, err := i.rdb.RunAllScheduledTasks(queue)
return int(n), err
}
// RunAllRetryTasks schedules all retry tasks from the given queue to run,
// and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllRetryTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) RunAllRetryTasks(queue string) (int, error) {
if err := base.ValidateQueueName(queue); err != nil {
return 0, err
}
n, err := i.rdb.RunAllRetryTasks(qname)
n, err := i.rdb.RunAllRetryTasks(queue)
return int(n), err
}
// RunAllArchivedTasks schedules all archived tasks from the given queue to run,
// and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) RunAllArchivedTasks(queue string) (int, error) {
if err := base.ValidateQueueName(queue); err != nil {
return 0, err
}
n, err := i.rdb.RunAllArchivedTasks(qname)
n, err := i.rdb.RunAllArchivedTasks(queue)
return int(n), err
}
// RunAllAggregatingTasks schedules all tasks from the given grou to run.
// and reports the number of tasks scheduled to run.
func (i *Inspector) RunAllAggregatingTasks(qname, gname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) RunAllAggregatingTasks(queue, group string) (int, error) {
if err := base.ValidateQueueName(queue); err != nil {
return 0, err
}
n, err := i.rdb.RunAllAggregatingTasks(qname, gname)
n, err := i.rdb.RunAllAggregatingTasks(queue, group)
return int(n), err
}
@ -639,11 +639,11 @@ func (i *Inspector) RunAllAggregatingTasks(qname, gname string) (int, error) {
// If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
// If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
// If the task is in pending or active state, it returns a non-nil error.
func (i *Inspector) RunTask(qname, id string) error {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) RunTask(queue, id string) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: %v", err)
}
err := i.rdb.RunTask(qname, id)
err := i.rdb.RunTask(queue, id)
switch {
case errors.IsQueueNotFound(err):
return fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -657,41 +657,41 @@ func (i *Inspector) RunTask(qname, id string) error {
// ArchiveAllPendingTasks archives all pending tasks from the given queue,
// and reports the number of tasks archived.
func (i *Inspector) ArchiveAllPendingTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) ArchiveAllPendingTasks(queue string) (int, error) {
if err := base.ValidateQueueName(queue); err != nil {
return 0, err
}
n, err := i.rdb.ArchiveAllPendingTasks(qname)
n, err := i.rdb.ArchiveAllPendingTasks(queue)
return int(n), err
}
// ArchiveAllScheduledTasks archives all scheduled tasks from the given queue,
// and reports the number of tasks archiveed.
func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) ArchiveAllScheduledTasks(queue string) (int, error) {
if err := base.ValidateQueueName(queue); err != nil {
return 0, err
}
n, err := i.rdb.ArchiveAllScheduledTasks(qname)
n, err := i.rdb.ArchiveAllScheduledTasks(queue)
return int(n), err
}
// ArchiveAllRetryTasks archives all retry tasks from the given queue,
// and reports the number of tasks archiveed.
func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) ArchiveAllRetryTasks(queue string) (int, error) {
if err := base.ValidateQueueName(queue); err != nil {
return 0, err
}
n, err := i.rdb.ArchiveAllRetryTasks(qname)
n, err := i.rdb.ArchiveAllRetryTasks(queue)
return int(n), err
}
// ArchiveAllAggregatingTasks archives all tasks from the given group,
// and reports the number of tasks archived.
func (i *Inspector) ArchiveAllAggregatingTasks(qname, gname string) (int, error) {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) ArchiveAllAggregatingTasks(queue, group string) (int, error) {
if err := base.ValidateQueueName(queue); err != nil {
return 0, err
}
n, err := i.rdb.ArchiveAllAggregatingTasks(qname, gname)
n, err := i.rdb.ArchiveAllAggregatingTasks(queue, group)
return int(n), err
}
@ -702,11 +702,11 @@ func (i *Inspector) ArchiveAllAggregatingTasks(qname, gname string) (int, error)
// If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound.
// If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound.
// If the task is in already archived, it returns a non-nil error.
func (i *Inspector) ArchiveTask(qname, id string) error {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) ArchiveTask(queue, id string) error {
if err := base.ValidateQueueName(queue); err != nil {
return fmt.Errorf("asynq: err")
}
err := i.rdb.ArchiveTask(qname, id)
err := i.rdb.ArchiveTask(queue, id)
switch {
case errors.IsQueueNotFound(err):
return fmt.Errorf("asynq: %w", ErrQueueNotFound)
@ -728,20 +728,20 @@ func (i *Inspector) CancelProcessing(id string) error {
// PauseQueue pauses task processing on the specified queue.
// If the queue is already paused, it will return a non-nil error.
func (i *Inspector) PauseQueue(qname string) error {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) PauseQueue(queue string) error {
if err := base.ValidateQueueName(queue); err != nil {
return err
}
return i.rdb.Pause(qname)
return i.rdb.Pause(queue)
}
// UnpauseQueue resumes task processing on the specified queue.
// If the queue is not paused, it will return a non-nil error.
func (i *Inspector) UnpauseQueue(qname string) error {
if err := base.ValidateQueueName(qname); err != nil {
func (i *Inspector) UnpauseQueue(queue string) error {
if err := base.ValidateQueueName(queue); err != nil {
return err
}
return i.rdb.Unpause(qname)
return i.rdb.Unpause(queue)
}
// Servers return a list of running servers' information.
@ -831,8 +831,8 @@ type WorkerInfo struct {
}
// ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
func (i *Inspector) ClusterKeySlot(qname string) (int64, error) {
return i.rdb.ClusterKeySlot(qname)
func (i *Inspector) ClusterKeySlot(queue string) (int64, error) {
return i.rdb.ClusterKeySlot(queue)
}
// ClusterNode describes a node in redis cluster.
@ -847,8 +847,8 @@ type ClusterNode struct {
// ClusterNodes returns a list of nodes the given queue belongs to.
//
// Only relevant if task queues are stored in redis cluster.
func (i *Inspector) ClusterNodes(qname string) ([]*ClusterNode, error) {
nodes, err := i.rdb.ClusterNodes(qname)
func (i *Inspector) ClusterNodes(queue string) ([]*ClusterNode, error) {
nodes, err := i.rdb.ClusterNodes(queue)
if err != nil {
return nil, err
}
@ -916,11 +916,11 @@ func parseOption(s string) (Option, error) {
fn, arg := parseOptionFunc(s), parseOptionArg(s)
switch fn {
case "Queue":
qname, err := strconv.Unquote(arg)
queue, err := strconv.Unquote(arg)
if err != nil {
return nil, err
}
return Queue(qname), nil
return Queue(queue), nil
case "MaxRetry":
n, err := strconv.Atoi(arg)
if err != nil {

View File

@ -224,22 +224,22 @@ type Config struct {
// GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
type GroupAggregator interface {
// Aggregate aggregates the given tasks which belong to a same group with the given groupKey
// Aggregate aggregates the given tasks in a group with the given group name,
// and returns a new task which is the aggregation of those tasks.
//
// Use NewTask(typename, payload, opts...) to set any options for the aggregated task.
// Queue option will be ignored and the aggregated task will always be enqueued to the same queue
// the group belonged.
Aggregate(groupKey string, tasks []*Task) *Task
// The Queue option, if provided, will be ignored and the aggregated task will always be enqueued
// to the same queue the group belonged.
Aggregate(group string, tasks []*Task) *Task
}
// The GroupAggregatorFunc type is an adapter to allow the use of ordinary functions as a GroupAggregator.
// If f is a function with the appropriate signature, GroupAggregatorFunc(f) is a GroupAggregator that calls f.
type GroupAggregatorFunc func(groupKey string, tasks []*Task) *Task
type GroupAggregatorFunc func(group string, tasks []*Task) *Task
// Aggregate calls fn(groupKey, tasks)
func (fn GroupAggregatorFunc) Aggregate(groupKey string, tasks []*Task) *Task {
return fn(groupKey, tasks)
// Aggregate calls fn(group, tasks)
func (fn GroupAggregatorFunc) Aggregate(group string, tasks []*Task) *Task {
return fn(group, tasks)
}
// An ErrorHandler handles an error occured during task processing.