From 74d2eea4e03435cb76d08710121d638a661920bf Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 13 Mar 2022 06:44:42 -0700 Subject: [PATCH] Clear group if aggregation set empties the group --- internal/rdb/rdb.go | 23 ++- internal/rdb/rdb_test.go | 336 ++++++++++++++++++++++++--------------- 2 files changed, 230 insertions(+), 129 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 16c2ff8..ec52405 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -993,16 +993,26 @@ func (r *RDB) ListGroups(qname string) ([]string, error) { return groups, nil } -// TODO: Add comment describing what the script does. +// aggregationCheckCmd checks the given group for whether to create an aggregation set. +// An aggregation set is created if one of the aggregation criteria is met: +// 1) group has reached or exceeded its max size +// 2) group's oldest task has reached or exceeded its max delay +// 3) group's latest task has reached or exceeded its grace period +// If an aggregation criterion is met, the command moves those tasks from the group +// and puts them in an aggregation set. Additionally, if the creation of aggregation set +// empties the group, it will clear the group name from the all groups set.
+// // KEYS[1] -> asynq:{<qname>}:g:<gname> // KEYS[2] -> asynq:{<qname>}:g:<gname>:<aggregation_set_id> // KEYS[3] -> asynq:{<qname>}:aggregation_sets +// KEYS[4] -> asynq:{<qname>}:groups // ------- // ARGV[1] -> max group size // ARGV[2] -> max group delay in unix time // ARGV[3] -> start time of the grace period // ARGV[4] -> aggregation set expire time // ARGV[5] -> current time in unix time +// ARGV[6] -> group name // // Output: // Returns 0 if no aggregation set was created @@ -1020,6 +1030,9 @@ if maxSize ~= 0 and size >= maxSize then end redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1) redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2]) + if size == maxSize then + redis.call("SREM", KEYS[4], ARGV[6]) + end return 1 end local maxDelay = tonumber(ARGV[2]) @@ -1035,6 +1048,9 @@ if maxDelay ~= 0 then end redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1) redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2]) + if size <= maxSize then + redis.call("SREM", KEYS[4], ARGV[6]) + end return 1 end end @@ -1048,6 +1064,9 @@ if latestEntryScore <= gracePeriodStartTime then end redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1) redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2]) + if size <= maxSize then + redis.call("SREM", KEYS[4], ARGV[6]) + end return 1 end return 0 @@ -1072,6 +1091,7 @@ func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, ma base.GroupKey(qname, gname), base.AggregationSetKey(qname, gname, aggregationSetID), base.AllAggregationSets(qname), + base.AllGroups(qname), } argv := []interface{}{ maxSize, @@ -1079,6 +1099,7 @@ func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, ma int64(gracePeriod.Seconds()), expireTime.Unix(), t.Unix(), + gname, } n, err := r.runScriptWithErrorCode(context.Background(), op, aggregationCheckCmd, keys, argv...)
if err != nil { diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 1bfba53..cbddab7 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -3112,6 +3112,7 @@ func TestAggregationCheck(t *testing.T) { now := time.Now() r.SetClock(timeutil.NewSimulatedClock(now)) + ctx := context.Background() msg1 := h.NewTaskMessageBuilder().SetType("task1").SetGroup("mygroup").Build() msg2 := h.NewTaskMessageBuilder().SetType("task2").SetGroup("mygroup").Build() msg3 := h.NewTaskMessageBuilder().SetType("task3").SetGroup("mygroup").Build() @@ -3119,23 +3120,33 @@ func TestAggregationCheck(t *testing.T) { msg5 := h.NewTaskMessageBuilder().SetType("task5").SetGroup("mygroup").Build() tests := []struct { - desc string - groups map[string]map[string][]base.Z - qname string - gname string - gracePeriod time.Duration - maxDelay time.Duration - maxSize int + desc string + // initial data + tasks []*taskData + groups map[string][]*redis.Z + allGroups map[string][]string + + // args + qname string + gname string + gracePeriod time.Duration + maxDelay time.Duration + maxSize int + + // expectations shouldCreateSet bool // whether the check should create a new aggregation set wantAggregationSet []*base.TaskMessage - wantGroups map[string]map[string][]base.Z + wantGroups map[string][]redis.Z + shouldClearGroup bool // whether the check should clear the group from redis }{ { - desc: "with an empty group", - groups: map[string]map[string][]base.Z{ - "default": { - "mygroup": {}, - }, + desc: "with an empty group", + tasks: []*taskData{}, + groups: map[string][]*redis.Z{ + base.GroupKey("default", "mygroup"): {}, + }, + allGroups: map[string][]string{ + base.AllGroups("default"): {}, }, qname: "default", gname: "mygroup", @@ -3144,25 +3155,32 @@ func TestAggregationCheck(t *testing.T) { maxSize: 5, shouldCreateSet: false, wantAggregationSet: nil, - wantGroups: map[string]map[string][]base.Z{ - "default": { - "mygroup": {}, - }, + wantGroups:
map[string][]redis.Z{ + base.GroupKey("default", "mygroup"): {}, }, + shouldClearGroup: true, }, { desc: "with a group size reaching the max size", - groups: map[string]map[string][]base.Z{ - "default": { - "mygroup": { - {Message: msg1, Score: now.Add(-5 * time.Minute).Unix()}, - {Message: msg2, Score: now.Add(-3 * time.Minute).Unix()}, - {Message: msg3, Score: now.Add(-2 * time.Minute).Unix()}, - {Message: msg4, Score: now.Add(-1 * time.Minute).Unix()}, - {Message: msg5, Score: now.Add(-10 * time.Second).Unix()}, - }, + tasks: []*taskData{ + {msg: msg1, state: base.TaskStateAggregating}, + {msg: msg2, state: base.TaskStateAggregating}, + {msg: msg3, state: base.TaskStateAggregating}, + {msg: msg4, state: base.TaskStateAggregating}, + {msg: msg5, state: base.TaskStateAggregating}, + }, + groups: map[string][]*redis.Z{ + base.GroupKey("default", "mygroup"): { + {Member: msg1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())}, + {Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())}, + {Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())}, + {Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())}, + {Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())}, }, }, + allGroups: map[string][]string{ + base.AllGroups("default"): {"mygroup"}, + }, qname: "default", gname: "mygroup", gracePeriod: 1 * time.Minute, @@ -3170,25 +3188,32 @@ func TestAggregationCheck(t *testing.T) { maxSize: 5, shouldCreateSet: true, wantAggregationSet: []*base.TaskMessage{msg1, msg2, msg3, msg4, msg5}, - wantGroups: map[string]map[string][]base.Z{ - "default": { - "mygroup": {}, - }, + wantGroups: map[string][]redis.Z{ + base.GroupKey("default", "mygroup"): {}, }, + shouldClearGroup: true, }, { desc: "with group size greater than max size", - groups: map[string]map[string][]base.Z{ - "default": { - "mygroup": { - {Message: msg1, Score: now.Add(-5 * time.Minute).Unix()}, - {Message: msg2, Score: now.Add(-3 * time.Minute).Unix()}, - {Message: 
msg3, Score: now.Add(-2 * time.Minute).Unix()}, - {Message: msg4, Score: now.Add(-1 * time.Minute).Unix()}, - {Message: msg5, Score: now.Add(-10 * time.Second).Unix()}, - }, + tasks: []*taskData{ + {msg: msg1, state: base.TaskStateAggregating}, + {msg: msg2, state: base.TaskStateAggregating}, + {msg: msg3, state: base.TaskStateAggregating}, + {msg: msg4, state: base.TaskStateAggregating}, + {msg: msg5, state: base.TaskStateAggregating}, + }, + groups: map[string][]*redis.Z{ + base.GroupKey("default", "mygroup"): { + {Member: msg1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())}, + {Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())}, + {Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())}, + {Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())}, + {Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())}, }, }, + allGroups: map[string][]string{ + base.AllGroups("default"): {"mygroup"}, + }, qname: "default", gname: "mygroup", gracePeriod: 2 * time.Minute, @@ -3196,26 +3221,31 @@ func TestAggregationCheck(t *testing.T) { maxSize: 3, shouldCreateSet: true, wantAggregationSet: []*base.TaskMessage{msg1, msg2, msg3}, - wantGroups: map[string]map[string][]base.Z{ - "default": { - "mygroup": { - {Message: msg4, Score: now.Add(-1 * time.Minute).Unix()}, - {Message: msg5, Score: now.Add(-10 * time.Second).Unix()}, - }, + wantGroups: map[string][]redis.Z{ + base.GroupKey("default", "mygroup"): { + {Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())}, + {Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())}, }, }, + shouldClearGroup: false, }, { desc: "with the most recent task older than grace period", - groups: map[string]map[string][]base.Z{ - "default": { - "mygroup": { - {Message: msg1, Score: now.Add(-5 * time.Minute).Unix()}, - {Message: msg2, Score: now.Add(-3 * time.Minute).Unix()}, - {Message: msg3, Score: now.Add(-2 * time.Minute).Unix()}, - }, + tasks: []*taskData{ + {msg: 
msg1, state: base.TaskStateAggregating}, + {msg: msg2, state: base.TaskStateAggregating}, + {msg: msg3, state: base.TaskStateAggregating}, + }, + groups: map[string][]*redis.Z{ + base.GroupKey("default", "mygroup"): { + {Member: msg1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())}, + {Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())}, + {Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())}, }, }, + allGroups: map[string][]string{ + base.AllGroups("default"): {"mygroup"}, + }, qname: "default", gname: "mygroup", gracePeriod: 1 * time.Minute, @@ -3223,25 +3253,32 @@ func TestAggregationCheck(t *testing.T) { maxSize: 5, shouldCreateSet: true, wantAggregationSet: []*base.TaskMessage{msg1, msg2, msg3}, - wantGroups: map[string]map[string][]base.Z{ - "default": { - "mygroup": {}, - }, + wantGroups: map[string][]redis.Z{ + base.GroupKey("default", "mygroup"): {}, }, + shouldClearGroup: true, }, { desc: "with the oldest task older than max delay", - groups: map[string]map[string][]base.Z{ - "default": { - "mygroup": { - {Message: msg1, Score: now.Add(-15 * time.Minute).Unix()}, - {Message: msg2, Score: now.Add(-3 * time.Minute).Unix()}, - {Message: msg3, Score: now.Add(-2 * time.Minute).Unix()}, - {Message: msg4, Score: now.Add(-1 * time.Minute).Unix()}, - {Message: msg5, Score: now.Add(-10 * time.Second).Unix()}, - }, + tasks: []*taskData{ + {msg: msg1, state: base.TaskStateAggregating}, + {msg: msg2, state: base.TaskStateAggregating}, + {msg: msg3, state: base.TaskStateAggregating}, + {msg: msg4, state: base.TaskStateAggregating}, + {msg: msg5, state: base.TaskStateAggregating}, + }, + groups: map[string][]*redis.Z{ + base.GroupKey("default", "mygroup"): { + {Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())}, + {Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())}, + {Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())}, + {Member: msg4.ID, Score: float64(now.Add(-1 * 
time.Minute).Unix())}, + {Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())}, }, }, + allGroups: map[string][]string{ + base.AllGroups("default"): {"mygroup"}, + }, qname: "default", gname: "mygroup", gracePeriod: 2 * time.Minute, @@ -3249,25 +3286,32 @@ func TestAggregationCheck(t *testing.T) { maxSize: 30, shouldCreateSet: true, wantAggregationSet: []*base.TaskMessage{msg1, msg2, msg3, msg4, msg5}, - wantGroups: map[string]map[string][]base.Z{ - "default": { - "mygroup": {}, - }, + wantGroups: map[string][]redis.Z{ + base.GroupKey("default", "mygroup"): {}, }, + shouldClearGroup: true, }, { desc: "with unlimited size", - groups: map[string]map[string][]base.Z{ - "default": { - "mygroup": { - {Message: msg1, Score: now.Add(-15 * time.Minute).Unix()}, - {Message: msg2, Score: now.Add(-3 * time.Minute).Unix()}, - {Message: msg3, Score: now.Add(-2 * time.Minute).Unix()}, - {Message: msg4, Score: now.Add(-1 * time.Minute).Unix()}, - {Message: msg5, Score: now.Add(-10 * time.Second).Unix()}, - }, + tasks: []*taskData{ + {msg: msg1, state: base.TaskStateAggregating}, + {msg: msg2, state: base.TaskStateAggregating}, + {msg: msg3, state: base.TaskStateAggregating}, + {msg: msg4, state: base.TaskStateAggregating}, + {msg: msg5, state: base.TaskStateAggregating}, + }, + groups: map[string][]*redis.Z{ + base.GroupKey("default", "mygroup"): { + {Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())}, + {Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())}, + {Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())}, + {Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())}, + {Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())}, }, }, + allGroups: map[string][]string{ + base.AllGroups("default"): {"mygroup"}, + }, qname: "default", gname: "mygroup", gracePeriod: 1 * time.Minute, @@ -3275,25 +3319,38 @@ func TestAggregationCheck(t *testing.T) { maxSize: 0, // maxSize=0 indicates no size limit 
shouldCreateSet: false, wantAggregationSet: nil, - wantGroups: map[string]map[string][]base.Z{ - "default": { - "mygroup": {}, + wantGroups: map[string][]redis.Z{ + base.GroupKey("default", "mygroup"): { + {Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())}, + {Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())}, + {Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())}, + {Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())}, + {Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())}, }, }, + shouldClearGroup: false, }, { desc: "with unlimited delay", - groups: map[string]map[string][]base.Z{ - "default": { - "mygroup": { - {Message: msg1, Score: now.Add(-15 * time.Minute).Unix()}, - {Message: msg2, Score: now.Add(-3 * time.Minute).Unix()}, - {Message: msg3, Score: now.Add(-2 * time.Minute).Unix()}, - {Message: msg4, Score: now.Add(-1 * time.Minute).Unix()}, - {Message: msg5, Score: now.Add(-10 * time.Second).Unix()}, - }, + tasks: []*taskData{ + {msg: msg1, state: base.TaskStateAggregating}, + {msg: msg2, state: base.TaskStateAggregating}, + {msg: msg3, state: base.TaskStateAggregating}, + {msg: msg4, state: base.TaskStateAggregating}, + {msg: msg5, state: base.TaskStateAggregating}, + }, + groups: map[string][]*redis.Z{ + base.GroupKey("default", "mygroup"): { + {Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())}, + {Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())}, + {Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())}, + {Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())}, + {Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())}, }, }, + allGroups: map[string][]string{ + base.AllGroups("default"): {"mygroup"}, + }, qname: "default", gname: "mygroup", gracePeriod: 1 * time.Minute, @@ -3301,58 +3358,71 @@ func TestAggregationCheck(t *testing.T) { maxSize: 10, shouldCreateSet: false, wantAggregationSet: nil, - 
wantGroups: map[string]map[string][]base.Z{ - "default": { - "mygroup": {}, + wantGroups: map[string][]redis.Z{ + base.GroupKey("default", "mygroup"): { + {Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())}, + {Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())}, + {Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())}, + {Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())}, + {Member: msg5.ID, Score: float64(now.Add(-10 * time.Second).Unix())}, }, }, + shouldClearGroup: false, }, } for _, tc := range tests { h.FlushDB(t, r.client) - h.SeedAllGroups(t, r.client, tc.groups) - aggregationSetID, err := r.AggregationCheck(tc.qname, tc.gname, now, tc.gracePeriod, tc.maxDelay, tc.maxSize) - if err != nil { - t.Errorf("%s: AggregationCheck returned error: %v", tc.desc, err) - continue - } + t.Run(tc.desc, func(t *testing.T) { + SeedTasks(t, r.client, tc.tasks) + SeedZSets(t, r.client, tc.groups) + SeedSets(t, r.client, tc.allGroups) - if !tc.shouldCreateSet && aggregationSetID != "" { - t.Errorf("%s: AggregationCheck returned non empty set ID. want empty ID", tc.desc) - continue - } - if tc.shouldCreateSet && aggregationSetID == "" { - t.Errorf("%s: AggregationCheck returned empty set ID. want non empty ID", tc.desc) - continue - } + aggregationSetID, err := r.AggregationCheck(tc.qname, tc.gname, now, tc.gracePeriod, tc.maxDelay, tc.maxSize) + if err != nil { + t.Fatalf("AggregationCheck returned error: %v", err) + } - if !tc.shouldCreateSet { - continue // below checks are intended for aggregation set - } + if !tc.shouldCreateSet && aggregationSetID != "" { + t.Fatal("AggregationCheck returned non empty set ID. want empty ID") + } + if tc.shouldCreateSet && aggregationSetID == "" { + t.Fatal("AggregationCheck returned empty set ID. 
want non empty ID") + } - msgs, deadline, err := r.ReadAggregationSet(tc.qname, tc.gname, aggregationSetID) - if err != nil { - t.Fatalf("%s: Failed to read aggregation set %q: %v", tc.desc, aggregationSetID, err) - } - if diff := cmp.Diff(tc.wantAggregationSet, msgs, h.SortMsgOpt); diff != "" { - t.Errorf("%s: Mismatch found in aggregation set: (-want,+got)\n%s", tc.desc, diff) - } + if tc.shouldCreateSet { + msgs, deadline, err := r.ReadAggregationSet(tc.qname, tc.gname, aggregationSetID) + if err != nil { + t.Fatalf("Failed to read aggregation set %q: %v", aggregationSetID, err) + } + if diff := cmp.Diff(tc.wantAggregationSet, msgs, h.SortMsgOpt); diff != "" { + t.Errorf("Mismatch found in aggregation set: (-want,+got)\n%s", diff) + } - if wantDeadline := now.Add(aggregationTimeout); deadline.Unix() != wantDeadline.Unix() { - t.Errorf("%s: ReadAggregationSet returned deadline=%v, want=%v", tc.desc, deadline, wantDeadline) - } - - for qname, groups := range tc.wantGroups { - for gname, want := range groups { - gotGroup := h.GetGroupEntries(t, r.client, qname, gname) - if diff := cmp.Diff(want, gotGroup, h.SortZSetEntryOpt); diff != "" { - t.Errorf("%s: Mismatch found in group zset: %q: (-want,+got)\n%s", - tc.desc, base.GroupKey(qname, gname), diff) + if wantDeadline := now.Add(aggregationTimeout); deadline.Unix() != wantDeadline.Unix() { + t.Errorf("ReadAggregationSet returned deadline=%v, want=%v", deadline, wantDeadline) } } - } + + AssertZSets(t, r.client, tc.wantGroups) + + if tc.shouldClearGroup { + if key := base.GroupKey(tc.qname, tc.gname); r.client.Exists(ctx, key).Val() != 0 { + t.Errorf("group key %q still exists", key) + } + if r.client.SIsMember(ctx, base.AllGroups(tc.qname), tc.gname).Val() { + t.Errorf("all-group set %q still contains the group name %q", base.AllGroups(tc.qname), tc.gname) + } + } else { + if key := base.GroupKey(tc.qname, tc.gname); r.client.Exists(ctx, key).Val() == 0 { + t.Errorf("group key %q does not exists", key) + } + if 
!r.client.SIsMember(ctx, base.AllGroups(tc.qname), tc.gname).Val() { + t.Errorf("all-group set %q doesn't contains the group name %q", base.AllGroups(tc.qname), tc.gname) + } + } + }) } } @@ -3704,6 +3774,16 @@ func SeedZSets(tb testing.TB, r redis.UniversalClient, zsets map[string][]*redis } } +func SeedSets(tb testing.TB, r redis.UniversalClient, sets map[string][]string) { + for key, set := range sets { + for _, mem := range set { + if err := r.SAdd(context.Background(), key, mem).Err(); err != nil { + tb.Fatalf("Failed to seed set (key=%q): %v", key, err) + } + } + } +} + // TODO: move this helper somewhere more canonical func AssertZSets(t *testing.T, r redis.UniversalClient, wantZSets map[string][]redis.Z) { for key, want := range wantZSets {