From eb064c2bab2e2f226be72becc244ae8df1c8d311 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 15 Mar 2022 17:03:09 -0700 Subject: [PATCH] Fix AggregationCheck with unlimited size to clear group name from all-groups set --- internal/rdb/rdb.go | 4 ++-- internal/rdb/rdb_test.go | 35 ++++++++++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index ec52405..e986fb6 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -1048,7 +1048,7 @@ if maxDelay ~= 0 then end redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1) redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2]) - if size <= maxSize then + if size <= maxSize or maxSize == 0 then redis.call("SREM", KEYS[4], ARGV[6]) end return 1 @@ -1064,7 +1064,7 @@ if latestEntryScore <= gracePeriodStartTime then end redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1) redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2]) - if size <= maxSize then + if size <= maxSize or maxSize == 0 then redis.call("SREM", KEYS[4], ARGV[6]) end return 1 diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 6951ab6..528d917 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -3137,7 +3137,7 @@ func TestAggregationCheck(t *testing.T) { shouldCreateSet bool // whether the check should create a new aggregation set wantAggregationSet []*base.TaskMessage wantGroups map[string][]redis.Z - shouldClearGroup bool // whehter the check should clear the group from redis + shouldClearGroup bool // whether the check should clear the group from redis }{ { desc: "with an empty group", @@ -3330,6 +3330,39 @@ func TestAggregationCheck(t *testing.T) { }, shouldClearGroup: false, }, + { + desc: "with unlimited size and passed grace period", + tasks: []*taskData{ + {msg: msg1, state: base.TaskStateAggregating}, + {msg: msg2, state: base.TaskStateAggregating}, + {msg: msg3, state: base.TaskStateAggregating}, + {msg: msg4, state: base.TaskStateAggregating}, + {msg: msg5, state: base.TaskStateAggregating}, + }, + groups: map[string][]*redis.Z{ + base.GroupKey("default", "mygroup"): { + {Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())}, + {Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())}, + {Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())}, + {Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())}, + {Member: msg5.ID, Score: float64(now.Add(-1 * time.Minute).Unix())}, + }, + }, + allGroups: map[string][]string{ + base.AllGroups("default"): {"mygroup"}, + }, + qname: "default", + gname: "mygroup", + gracePeriod: 30 * time.Second, + maxDelay: 30 * time.Minute, + maxSize: 0, // maxSize=0 indicates no size limit + shouldCreateSet: true, + wantAggregationSet: []*base.TaskMessage{msg1, msg2, msg3, msg4, msg5}, + wantGroups: map[string][]redis.Z{ + base.GroupKey("default", "mygroup"): {}, + }, + shouldClearGroup: true, + }, { desc: "with unlimited delay", tasks: []*taskData{