diff --git a/aggregator.go b/aggregator.go
index 9f8da70..e9a0d95 100644
--- a/aggregator.go
+++ b/aggregator.go
@@ -158,7 +158,12 @@ func (a *aggregator) aggregate(t time.Time) {
 			for i, m := range msgs {
 				tasks[i] = NewTask(m.Type, m.Payload)
 			}
-			aggregatedTask := a.ga.Aggregate(gname, tasks)
+			aggregatedTask, err := a.ga.Aggregate(gname, tasks)
+			if err != nil {
+				a.logger.Errorf("Failed to aggregate tasks: queue=%q, group=%q, setID=%q: %v",
+					qname, gname, aggregationSetID, err)
+				continue
+			}
 			ctx, cancel := context.WithDeadline(context.Background(), deadline)
 			if _, err := a.client.EnqueueContext(ctx, aggregatedTask, Queue(qname)); err != nil {
 				a.logger.Errorf("Failed to enqueue aggregated task (queue=%q, group=%q, setID=%q): %v",
diff --git a/aggregator_test.go b/aggregator_test.go
index ccce306..3c42c94 100644
--- a/aggregator_test.go
+++ b/aggregator_test.go
@@ -26,7 +26,7 @@ func TestAggregator(t *testing.T) {
 		gracePeriod      time.Duration
 		maxDelay         time.Duration
 		maxSize          int
-		aggregateFunc    func(gname string, tasks []*Task) *Task
+		aggregateFunc    func(gname string, tasks []*Task) (*Task, error)
 		tasks            []*Task       // tasks to enqueue
 		enqueueFrequency time.Duration // time between one enqueue event to another
 		waitTime         time.Duration // time to wait
@@ -38,8 +38,8 @@ func TestAggregator(t *testing.T) {
 			gracePeriod: 1 * time.Second,
 			maxDelay:    0, // no maxdelay limit
 			maxSize:     0, // no maxsize limit
-			aggregateFunc: func(gname string, tasks []*Task) *Task {
-				return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
+			aggregateFunc: func(gname string, tasks []*Task) (*Task, error) {
+				return NewTask(gname, nil, MaxRetry(len(tasks))), nil // use max retry to see how many tasks were aggregated
 			},
 			tasks: []*Task{
 				NewTask("task1", nil, Group("mygroup")),
@@ -64,8 +64,8 @@ func TestAggregator(t *testing.T) {
 			gracePeriod: 2 * time.Second,
 			maxDelay:    4 * time.Second,
 			maxSize:     0, // no maxsize limit
-			aggregateFunc: func(gname string, tasks []*Task) *Task {
-				return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
+			aggregateFunc: func(gname string, tasks []*Task) (*Task, error) {
+				return NewTask(gname, nil, MaxRetry(len(tasks))), nil // use max retry to see how many tasks were aggregated
 			},
 			tasks: []*Task{
 				NewTask("task1", nil, Group("mygroup")), // time 0
@@ -91,8 +91,8 @@ func TestAggregator(t *testing.T) {
 			gracePeriod: 1 * time.Minute,
 			maxDelay:    0, // no maxdelay limit
 			maxSize:     5,
-			aggregateFunc: func(gname string, tasks []*Task) *Task {
-				return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
+			aggregateFunc: func(gname string, tasks []*Task) (*Task, error) {
+				return NewTask(gname, nil, MaxRetry(len(tasks))), nil // use max retry to see how many tasks were aggregated
 			},
 			tasks: []*Task{
 				NewTask("task1", nil, Group("mygroup")),
diff --git a/server.go b/server.go
index 1b98a95..cfe1ca5 100644
--- a/server.go
+++ b/server.go
@@ -239,15 +239,15 @@ type GroupAggregator interface {
 	// Use NewTask(typename, payload, opts...) to set any options for the aggregated task.
 	// The Queue option, if provided, will be ignored and the aggregated task will always be enqueued
 	// to the same queue the group belonged.
-	Aggregate(group string, tasks []*Task) *Task
+	Aggregate(group string, tasks []*Task) (*Task, error)
 }
 
 // The GroupAggregatorFunc type is an adapter to allow the use of ordinary functions as a GroupAggregator.
 // If f is a function with the appropriate signature, GroupAggregatorFunc(f) is a GroupAggregator that calls f.
-type GroupAggregatorFunc func(group string, tasks []*Task) *Task
+type GroupAggregatorFunc func(group string, tasks []*Task) (*Task, error)
 
 // Aggregate calls fn(group, tasks)
-func (fn GroupAggregatorFunc) Aggregate(group string, tasks []*Task) *Task {
+func (fn GroupAggregatorFunc) Aggregate(group string, tasks []*Task) (*Task, error) {
 	return fn(group, tasks)
 }
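Usage note (not part of the patch): with Aggregate now returning (*Task, error), a user-supplied aggregation function can report a failure instead of being forced to return a possibly malformed task; the aggregator logs the error and skips enqueueing for that aggregation set. Below is a minimal caller-side sketch under the new signature. The package name, the "batched:" typename, and the assumption that task payloads are JSON are hypothetical and only for illustration.

// Sketch only: assumes each task payload is a valid JSON document (hypothetical).
package example

import (
	"encoding/json"

	"github.com/hibiken/asynq"
)

// aggregateGroup combines the payloads of all tasks in a group into a single
// batch task. A marshaling failure is returned to the aggregator, which logs
// it and skips enqueueing rather than enqueueing a broken task.
func aggregateGroup(group string, tasks []*asynq.Task) (*asynq.Task, error) {
	batch := make([]json.RawMessage, 0, len(tasks))
	for _, t := range tasks {
		batch = append(batch, json.RawMessage(t.Payload()))
	}
	payload, err := json.Marshal(batch)
	if err != nil {
		return nil, err // surfaced through the new error return value
	}
	return asynq.NewTask("batched:"+group, payload), nil
}

// The plain function adapts to the updated interface via GroupAggregatorFunc.
var _ asynq.GroupAggregator = asynq.GroupAggregatorFunc(aggregateGroup)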