diff --git a/client.go b/client.go index b531bb8..dc5c3d1 100644 --- a/client.go +++ b/client.go @@ -93,13 +93,13 @@ func (n retryOption) Type() OptionType { return MaxRetryOpt } func (n retryOption) Value() interface{} { return int(n) } // Queue returns an option to specify the queue to enqueue the task into. -func Queue(qname string) Option { - return queueOption(qname) +func Queue(name string) Option { + return queueOption(name) } -func (qname queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(qname)) } -func (qname queueOption) Type() OptionType { return QueueOpt } -func (qname queueOption) Value() interface{} { return string(qname) } +func (name queueOption) String() string { return fmt.Sprintf("Queue(%q)", string(name)) } +func (name queueOption) Type() OptionType { return QueueOpt } +func (name queueOption) Value() interface{} { return string(name) } // TaskID returns an option to specify the task ID. func TaskID(id string) Option { @@ -196,17 +196,15 @@ func (ttl retentionOption) String() string { return fmt.Sprintf("Retention(% func (ttl retentionOption) Type() OptionType { return RetentionOpt } func (ttl retentionOption) Value() interface{} { return time.Duration(ttl) } -// Group returns an option to specify the group key used for the task. -// Tasks in a given queue with the same group key will be aggregated into one task before passed to Handler. -// -// To customize the aggregation and grouping policy, specify the Group* fields in Config. -func Group(key string) Option { - return groupOption(key) +// Group returns an option to specify the group used for the task. +// Tasks in a given queue with the same group will be aggregated into one task before passed to Handler. +func Group(name string) Option { + return groupOption(name) } -func (key groupOption) String() string { return fmt.Sprintf("Group(%q)", string(key)) } -func (key groupOption) Type() OptionType { return GroupOpt } -func (key groupOption) Value() interface{} { return string(key) } +func (name groupOption) String() string { return fmt.Sprintf("Group(%q)", string(name)) } +func (name groupOption) Type() OptionType { return GroupOpt } +func (name groupOption) Value() interface{} { return string(name) } // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task. // @@ -227,7 +225,7 @@ type option struct { uniqueTTL time.Duration processAt time.Time retention time.Duration - groupKey string + group string } // composeOptions merges user provided options into the default options @@ -280,7 +278,7 @@ func composeOptions(opts ...Option) (option, error) { if isBlank(key) { return option{}, errors.New("group key cannot be empty") } - res.groupKey = key + res.group = key default: // ignore unexpected option } @@ -378,7 +376,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) Deadline: deadline.Unix(), Timeout: int64(timeout.Seconds()), UniqueKey: uniqueKey, - GroupKey: opt.groupKey, + GroupKey: opt.group, Retention: int64(opt.retention.Seconds()), } now := time.Now() @@ -386,10 +384,10 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) if opt.processAt.After(now) { err = c.schedule(ctx, msg, opt.processAt, opt.uniqueTTL) state = base.TaskStateScheduled - } else if opt.groupKey != "" { + } else if opt.group != "" { // Use zero value for processAt since we don't know when the task will be aggregated and processed. opt.processAt = time.Time{} - err = c.addToGroup(ctx, msg, opt.groupKey, opt.uniqueTTL) + err = c.addToGroup(ctx, msg, opt.group, opt.uniqueTTL) state = base.TaskStateAggregating } else { opt.processAt = now @@ -422,9 +420,9 @@ func (c *Client) schedule(ctx context.Context, msg *base.TaskMessage, t time.Tim return c.broker.Schedule(ctx, msg, t) } -func (c *Client) addToGroup(ctx context.Context, msg *base.TaskMessage, groupKey string, uniqueTTL time.Duration) error { +func (c *Client) addToGroup(ctx context.Context, msg *base.TaskMessage, group string, uniqueTTL time.Duration) error { if uniqueTTL > 0 { - return c.broker.AddToGroupUnique(ctx, msg, groupKey, uniqueTTL) + return c.broker.AddToGroupUnique(ctx, msg, group, uniqueTTL) } - return c.broker.AddToGroup(ctx, msg, groupKey) + return c.broker.AddToGroup(ctx, msg, group) } diff --git a/context.go b/context.go index 6eac87d..f7ccf04 100644 --- a/context.go +++ b/context.go @@ -36,7 +36,7 @@ func GetMaxRetry(ctx context.Context) (n int, ok bool) { // GetQueueName extracts queue name from a context, if any. // -// Return value qname indicates which queue the task was pulled from. -func GetQueueName(ctx context.Context) (qname string, ok bool) { +// Return value queue indicates which queue the task was pulled from. +func GetQueueName(ctx context.Context) (queue string, ok bool) { return asynqcontext.GetQueueName(ctx) } diff --git a/inspector.go b/inspector.go index 56e2d63..2ae6c52 100644 --- a/inspector.go +++ b/inspector.go @@ -44,8 +44,8 @@ func (i *Inspector) Queues() ([]string, error) { } // Groups returns a list of all groups within the given queue. -func (i *Inspector) Groups(qname string) ([]*GroupInfo, error) { - stats, err := i.rdb.GroupStats(qname) +func (i *Inspector) Groups(queue string) ([]*GroupInfo, error) { + stats, err := i.rdb.GroupStats(queue) if err != nil { return nil, err } @@ -122,11 +122,11 @@ type QueueInfo struct { } // GetQueueInfo returns current information of the given queue. -func (i *Inspector) GetQueueInfo(qname string) (*QueueInfo, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) GetQueueInfo(queue string) (*QueueInfo, error) { + if err := base.ValidateQueueName(queue); err != nil { return nil, err } - stats, err := i.rdb.CurrentStats(qname) + stats, err := i.rdb.CurrentStats(queue) if err != nil { return nil, err } @@ -166,11 +166,11 @@ type DailyStats struct { } // History returns a list of stats from the last n days. -func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) History(queue string, n int) ([]*DailyStats, error) { + if err := base.ValidateQueueName(queue); err != nil { return nil, err } - stats, err := i.rdb.HistoricalStats(qname, n) + stats, err := i.rdb.HistoricalStats(queue, n) if err != nil { return nil, err } @@ -207,13 +207,13 @@ var ( // If the specified queue does not exist, DeleteQueue returns ErrQueueNotFound. // If force is set to false and the specified queue is not empty, DeleteQueue // returns ErrQueueNotEmpty. -func (i *Inspector) DeleteQueue(qname string, force bool) error { - err := i.rdb.RemoveQueue(qname, force) +func (i *Inspector) DeleteQueue(queue string, force bool) error { + err := i.rdb.RemoveQueue(queue, force) if errors.IsQueueNotFound(err) { - return fmt.Errorf("%w: queue=%q", ErrQueueNotFound, qname) + return fmt.Errorf("%w: queue=%q", ErrQueueNotFound, queue) } if errors.IsQueueNotEmpty(err) { - return fmt.Errorf("%w: queue=%q", ErrQueueNotEmpty, qname) + return fmt.Errorf("%w: queue=%q", ErrQueueNotEmpty, queue) } return err } @@ -222,8 +222,8 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error { // // Returns an error wrapping ErrQueueNotFound if a queue with the given name doesn't exist. // Returns an error wrapping ErrTaskNotFound if a task with the given id doesn't exist in the queue. -func (i *Inspector) GetTaskInfo(qname, id string) (*TaskInfo, error) { - info, err := i.rdb.GetTaskInfo(qname, id) +func (i *Inspector) GetTaskInfo(queue, id string) (*TaskInfo, error) { + info, err := i.rdb.GetTaskInfo(queue, id) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -299,13 +299,13 @@ func Page(n int) ListOption { // ListPendingTasks retrieves pending tasks from the specified queue. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) ListPendingTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) { + if err := base.ValidateQueueName(queue); err != nil { return nil, fmt.Errorf("asynq: %v", err) } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - infos, err := i.rdb.ListPending(qname, pgn) + infos, err := i.rdb.ListPending(queue, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -327,20 +327,20 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskI // ListActiveTasks retrieves active tasks from the specified queue. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) ListActiveTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) { + if err := base.ValidateQueueName(queue); err != nil { return nil, fmt.Errorf("asynq: %v", err) } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - infos, err := i.rdb.ListActive(qname, pgn) + infos, err := i.rdb.ListActive(queue, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) case err != nil: return nil, fmt.Errorf("asynq: %v", err) } - expired, err := i.rdb.ListLeaseExpired(time.Now(), qname) + expired, err := i.rdb.ListLeaseExpired(time.Now(), queue) if err != nil { return nil, fmt.Errorf("asynq: %v", err) } @@ -367,13 +367,13 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn // ListAggregatingTasks retrieves scheduled tasks from the specified group. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListAggregatingTasks(qname, gname string, opts ...ListOption) ([]*TaskInfo, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) ListAggregatingTasks(queue, group string, opts ...ListOption) ([]*TaskInfo, error) { + if err := base.ValidateQueueName(queue); err != nil { return nil, fmt.Errorf("asynq: %v", err) } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - infos, err := i.rdb.ListAggregating(qname, gname, pgn) + infos, err := i.rdb.ListAggregating(queue, group, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -396,13 +396,13 @@ func (i *Inspector) ListAggregatingTasks(qname, gname string, opts ...ListOption // Tasks are sorted by NextProcessAt in ascending order. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) ListScheduledTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) { + if err := base.ValidateQueueName(queue); err != nil { return nil, fmt.Errorf("asynq: %v", err) } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - infos, err := i.rdb.ListScheduled(qname, pgn) + infos, err := i.rdb.ListScheduled(queue, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -425,13 +425,13 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Tas // Tasks are sorted by NextProcessAt in ascending order. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) ListRetryTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) { + if err := base.ValidateQueueName(queue); err != nil { return nil, fmt.Errorf("asynq: %v", err) } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - infos, err := i.rdb.ListRetry(qname, pgn) + infos, err := i.rdb.ListRetry(queue, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -454,13 +454,13 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInf // Tasks are sorted by LastFailedAt in descending order. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) ListArchivedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) { + if err := base.ValidateQueueName(queue); err != nil { return nil, fmt.Errorf("asynq: %v", err) } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - infos, err := i.rdb.ListArchived(qname, pgn) + infos, err := i.rdb.ListArchived(queue, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -483,13 +483,13 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Task // Tasks are sorted by expiration time (i.e. CompletedAt + Retention) in descending order. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListCompletedTasks(qname string, opts ...ListOption) ([]*TaskInfo, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) ListCompletedTasks(queue string, opts ...ListOption) ([]*TaskInfo, error) { + if err := base.ValidateQueueName(queue); err != nil { return nil, fmt.Errorf("asynq: %v", err) } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - infos, err := i.rdb.ListCompleted(qname, pgn) + infos, err := i.rdb.ListCompleted(queue, pgn) switch { case errors.IsQueueNotFound(err): return nil, fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -510,61 +510,61 @@ func (i *Inspector) ListCompletedTasks(qname string, opts ...ListOption) ([]*Tas // DeleteAllPendingTasks deletes all pending tasks from the specified queue, // and reports the number tasks deleted. -func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) DeleteAllPendingTasks(queue string) (int, error) { + if err := base.ValidateQueueName(queue); err != nil { return 0, err } - n, err := i.rdb.DeleteAllPendingTasks(qname) + n, err := i.rdb.DeleteAllPendingTasks(queue) return int(n), err } // DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue, // and reports the number tasks deleted. -func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) DeleteAllScheduledTasks(queue string) (int, error) { + if err := base.ValidateQueueName(queue); err != nil { return 0, err } - n, err := i.rdb.DeleteAllScheduledTasks(qname) + n, err := i.rdb.DeleteAllScheduledTasks(queue) return int(n), err } // DeleteAllRetryTasks deletes all retry tasks from the specified queue, // and reports the number tasks deleted. -func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) DeleteAllRetryTasks(queue string) (int, error) { + if err := base.ValidateQueueName(queue); err != nil { return 0, err } - n, err := i.rdb.DeleteAllRetryTasks(qname) + n, err := i.rdb.DeleteAllRetryTasks(queue) return int(n), err } // DeleteAllArchivedTasks deletes all archived tasks from the specified queue, // and reports the number tasks deleted. -func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) DeleteAllArchivedTasks(queue string) (int, error) { + if err := base.ValidateQueueName(queue); err != nil { return 0, err } - n, err := i.rdb.DeleteAllArchivedTasks(qname) + n, err := i.rdb.DeleteAllArchivedTasks(queue) return int(n), err } // DeleteAllCompletedTasks deletes all completed tasks from the specified queue, // and reports the number tasks deleted. -func (i *Inspector) DeleteAllCompletedTasks(qname string) (int, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) DeleteAllCompletedTasks(queue string) (int, error) { + if err := base.ValidateQueueName(queue); err != nil { return 0, err } - n, err := i.rdb.DeleteAllCompletedTasks(qname) + n, err := i.rdb.DeleteAllCompletedTasks(queue) return int(n), err } // DeleteAllAggregatingTasks deletes all tasks from the specified group, // and reports the number of tasks deleted. -func (i *Inspector) DeleteAllAggregatingTasks(qname, gname string) (int, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) DeleteAllAggregatingTasks(queue, group string) (int, error) { + if err := base.ValidateQueueName(queue); err != nil { return 0, err } - n, err := i.rdb.DeleteAllAggregatingTasks(qname, gname) + n, err := i.rdb.DeleteAllAggregatingTasks(queue, group) return int(n), err } @@ -575,11 +575,11 @@ func (i *Inspector) DeleteAllAggregatingTasks(qname, gname string) (int, error) // If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound. // If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound. // If the task is in active state, it returns a non-nil error. -func (i *Inspector) DeleteTask(qname, id string) error { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) DeleteTask(queue, id string) error { + if err := base.ValidateQueueName(queue); err != nil { return fmt.Errorf("asynq: %v", err) } - err := i.rdb.DeleteTask(qname, id) + err := i.rdb.DeleteTask(queue, id) switch { case errors.IsQueueNotFound(err): return fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -594,41 +594,41 @@ func (i *Inspector) DeleteTask(qname, id string) error { // RunAllScheduledTasks schedules all scheduled tasks from the given queue to run, // and reports the number of tasks scheduled to run. -func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) RunAllScheduledTasks(queue string) (int, error) { + if err := base.ValidateQueueName(queue); err != nil { return 0, err } - n, err := i.rdb.RunAllScheduledTasks(qname) + n, err := i.rdb.RunAllScheduledTasks(queue) return int(n), err } // RunAllRetryTasks schedules all retry tasks from the given queue to run, // and reports the number of tasks scheduled to run. -func (i *Inspector) RunAllRetryTasks(qname string) (int, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) RunAllRetryTasks(queue string) (int, error) { + if err := base.ValidateQueueName(queue); err != nil { return 0, err } - n, err := i.rdb.RunAllRetryTasks(qname) + n, err := i.rdb.RunAllRetryTasks(queue) return int(n), err } // RunAllArchivedTasks schedules all archived tasks from the given queue to run, // and reports the number of tasks scheduled to run. -func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) RunAllArchivedTasks(queue string) (int, error) { + if err := base.ValidateQueueName(queue); err != nil { return 0, err } - n, err := i.rdb.RunAllArchivedTasks(qname) + n, err := i.rdb.RunAllArchivedTasks(queue) return int(n), err } // RunAllAggregatingTasks schedules all tasks from the given grou to run. // and reports the number of tasks scheduled to run. -func (i *Inspector) RunAllAggregatingTasks(qname, gname string) (int, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) RunAllAggregatingTasks(queue, group string) (int, error) { + if err := base.ValidateQueueName(queue); err != nil { return 0, err } - n, err := i.rdb.RunAllAggregatingTasks(qname, gname) + n, err := i.rdb.RunAllAggregatingTasks(queue, group) return int(n), err } @@ -639,11 +639,11 @@ func (i *Inspector) RunAllAggregatingTasks(qname, gname string) (int, error) { // If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound. // If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound. // If the task is in pending or active state, it returns a non-nil error. -func (i *Inspector) RunTask(qname, id string) error { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) RunTask(queue, id string) error { + if err := base.ValidateQueueName(queue); err != nil { return fmt.Errorf("asynq: %v", err) } - err := i.rdb.RunTask(qname, id) + err := i.rdb.RunTask(queue, id) switch { case errors.IsQueueNotFound(err): return fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -657,41 +657,41 @@ func (i *Inspector) RunTask(qname, id string) error { // ArchiveAllPendingTasks archives all pending tasks from the given queue, // and reports the number of tasks archived. -func (i *Inspector) ArchiveAllPendingTasks(qname string) (int, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) ArchiveAllPendingTasks(queue string) (int, error) { + if err := base.ValidateQueueName(queue); err != nil { return 0, err } - n, err := i.rdb.ArchiveAllPendingTasks(qname) + n, err := i.rdb.ArchiveAllPendingTasks(queue) return int(n), err } // ArchiveAllScheduledTasks archives all scheduled tasks from the given queue, // and reports the number of tasks archiveed. -func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) ArchiveAllScheduledTasks(queue string) (int, error) { + if err := base.ValidateQueueName(queue); err != nil { return 0, err } - n, err := i.rdb.ArchiveAllScheduledTasks(qname) + n, err := i.rdb.ArchiveAllScheduledTasks(queue) return int(n), err } // ArchiveAllRetryTasks archives all retry tasks from the given queue, // and reports the number of tasks archiveed. -func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) ArchiveAllRetryTasks(queue string) (int, error) { + if err := base.ValidateQueueName(queue); err != nil { return 0, err } - n, err := i.rdb.ArchiveAllRetryTasks(qname) + n, err := i.rdb.ArchiveAllRetryTasks(queue) return int(n), err } // ArchiveAllAggregatingTasks archives all tasks from the given group, // and reports the number of tasks archived. -func (i *Inspector) ArchiveAllAggregatingTasks(qname, gname string) (int, error) { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) ArchiveAllAggregatingTasks(queue, group string) (int, error) { + if err := base.ValidateQueueName(queue); err != nil { return 0, err } - n, err := i.rdb.ArchiveAllAggregatingTasks(qname, gname) + n, err := i.rdb.ArchiveAllAggregatingTasks(queue, group) return int(n), err } @@ -702,11 +702,11 @@ func (i *Inspector) ArchiveAllAggregatingTasks(qname, gname string) (int, error) // If a queue with the given name doesn't exist, it returns an error wrapping ErrQueueNotFound. // If a task with the given id doesn't exist in the queue, it returns an error wrapping ErrTaskNotFound. // If the task is in already archived, it returns a non-nil error. -func (i *Inspector) ArchiveTask(qname, id string) error { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) ArchiveTask(queue, id string) error { + if err := base.ValidateQueueName(queue); err != nil { return fmt.Errorf("asynq: err") } - err := i.rdb.ArchiveTask(qname, id) + err := i.rdb.ArchiveTask(queue, id) switch { case errors.IsQueueNotFound(err): return fmt.Errorf("asynq: %w", ErrQueueNotFound) @@ -728,20 +728,20 @@ func (i *Inspector) CancelProcessing(id string) error { // PauseQueue pauses task processing on the specified queue. // If the queue is already paused, it will return a non-nil error. -func (i *Inspector) PauseQueue(qname string) error { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) PauseQueue(queue string) error { + if err := base.ValidateQueueName(queue); err != nil { return err } - return i.rdb.Pause(qname) + return i.rdb.Pause(queue) } // UnpauseQueue resumes task processing on the specified queue. // If the queue is not paused, it will return a non-nil error. -func (i *Inspector) UnpauseQueue(qname string) error { - if err := base.ValidateQueueName(qname); err != nil { +func (i *Inspector) UnpauseQueue(queue string) error { + if err := base.ValidateQueueName(queue); err != nil { return err } - return i.rdb.Unpause(qname) + return i.rdb.Unpause(queue) } // Servers return a list of running servers' information. @@ -831,8 +831,8 @@ type WorkerInfo struct { } // ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to. -func (i *Inspector) ClusterKeySlot(qname string) (int64, error) { - return i.rdb.ClusterKeySlot(qname) +func (i *Inspector) ClusterKeySlot(queue string) (int64, error) { + return i.rdb.ClusterKeySlot(queue) } // ClusterNode describes a node in redis cluster. @@ -847,8 +847,8 @@ type ClusterNode struct { // ClusterNodes returns a list of nodes the given queue belongs to. // // Only relevant if task queues are stored in redis cluster. -func (i *Inspector) ClusterNodes(qname string) ([]*ClusterNode, error) { - nodes, err := i.rdb.ClusterNodes(qname) +func (i *Inspector) ClusterNodes(queue string) ([]*ClusterNode, error) { + nodes, err := i.rdb.ClusterNodes(queue) if err != nil { return nil, err } @@ -916,11 +916,11 @@ func parseOption(s string) (Option, error) { fn, arg := parseOptionFunc(s), parseOptionArg(s) switch fn { case "Queue": - qname, err := strconv.Unquote(arg) + queue, err := strconv.Unquote(arg) if err != nil { return nil, err } - return Queue(qname), nil + return Queue(queue), nil case "MaxRetry": n, err := strconv.Atoi(arg) if err != nil { diff --git a/server.go b/server.go index bbe5d1f..c8bcbc2 100644 --- a/server.go +++ b/server.go @@ -224,22 +224,22 @@ type Config struct { // GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler. type GroupAggregator interface { - // Aggregate aggregates the given tasks which belong to a same group with the given groupKey + // Aggregate aggregates the given tasks in a group with the given group name, // and returns a new task which is the aggregation of those tasks. // // Use NewTask(typename, payload, opts...) to set any options for the aggregated task. - // Queue option will be ignored and the aggregated task will always be enqueued to the same queue - // the group belonged. - Aggregate(groupKey string, tasks []*Task) *Task + // The Queue option, if provided, will be ignored and the aggregated task will always be enqueued + // to the same queue the group belonged. + Aggregate(group string, tasks []*Task) *Task } // The GroupAggregatorFunc type is an adapter to allow the use of ordinary functions as a GroupAggregator. // If f is a function with the appropriate signature, GroupAggregatorFunc(f) is a GroupAggregator that calls f. -type GroupAggregatorFunc func(groupKey string, tasks []*Task) *Task +type GroupAggregatorFunc func(group string, tasks []*Task) *Task -// Aggregate calls fn(groupKey, tasks) -func (fn GroupAggregatorFunc) Aggregate(groupKey string, tasks []*Task) *Task { - return fn(groupKey, tasks) +// Aggregate calls fn(group, tasks) +func (fn GroupAggregatorFunc) Aggregate(group string, tasks []*Task) *Task { + return fn(group, tasks) } // An ErrorHandler handles an error occured during task processing.