diff --git a/asynq.go b/asynq.go index 3d27d06..376f558 100644 --- a/asynq.go +++ b/asynq.go @@ -98,6 +98,14 @@ type TaskInfo struct { // Deadline is the deadline for the task, zero value if not specified. Deadline time.Time + // Group is the name of the group in which the task belongs. + // + // Tasks in the same queue can be grouped together by Group name and will be aggregated into one task + // by a Server processing the queue. + // + // Empty string (default) indicates task does not belong to any groups, and no aggregation will be applied to the task. + Group string + // NextProcessAt is the time the task is scheduled to be processed, // zero if not applicable. NextProcessAt time.Time @@ -140,6 +148,7 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time MaxRetry: msg.Retry, Retried: msg.Retried, LastErr: msg.ErrorMsg, + Group: msg.GroupKey, Timeout: time.Duration(msg.Timeout) * time.Second, Deadline: fromUnixTimeOrZero(msg.Deadline), Retention: time.Duration(msg.Retention) * time.Second, diff --git a/client_test.go b/client_test.go index 2d4330e..d578d52 100644 --- a/client_test.go +++ b/client_test.go @@ -503,6 +503,7 @@ func TestClientEnqueueWithGroupOption(t *testing.T) { }, wantInfo: &TaskInfo{ Queue: "default", + Group: "mygroup", Type: task.Type(), Payload: task.Payload(), State: TaskStateAggregating, @@ -548,6 +549,7 @@ func TestClientEnqueueWithGroupOption(t *testing.T) { }, wantInfo: &TaskInfo{ Queue: "default", + Group: "mygroup", Type: task.Type(), Payload: task.Payload(), State: TaskStateScheduled,