2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 15:52:18 +08:00

Clear group if aggregation set empties the group

This commit is contained in:
Ken Hibino 2022-03-13 06:44:42 -07:00
parent 60a4dc1401
commit 74d2eea4e0
2 changed files with 230 additions and 129 deletions

View File

@ -993,16 +993,26 @@ func (r *RDB) ListGroups(qname string) ([]string, error) {
return groups, nil
}
// TODO: Add comment describing what the script does.
// aggregationCheckCmd checks the given group for whether to create an aggregation set.
// An aggregation set is created if one of the aggregation criteria is met:
// 1) group has reached or exceeded its max size
// 2) group's oldest task has reached or exceeded its max delay
// 3) group's latest task has reached or exceeded its grace period
// if aggreation criteria is met, the command moves those tasks from the group
// and put them in an aggregation set. Additionally, if the creation of aggregation set
// empties the group, it will clear the group name from the all groups set.
//
// KEYS[1] -> asynq:{<qname>}:g:<gname>
// KEYS[2] -> asynq:{<qname>}:g:<gname>:<aggregation_set_id>
// KEYS[3] -> asynq:{<qname>}:aggregation_sets
// KEYS[4] -> asynq:{<qname>}:groups
// -------
// ARGV[1] -> max group size
// ARGV[2] -> max group delay in unix time
// ARGV[3] -> start time of the grace period
// ARGV[4] -> aggregation set expire time
// ARGV[5] -> current time in unix time
// ARGV[6] -> group name
//
// Output:
// Returns 0 if no aggregation set was created
@ -1020,6 +1030,9 @@ if maxSize ~= 0 and size >= maxSize then
end
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2])
if size == maxSize then
redis.call("SREM", KEYS[4], ARGV[6])
end
return 1
end
local maxDelay = tonumber(ARGV[2])
@ -1035,6 +1048,9 @@ if maxDelay ~= 0 then
end
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2])
if size <= maxSize then
redis.call("SREM", KEYS[4], ARGV[6])
end
return 1
end
end
@ -1048,6 +1064,9 @@ if latestEntryScore <= gracePeriodStartTime then
end
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2])
if size <= maxSize then
redis.call("SREM", KEYS[4], ARGV[6])
end
return 1
end
return 0
@ -1072,6 +1091,7 @@ func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, ma
base.GroupKey(qname, gname),
base.AggregationSetKey(qname, gname, aggregationSetID),
base.AllAggregationSets(qname),
base.AllGroups(qname),
}
argv := []interface{}{
maxSize,
@ -1079,6 +1099,7 @@ func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, ma
int64(gracePeriod.Seconds()),
expireTime.Unix(),
t.Unix(),
gname,
}
n, err := r.runScriptWithErrorCode(context.Background(), op, aggregationCheckCmd, keys, argv...)
if err != nil {

View File

@ -3112,6 +3112,7 @@ func TestAggregationCheck(t *testing.T) {
now := time.Now()
r.SetClock(timeutil.NewSimulatedClock(now))
ctx := context.Background()
msg1 := h.NewTaskMessageBuilder().SetType("task1").SetGroup("mygroup").Build()
msg2 := h.NewTaskMessageBuilder().SetType("task2").SetGroup("mygroup").Build()
msg3 := h.NewTaskMessageBuilder().SetType("task3").SetGroup("mygroup").Build()
@ -3120,22 +3121,32 @@ func TestAggregationCheck(t *testing.T) {
tests := []struct {
desc string
groups map[string]map[string][]base.Z
// initial data
tasks []*taskData
groups map[string][]*redis.Z
allGroups map[string][]string
// args
qname string
gname string
gracePeriod time.Duration
maxDelay time.Duration
maxSize int
// expectaions
shouldCreateSet bool // whether the check should create a new aggregation set
wantAggregationSet []*base.TaskMessage
wantGroups map[string]map[string][]base.Z
wantGroups map[string][]redis.Z
shouldClearGroup bool // whehter the check should clear the group from redis
}{
{
desc: "with an empty group",
groups: map[string]map[string][]base.Z{
"default": {
"mygroup": {},
tasks: []*taskData{},
groups: map[string][]*redis.Z{
base.GroupKey("default", "mygroup"): {},
},
allGroups: map[string][]string{
base.AllGroups("default"): {},
},
qname: "default",
gname: "mygroup",
@ -3144,24 +3155,31 @@ func TestAggregationCheck(t *testing.T) {
maxSize: 5,
shouldCreateSet: false,
wantAggregationSet: nil,
wantGroups: map[string]map[string][]base.Z{
"default": {
"mygroup": {},
},
wantGroups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {},
},
shouldClearGroup: true,
},
{
desc: "with a group size reaching the max size",
groups: map[string]map[string][]base.Z{
"default": {
"mygroup": {
{Message: msg1, Score: now.Add(-5 * time.Minute).Unix()},
{Message: msg2, Score: now.Add(-3 * time.Minute).Unix()},
{Message: msg3, Score: now.Add(-2 * time.Minute).Unix()},
{Message: msg4, Score: now.Add(-1 * time.Minute).Unix()},
{Message: msg5, Score: now.Add(-10 * time.Second).Unix()},
tasks: []*taskData{
{msg: msg1, state: base.TaskStateAggregating},
{msg: msg2, state: base.TaskStateAggregating},
{msg: msg3, state: base.TaskStateAggregating},
{msg: msg4, state: base.TaskStateAggregating},
{msg: msg5, state: base.TaskStateAggregating},
},
groups: map[string][]*redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
{Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())},
{Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())},
{Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
},
},
allGroups: map[string][]string{
base.AllGroups("default"): {"mygroup"},
},
qname: "default",
gname: "mygroup",
@ -3170,24 +3188,31 @@ func TestAggregationCheck(t *testing.T) {
maxSize: 5,
shouldCreateSet: true,
wantAggregationSet: []*base.TaskMessage{msg1, msg2, msg3, msg4, msg5},
wantGroups: map[string]map[string][]base.Z{
"default": {
"mygroup": {},
},
wantGroups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {},
},
shouldClearGroup: true,
},
{
desc: "with group size greater than max size",
groups: map[string]map[string][]base.Z{
"default": {
"mygroup": {
{Message: msg1, Score: now.Add(-5 * time.Minute).Unix()},
{Message: msg2, Score: now.Add(-3 * time.Minute).Unix()},
{Message: msg3, Score: now.Add(-2 * time.Minute).Unix()},
{Message: msg4, Score: now.Add(-1 * time.Minute).Unix()},
{Message: msg5, Score: now.Add(-10 * time.Second).Unix()},
tasks: []*taskData{
{msg: msg1, state: base.TaskStateAggregating},
{msg: msg2, state: base.TaskStateAggregating},
{msg: msg3, state: base.TaskStateAggregating},
{msg: msg4, state: base.TaskStateAggregating},
{msg: msg5, state: base.TaskStateAggregating},
},
groups: map[string][]*redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
{Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())},
{Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())},
{Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
},
},
allGroups: map[string][]string{
base.AllGroups("default"): {"mygroup"},
},
qname: "default",
gname: "mygroup",
@ -3196,25 +3221,30 @@ func TestAggregationCheck(t *testing.T) {
maxSize: 3,
shouldCreateSet: true,
wantAggregationSet: []*base.TaskMessage{msg1, msg2, msg3},
wantGroups: map[string]map[string][]base.Z{
"default": {
"mygroup": {
{Message: msg4, Score: now.Add(-1 * time.Minute).Unix()},
{Message: msg5, Score: now.Add(-10 * time.Second).Unix()},
},
wantGroups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())},
{Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
},
},
shouldClearGroup: false,
},
{
desc: "with the most recent task older than grace period",
groups: map[string]map[string][]base.Z{
"default": {
"mygroup": {
{Message: msg1, Score: now.Add(-5 * time.Minute).Unix()},
{Message: msg2, Score: now.Add(-3 * time.Minute).Unix()},
{Message: msg3, Score: now.Add(-2 * time.Minute).Unix()},
tasks: []*taskData{
{msg: msg1, state: base.TaskStateAggregating},
{msg: msg2, state: base.TaskStateAggregating},
{msg: msg3, state: base.TaskStateAggregating},
},
groups: map[string][]*redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
{Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())},
},
},
allGroups: map[string][]string{
base.AllGroups("default"): {"mygroup"},
},
qname: "default",
gname: "mygroup",
@ -3223,24 +3253,31 @@ func TestAggregationCheck(t *testing.T) {
maxSize: 5,
shouldCreateSet: true,
wantAggregationSet: []*base.TaskMessage{msg1, msg2, msg3},
wantGroups: map[string]map[string][]base.Z{
"default": {
"mygroup": {},
},
wantGroups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {},
},
shouldClearGroup: true,
},
{
desc: "with the oldest task older than max delay",
groups: map[string]map[string][]base.Z{
"default": {
"mygroup": {
{Message: msg1, Score: now.Add(-15 * time.Minute).Unix()},
{Message: msg2, Score: now.Add(-3 * time.Minute).Unix()},
{Message: msg3, Score: now.Add(-2 * time.Minute).Unix()},
{Message: msg4, Score: now.Add(-1 * time.Minute).Unix()},
{Message: msg5, Score: now.Add(-10 * time.Second).Unix()},
tasks: []*taskData{
{msg: msg1, state: base.TaskStateAggregating},
{msg: msg2, state: base.TaskStateAggregating},
{msg: msg3, state: base.TaskStateAggregating},
{msg: msg4, state: base.TaskStateAggregating},
{msg: msg5, state: base.TaskStateAggregating},
},
groups: map[string][]*redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
{Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())},
{Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())},
{Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
},
},
allGroups: map[string][]string{
base.AllGroups("default"): {"mygroup"},
},
qname: "default",
gname: "mygroup",
@ -3249,24 +3286,31 @@ func TestAggregationCheck(t *testing.T) {
maxSize: 30,
shouldCreateSet: true,
wantAggregationSet: []*base.TaskMessage{msg1, msg2, msg3, msg4, msg5},
wantGroups: map[string]map[string][]base.Z{
"default": {
"mygroup": {},
},
wantGroups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {},
},
shouldClearGroup: true,
},
{
desc: "with unlimited size",
groups: map[string]map[string][]base.Z{
"default": {
"mygroup": {
{Message: msg1, Score: now.Add(-15 * time.Minute).Unix()},
{Message: msg2, Score: now.Add(-3 * time.Minute).Unix()},
{Message: msg3, Score: now.Add(-2 * time.Minute).Unix()},
{Message: msg4, Score: now.Add(-1 * time.Minute).Unix()},
{Message: msg5, Score: now.Add(-10 * time.Second).Unix()},
tasks: []*taskData{
{msg: msg1, state: base.TaskStateAggregating},
{msg: msg2, state: base.TaskStateAggregating},
{msg: msg3, state: base.TaskStateAggregating},
{msg: msg4, state: base.TaskStateAggregating},
{msg: msg5, state: base.TaskStateAggregating},
},
groups: map[string][]*redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
{Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())},
{Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())},
{Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
},
},
allGroups: map[string][]string{
base.AllGroups("default"): {"mygroup"},
},
qname: "default",
gname: "mygroup",
@ -3275,24 +3319,37 @@ func TestAggregationCheck(t *testing.T) {
maxSize: 0, // maxSize=0 indicates no size limit
shouldCreateSet: false,
wantAggregationSet: nil,
wantGroups: map[string]map[string][]base.Z{
"default": {
"mygroup": {},
wantGroups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
{Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())},
{Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())},
{Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
},
},
shouldClearGroup: false,
},
{
desc: "with unlimited delay",
groups: map[string]map[string][]base.Z{
"default": {
"mygroup": {
{Message: msg1, Score: now.Add(-15 * time.Minute).Unix()},
{Message: msg2, Score: now.Add(-3 * time.Minute).Unix()},
{Message: msg3, Score: now.Add(-2 * time.Minute).Unix()},
{Message: msg4, Score: now.Add(-1 * time.Minute).Unix()},
{Message: msg5, Score: now.Add(-10 * time.Second).Unix()},
tasks: []*taskData{
{msg: msg1, state: base.TaskStateAggregating},
{msg: msg2, state: base.TaskStateAggregating},
{msg: msg3, state: base.TaskStateAggregating},
{msg: msg4, state: base.TaskStateAggregating},
{msg: msg5, state: base.TaskStateAggregating},
},
groups: map[string][]*redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
{Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())},
{Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())},
{Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
},
},
allGroups: map[string][]string{
base.AllGroups("default"): {"mygroup"},
},
qname: "default",
gname: "mygroup",
@ -3301,58 +3358,71 @@ func TestAggregationCheck(t *testing.T) {
maxSize: 10,
shouldCreateSet: false,
wantAggregationSet: nil,
wantGroups: map[string]map[string][]base.Z{
"default": {
"mygroup": {},
wantGroups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
{Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())},
{Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())},
{Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
},
},
shouldClearGroup: false,
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllGroups(t, r.client, tc.groups)
t.Run(tc.desc, func(t *testing.T) {
SeedTasks(t, r.client, tc.tasks)
SeedZSets(t, r.client, tc.groups)
SeedSets(t, r.client, tc.allGroups)
aggregationSetID, err := r.AggregationCheck(tc.qname, tc.gname, now, tc.gracePeriod, tc.maxDelay, tc.maxSize)
if err != nil {
t.Errorf("%s: AggregationCheck returned error: %v", tc.desc, err)
continue
t.Fatalf("AggregationCheck returned error: %v", err)
}
if !tc.shouldCreateSet && aggregationSetID != "" {
t.Errorf("%s: AggregationCheck returned non empty set ID. want empty ID", tc.desc)
continue
t.Fatal("AggregationCheck returned non empty set ID. want empty ID")
}
if tc.shouldCreateSet && aggregationSetID == "" {
t.Errorf("%s: AggregationCheck returned empty set ID. want non empty ID", tc.desc)
continue
}
if !tc.shouldCreateSet {
continue // below checks are intended for aggregation set
t.Fatal("AggregationCheck returned empty set ID. want non empty ID")
}
if tc.shouldCreateSet {
msgs, deadline, err := r.ReadAggregationSet(tc.qname, tc.gname, aggregationSetID)
if err != nil {
t.Fatalf("%s: Failed to read aggregation set %q: %v", tc.desc, aggregationSetID, err)
t.Fatalf("Failed to read aggregation set %q: %v", aggregationSetID, err)
}
if diff := cmp.Diff(tc.wantAggregationSet, msgs, h.SortMsgOpt); diff != "" {
t.Errorf("%s: Mismatch found in aggregation set: (-want,+got)\n%s", tc.desc, diff)
t.Errorf("Mismatch found in aggregation set: (-want,+got)\n%s", diff)
}
if wantDeadline := now.Add(aggregationTimeout); deadline.Unix() != wantDeadline.Unix() {
t.Errorf("%s: ReadAggregationSet returned deadline=%v, want=%v", tc.desc, deadline, wantDeadline)
t.Errorf("ReadAggregationSet returned deadline=%v, want=%v", deadline, wantDeadline)
}
}
for qname, groups := range tc.wantGroups {
for gname, want := range groups {
gotGroup := h.GetGroupEntries(t, r.client, qname, gname)
if diff := cmp.Diff(want, gotGroup, h.SortZSetEntryOpt); diff != "" {
t.Errorf("%s: Mismatch found in group zset: %q: (-want,+got)\n%s",
tc.desc, base.GroupKey(qname, gname), diff)
}
AssertZSets(t, r.client, tc.wantGroups)
if tc.shouldClearGroup {
if key := base.GroupKey(tc.qname, tc.gname); r.client.Exists(ctx, key).Val() != 0 {
t.Errorf("group key %q still exists", key)
}
if r.client.SIsMember(ctx, base.AllGroups(tc.qname), tc.gname).Val() {
t.Errorf("all-group set %q still contains the group name %q", base.AllGroups(tc.qname), tc.gname)
}
} else {
if key := base.GroupKey(tc.qname, tc.gname); r.client.Exists(ctx, key).Val() == 0 {
t.Errorf("group key %q does not exists", key)
}
if !r.client.SIsMember(ctx, base.AllGroups(tc.qname), tc.gname).Val() {
t.Errorf("all-group set %q doesn't contains the group name %q", base.AllGroups(tc.qname), tc.gname)
}
}
})
}
}
@ -3704,6 +3774,16 @@ func SeedZSets(tb testing.TB, r redis.UniversalClient, zsets map[string][]*redis
}
}
func SeedSets(tb testing.TB, r redis.UniversalClient, sets map[string][]string) {
for key, set := range sets {
for _, mem := range set {
if err := r.SAdd(context.Background(), key, mem).Err(); err != nil {
tb.Fatalf("Failed to seed set (key=%q): %v", key, err)
}
}
}
}
// TODO: move this helper somewhere more canonical
func AssertZSets(t *testing.T, r redis.UniversalClient, wantZSets map[string][]redis.Z) {
for key, want := range wantZSets {