
Return error from Aggregate func

Hubert Krauze 2023-07-10 23:02:06 +02:00
parent 7f457a9492
commit 53af8dd9ea
3 changed files with 16 additions and 11 deletions

View File

@@ -158,7 +158,12 @@ func (a *aggregator) aggregate(t time.Time) {
 	for i, m := range msgs {
 		tasks[i] = NewTask(m.Type, m.Payload)
 	}
-	aggregatedTask := a.ga.Aggregate(gname, tasks)
+	aggregatedTask, err := a.ga.Aggregate(gname, tasks)
+	if err != nil {
+		a.logger.Errorf("Failed to aggregate tasks: queue=%q, group=%q, setID=%q: %v",
+			qname, gname, aggregationSetID, err)
+		continue
+	}
 	ctx, cancel := context.WithDeadline(context.Background(), deadline)
 	if _, err := a.client.EnqueueContext(ctx, aggregatedTask, Queue(qname)); err != nil {
 		a.logger.Errorf("Failed to enqueue aggregated task (queue=%q, group=%q, setID=%q): %v",

View File

@@ -26,7 +26,7 @@ func TestAggregator(t *testing.T) {
 	gracePeriod      time.Duration
 	maxDelay         time.Duration
 	maxSize          int
-	aggregateFunc    func(gname string, tasks []*Task) *Task
+	aggregateFunc    func(gname string, tasks []*Task) (*Task, error)
 	tasks            []*Task       // tasks to enqueue
 	enqueueFrequency time.Duration // time between one enqueue event to another
 	waitTime         time.Duration // time to wait
@@ -38,8 +38,8 @@ func TestAggregator(t *testing.T) {
 	gracePeriod: 1 * time.Second,
 	maxDelay:    0, // no maxdelay limit
 	maxSize:     0, // no maxsize limit
-	aggregateFunc: func(gname string, tasks []*Task) *Task {
-		return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
+	aggregateFunc: func(gname string, tasks []*Task) (*Task, error) {
+		return NewTask(gname, nil, MaxRetry(len(tasks))), nil // use max retry to see how many tasks were aggregated
 	},
 	tasks: []*Task{
 		NewTask("task1", nil, Group("mygroup")),
@@ -64,8 +64,8 @@ func TestAggregator(t *testing.T) {
 	gracePeriod: 2 * time.Second,
 	maxDelay:    4 * time.Second,
 	maxSize:     0, // no maxsize limit
-	aggregateFunc: func(gname string, tasks []*Task) *Task {
-		return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
+	aggregateFunc: func(gname string, tasks []*Task) (*Task, error) {
+		return NewTask(gname, nil, MaxRetry(len(tasks))), nil // use max retry to see how many tasks were aggregated
 	},
 	tasks: []*Task{
 		NewTask("task1", nil, Group("mygroup")), // time 0
@@ -91,8 +91,8 @@ func TestAggregator(t *testing.T) {
 	gracePeriod: 1 * time.Minute,
 	maxDelay:    0, // no maxdelay limit
 	maxSize:     5,
-	aggregateFunc: func(gname string, tasks []*Task) *Task {
-		return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
+	aggregateFunc: func(gname string, tasks []*Task) (*Task, error) {
+		return NewTask(gname, nil, MaxRetry(len(tasks))), nil // use max retry to see how many tasks were aggregated
 	},
 	tasks: []*Task{
 		NewTask("task1", nil, Group("mygroup")),

View File

@@ -239,15 +239,15 @@ type GroupAggregator interface {
 	// Use NewTask(typename, payload, opts...) to set any options for the aggregated task.
 	// The Queue option, if provided, will be ignored and the aggregated task will always be enqueued
 	// to the same queue the group belonged.
-	Aggregate(group string, tasks []*Task) *Task
+	Aggregate(group string, tasks []*Task) (*Task, error)
 }
 
 // The GroupAggregatorFunc type is an adapter to allow the use of ordinary functions as a GroupAggregator.
 // If f is a function with the appropriate signature, GroupAggregatorFunc(f) is a GroupAggregator that calls f.
-type GroupAggregatorFunc func(group string, tasks []*Task) *Task
+type GroupAggregatorFunc func(group string, tasks []*Task) (*Task, error)
 
 // Aggregate calls fn(group, tasks)
-func (fn GroupAggregatorFunc) Aggregate(group string, tasks []*Task) *Task {
+func (fn GroupAggregatorFunc) Aggregate(group string, tasks []*Task) (*Task, error) {
 	return fn(group, tasks)
 }
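
On the caller side, an aggregate function passed through the GroupAggregatorFunc adapter simply gains the extra return value. The sketch below shows how such a function might be wired into a server; it assumes the group-aggregation settings already exposed by asynq's Config (GroupAggregator, GroupGracePeriod, GroupMaxSize), while the Redis address, the empty-batch error, and the batch-size-via-MaxRetry trick (borrowed from the test fixtures above) are illustrative choices, not requirements.

package example

import (
	"fmt"
	"time"

	"github.com/hibiken/asynq"
)

// newServer builds a server whose group aggregator can now report failures.
func newServer() *asynq.Server {
	return asynq.NewServer(
		asynq.RedisClientOpt{Addr: "localhost:6379"}, // placeholder address
		asynq.Config{
			GroupAggregator: asynq.GroupAggregatorFunc(func(group string, tasks []*asynq.Task) (*asynq.Task, error) {
				if len(tasks) == 0 {
					return nil, fmt.Errorf("group %q: nothing to aggregate", group)
				}
				// Record the batch size via MaxRetry, as the tests above do.
				return asynq.NewTask(group, nil, asynq.MaxRetry(len(tasks))), nil
			}),
			GroupGracePeriod: 2 * time.Second,
			GroupMaxSize:     20,
		},
	)
}

Tasks enqueued with the asynq.Group("mygroup") option are then collected per group and handed to this function once the grace period elapses or the maximum group size is reached.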