mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Fix AggregationCheck with unlimited size to clear group name from
all-groups set
This commit is contained in:
parent
652939dd3a
commit
eb064c2bab
@ -1048,7 +1048,7 @@ if maxDelay ~= 0 then
|
|||||||
end
|
end
|
||||||
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
|
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
|
||||||
redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2])
|
redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2])
|
||||||
if size <= maxSize then
|
if size <= maxSize or maxSize == 0 then
|
||||||
redis.call("SREM", KEYS[4], ARGV[6])
|
redis.call("SREM", KEYS[4], ARGV[6])
|
||||||
end
|
end
|
||||||
return 1
|
return 1
|
||||||
@ -1064,7 +1064,7 @@ if latestEntryScore <= gracePeriodStartTime then
|
|||||||
end
|
end
|
||||||
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
|
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
|
||||||
redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2])
|
redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2])
|
||||||
if size <= maxSize then
|
if size <= maxSize or maxSize == 0 then
|
||||||
redis.call("SREM", KEYS[4], ARGV[6])
|
redis.call("SREM", KEYS[4], ARGV[6])
|
||||||
end
|
end
|
||||||
return 1
|
return 1
|
||||||
|
@ -3137,7 +3137,7 @@ func TestAggregationCheck(t *testing.T) {
|
|||||||
shouldCreateSet bool // whether the check should create a new aggregation set
|
shouldCreateSet bool // whether the check should create a new aggregation set
|
||||||
wantAggregationSet []*base.TaskMessage
|
wantAggregationSet []*base.TaskMessage
|
||||||
wantGroups map[string][]redis.Z
|
wantGroups map[string][]redis.Z
|
||||||
shouldClearGroup bool // whehter the check should clear the group from redis
|
shouldClearGroup bool // whether the check should clear the group from redis
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
desc: "with an empty group",
|
desc: "with an empty group",
|
||||||
@ -3330,6 +3330,39 @@ func TestAggregationCheck(t *testing.T) {
|
|||||||
},
|
},
|
||||||
shouldClearGroup: false,
|
shouldClearGroup: false,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
desc: "with unlimited size and passed grace period",
|
||||||
|
tasks: []*taskData{
|
||||||
|
{msg: msg1, state: base.TaskStateAggregating},
|
||||||
|
{msg: msg2, state: base.TaskStateAggregating},
|
||||||
|
{msg: msg3, state: base.TaskStateAggregating},
|
||||||
|
{msg: msg4, state: base.TaskStateAggregating},
|
||||||
|
{msg: msg5, state: base.TaskStateAggregating},
|
||||||
|
},
|
||||||
|
groups: map[string][]*redis.Z{
|
||||||
|
base.GroupKey("default", "mygroup"): {
|
||||||
|
{Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())},
|
||||||
|
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
|
||||||
|
{Member: msg3.ID, Score: float64(now.Add(-2 * time.Minute).Unix())},
|
||||||
|
{Member: msg4.ID, Score: float64(now.Add(-1 * time.Minute).Unix())},
|
||||||
|
{Member: msg5.ID, Score: float64(now.Add(-1 * time.Minute).Unix())},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
allGroups: map[string][]string{
|
||||||
|
base.AllGroups("default"): {"mygroup"},
|
||||||
|
},
|
||||||
|
qname: "default",
|
||||||
|
gname: "mygroup",
|
||||||
|
gracePeriod: 30 * time.Second,
|
||||||
|
maxDelay: 30 * time.Minute,
|
||||||
|
maxSize: 0, // maxSize=0 indicates no size limit
|
||||||
|
shouldCreateSet: true,
|
||||||
|
wantAggregationSet: []*base.TaskMessage{msg1, msg2, msg3, msg4, msg5},
|
||||||
|
wantGroups: map[string][]redis.Z{
|
||||||
|
base.GroupKey("default", "mygroup"): {},
|
||||||
|
},
|
||||||
|
shouldClearGroup: true,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
desc: "with unlimited delay",
|
desc: "with unlimited delay",
|
||||||
tasks: []*taskData{
|
tasks: []*taskData{
|
||||||
|
Loading…
Reference in New Issue
Block a user