2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 07:12:17 +08:00

Always enqueue the aggregated task in the same queue

This commit is contained in:
Ken Hibino 2022-04-07 06:19:04 -07:00
parent 829f64fd38
commit 39718f8bea
2 changed files with 6 additions and 2 deletions

View File

@ -160,7 +160,7 @@ func (a *aggregator) aggregate(t time.Time) {
}
aggregatedTask := a.ga.Aggregate(gname, tasks)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
if _, err := a.client.EnqueueContext(ctx, aggregatedTask); err != nil {
if _, err := a.client.EnqueueContext(ctx, aggregatedTask, Queue(qname)); err != nil {
a.logger.Errorf("Failed to enqueue aggregated task (queue=%q, group=%q, setID=%q): %v",
qname, gname, aggregationSetID, err)
cancel()

View File

@ -224,8 +224,12 @@ type Config struct {
// GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
type GroupAggregator interface {
// Aggregate aggregates the given tasks which belong to a same group
// Aggregate aggregates the given tasks which belong to a same group with the given groupKey
// and returns a new task which is the aggregation of those tasks.
//
// Use NewTask(typename, payload, opts...) to set any options for the aggregated task.
// Queue option will be ignored and the aggregated task will always be enqueued to the same queue
// the group belonged.
Aggregate(groupKey string, tasks []*Task) *Task
}