diff --git a/aggregator.go b/aggregator.go
index 40a07db..d4a5680 100644
--- a/aggregator.go
+++ b/aggregator.go
@@ -137,7 +137,7 @@ func (a *aggregator) aggregate(t time.Time) {
 		}
 		for _, gname := range groups {
 			aggregationSetID, err := a.broker.AggregationCheck(
-				qname, gname, t.Add(-a.gracePeriod), t.Add(-a.maxDelay), a.maxSize)
+				qname, gname, t, a.gracePeriod, a.maxDelay, a.maxSize)
 			if err != nil {
 				a.logger.Errorf("Failed to run aggregation check: queue=%q group=%q", qname, gname)
 				continue
diff --git a/internal/base/base.go b/internal/base/base.go
index fca070d..210ff37 100644
--- a/internal/base/base.go
+++ b/internal/base/base.go
@@ -731,7 +731,7 @@ type Broker interface {
 	AddToGroup(ctx context.Context, msg *TaskMessage, gname string) error
 	AddToGroupUnique(ctx context.Context, msg *TaskMessage, groupKey string, ttl time.Duration) error
 	ListGroups(qname string) ([]string, error)
-	AggregationCheck(qname, gname string, gracePeriodStartTime, maxDelayTime time.Time, maxSize int) (aggregationSetID string, err error)
+	AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (aggregationSetID string, err error)
 	ReadAggregationSet(qname, gname, aggregationSetID string) ([]*TaskMessage, time.Time, error)
 	DeleteAggregationSet(ctx context.Context, qname, gname, aggregationSetID string) error
diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go
index 25864db..2b7efa4 100644
--- a/internal/rdb/rdb.go
+++ b/internal/rdb/rdb.go
@@ -1003,6 +1003,7 @@ func (r *RDB) ListGroups(qname string) ([]string, error) {
 // ARGV[3] -> start time of the grace period
 // ARGV[4] -> aggregation set ID
 // ARGV[5] -> aggregation set expire time
+// ARGV[6] -> current time in unix time
 //
 // Output:
 // Returns 0 if no aggregation set was created
@@ -1013,7 +1014,7 @@ if size == 0 then
 	return 0
 end
 local maxSize = tonumber(ARGV[1])
-if size >= maxSize then
+if maxSize ~= 0 and size >= maxSize then
 	local msgs = redis.call("ZRANGE", KEYS[1], 0, maxSize-1)
 	for _, msg in ipairs(msgs) do
 		redis.call("SADD", KEYS[2], msg)
@@ -1022,21 +1023,25 @@ if size >= maxSize then
 	redis.call("ZADD", KEYS[3], ARGV[5], ARGV[4])
 	return 1
 end
-local oldestEntry = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")
-local oldestEntryScore = tonumber(oldestEntry[2])
-local maxDelayTime = tonumber(ARGV[2])
-if oldestEntryScore <= maxDelayTime then
-	local msgs = redis.call("ZRANGE", KEYS[1], 0, maxSize-1)
-	for _, msg in ipairs(msgs) do
-		redis.call("SADD", KEYS[2], msg)
+local maxDelay = tonumber(ARGV[2])
+local currentTime = tonumber(ARGV[6])
+if maxDelay ~= 0 then
+	local oldestEntry = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")
+	local oldestEntryScore = tonumber(oldestEntry[2])
+	local maxDelayTime = currentTime - maxDelay
+	if oldestEntryScore <= maxDelayTime then
+		local msgs = redis.call("ZRANGE", KEYS[1], 0, maxSize-1)
+		for _, msg in ipairs(msgs) do
+			redis.call("SADD", KEYS[2], msg)
+		end
+		redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
+		redis.call("ZADD", KEYS[3], ARGV[5], ARGV[4])
+		return 1
 	end
-	redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
-	redis.call("ZADD", KEYS[3], ARGV[5], ARGV[4])
-	return 1
 end
 local latestEntry = redis.call("ZREVRANGE", KEYS[1], 0, 0, "WITHSCORES")
 local latestEntryScore = tonumber(latestEntry[2])
-local gracePeriodStartTime = tonumber(ARGV[3])
+local gracePeriodStartTime = currentTime - tonumber(ARGV[3])
 if latestEntryScore <= gracePeriodStartTime then
 	local msgs = redis.call("ZRANGE", KEYS[1], 0, maxSize-1)
 	for _, msg in ipairs(msgs) do
@@ -1055,11 +1060,12 @@ const aggregationTimeout = 2 * time.Minute
 
 // AggregationCheck checks the group identified by the given queue and group name to see if the tasks in the
-// group are ready to be aggregated. If so, it moves the tasks to be aggregated to a aggregation set and returns
-// set ID. If not, it returns an empty string for the set ID.
+// group are ready to be aggregated. If so, it moves the tasks to be aggregated to an aggregation set and returns
+// the set ID. If not, it returns an empty string for the set ID.
+// The gracePeriod and maxDelay deadlines are computed relative to the time t.
 //
 // Note: It assumes that this function is called at frequency less than or equal to the gracePeriod. In other words,
-// the function only checks the most recently added task aganist the given gracePeriod.
-func (r *RDB) AggregationCheck(qname, gname string, gracePeriodStartTime, maxDelayTime time.Time, maxSize int) (string, error) {
+// the function only checks the most recently added task against the given gracePeriod.
+func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (string, error) {
 	var op errors.Op = "RDB.AggregationCheck"
 	aggregationSetID := uuid.NewString()
 	expireTime := r.clock.Now().Add(aggregationTimeout)
@@ -1070,10 +1076,11 @@ func (r *RDB) AggregationCheck(qname, gname string, gracePeriodStartTime, maxDel
 	}
 	argv := []interface{}{
 		maxSize,
-		maxDelayTime.Unix(),
-		gracePeriodStartTime.Unix(),
+		int64(maxDelay.Seconds()),
+		int64(gracePeriod.Seconds()),
 		aggregationSetID,
 		expireTime.Unix(),
+		t.Unix(),
 	}
 	n, err := r.runScriptWithErrorCode(context.Background(), op, aggregationCheckCmd, keys, argv...)
 	if err != nil {
diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go
index 8df2b84..0d61c45 100644
--- a/internal/rdb/rdb_test.go
+++ b/internal/rdb/rdb_test.go
@@ -3254,15 +3254,77 @@ func TestAggregationCheck(t *testing.T) {
 				},
 			},
 		},
+		{
+			desc: "with unlimited size",
+			groups: map[string]map[string][]base.Z{
+				"default": {
+					"mygroup": {
+						{Message: msg1, Score: now.Add(-15 * time.Minute).Unix()},
+						{Message: msg2, Score: now.Add(-3 * time.Minute).Unix()},
+						{Message: msg3, Score: now.Add(-2 * time.Minute).Unix()},
+						{Message: msg4, Score: now.Add(-1 * time.Minute).Unix()},
+						{Message: msg5, Score: now.Add(-10 * time.Second).Unix()},
+					},
+				},
+			},
+			qname:              "default",
+			gname:              "mygroup",
+			gracePeriod:        1 * time.Minute,
+			maxDelay:           30 * time.Minute,
+			maxSize:            0, // maxSize=0 indicates no size limit
+			shouldCreateSet:    false,
+			wantAggregationSet: nil,
+			wantGroups: map[string]map[string][]base.Z{
+				"default": {
+					"mygroup": {
+						{Message: msg1, Score: now.Add(-15 * time.Minute).Unix()},
+						{Message: msg2, Score: now.Add(-3 * time.Minute).Unix()},
+						{Message: msg3, Score: now.Add(-2 * time.Minute).Unix()},
+						{Message: msg4, Score: now.Add(-1 * time.Minute).Unix()},
+						{Message: msg5, Score: now.Add(-10 * time.Second).Unix()},
+					},
+				},
+			},
+		},
+		{
+			desc: "with unlimited delay",
+			groups: map[string]map[string][]base.Z{
+				"default": {
+					"mygroup": {
+						{Message: msg1, Score: now.Add(-15 * time.Minute).Unix()},
+						{Message: msg2, Score: now.Add(-3 * time.Minute).Unix()},
+						{Message: msg3, Score: now.Add(-2 * time.Minute).Unix()},
+						{Message: msg4, Score: now.Add(-1 * time.Minute).Unix()},
+						{Message: msg5, Score: now.Add(-10 * time.Second).Unix()},
+					},
+				},
+			},
+			qname:              "default",
+			gname:              "mygroup",
+			gracePeriod:        1 * time.Minute,
+			maxDelay:           0, // maxDelay=0 indicates no limit
+			maxSize:            10,
+			shouldCreateSet:    false,
+			wantAggregationSet: nil,
+			wantGroups: map[string]map[string][]base.Z{
+				"default": {
+					"mygroup": {
+						{Message: msg1, Score: now.Add(-15 * time.Minute).Unix()},
+						{Message: msg2, Score: now.Add(-3 * time.Minute).Unix()},
+						{Message: msg3, Score: now.Add(-2 * time.Minute).Unix()},
+						{Message: msg4, Score: now.Add(-1 * time.Minute).Unix()},
+						{Message: msg5, Score: now.Add(-10 * time.Second).Unix()},
+					},
+				},
+			},
+		},
 	}
 
 	for _, tc := range tests {
 		h.FlushDB(t, r.client)
 		h.SeedAllGroups(t, r.client, tc.groups)
 
-		gracePeriodStartTime := now.Add(-tc.gracePeriod)
-		maxDelayTime := now.Add(-tc.maxDelay)
-		aggregationSetID, err := r.AggregationCheck(tc.qname, tc.gname, gracePeriodStartTime, maxDelayTime, tc.maxSize)
+		aggregationSetID, err := r.AggregationCheck(tc.qname, tc.gname, now, tc.gracePeriod, tc.maxDelay, tc.maxSize)
 		if err != nil {
 			t.Errorf("%s: AggregationCheck returned error: %v", tc.desc, err)
 			continue
diff --git a/server.go b/server.go
index 3b700a1..ad6d7e3 100644
--- a/server.go
+++ b/server.go
@@ -197,7 +197,7 @@ type Config struct {
 
 	// GroupGracePeriod specifies the amount of time the server will wait for an incoming task before aggregating
 	// the tasks in a group. If an incoming task is received within this period, the server will wait for another
-	// period of the same length, up to GroupMaxDelay.
+	// period of the same length, up to GroupMaxDelay if specified.
 	//
 	// If unset or zero, the grace period is set to 1 minute.
 	// Minimum duration for GroupGracePeriod is 1 second. If value specified is less than a second, the call to
@@ -207,13 +207,13 @@ type Config struct {
 	// GroupMaxDelay specifies the maximum amount of time the server will wait for incoming tasks before aggregating
 	// the tasks in a group.
 	//
-	// If unset or zero, the max delay is set to 10 minutes.
+	// If unset or zero, no delay limit is used.
 	GroupMaxDelay time.Duration
 
 	// GroupMaxSize specifies the maximum number of tasks that can be aggregated into a single task within a group.
 	// If GroupMaxSize is reached, the server will aggregate the tasks into one immediately.
 	//
-	// If unset or zero, the max size is set to 100.
+	// If unset or zero, no size limit is used.
 	GroupMaxSize int
 
 	// GroupAggregateFunc specifies the aggregation function used to aggregate multiple tasks in a group into one task.
@@ -367,10 +367,6 @@ const (
 	defaultDelayedTaskCheckInterval = 5 * time.Second
 
 	defaultGroupGracePeriod = 1 * time.Minute
-
-	defaultGroupMaxDelay = 10 * time.Minute
-
-	defaultGroupMaxSize = 100
 )
 
 // NewServer returns a new Server given a redis connection option
@@ -428,14 +424,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
 	if groupGracePeriod < time.Second {
 		panic("GroupGracePeriod cannot be less than a second")
 	}
-	groupMaxDelay := cfg.GroupMaxDelay
-	if groupMaxDelay == 0 {
-		groupMaxDelay = defaultGroupMaxDelay
-	}
-	groupMaxSize := cfg.GroupMaxSize
-	if groupMaxSize == 0 {
-		groupMaxSize = defaultGroupMaxSize
-	}
 	logger := log.NewLogger(cfg.Logger)
 	loglevel := cfg.LogLevel
 	if loglevel == level_unspecified {
@@ -522,8 +510,8 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
 		broker:        rdb,
 		queues:        qnames,
 		gracePeriod:   groupGracePeriod,
-		maxDelay:      groupMaxDelay,
-		maxSize:       groupMaxSize,
+		maxDelay:      cfg.GroupMaxDelay,
+		maxSize:       cfg.GroupMaxSize,
 		aggregateFunc: cfg.GroupAggregateFunc,
 	})
 	return &Server{
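
Reviewer note: the sketch below restates, in plain Go, the aggregation criteria that the updated aggregationCheckCmd script now implements. It is illustrative only and not part of this patch or of asynq's API; the names shouldAggregate, oldest, and latest are made up for the example. It assumes what the diff shows: maxSize == 0 disables the size limit, maxDelay == 0 disables the delay limit, and both the max-delay and grace-period cutoffs are computed relative to the check time t.

```go
package main

import (
	"fmt"
	"time"
)

// shouldAggregate reports whether an aggregation set would be created for a
// group whose oldest and latest tasks were added at the given times.
// Hypothetical helper mirroring the Lua script's decision logic; not asynq API.
func shouldAggregate(t, oldest, latest time.Time, size, maxSize int, gracePeriod, maxDelay time.Duration) bool {
	if size == 0 {
		return false // empty group: nothing to aggregate
	}
	// Criterion 1: the group reached its maximum size (maxSize == 0 means unlimited).
	if maxSize != 0 && size >= maxSize {
		return true
	}
	// Criterion 2: the oldest task has waited longer than maxDelay (maxDelay == 0 means no limit).
	if maxDelay != 0 && !oldest.After(t.Add(-maxDelay)) {
		return true
	}
	// Criterion 3: no new task arrived within the grace period ending at t.
	return !latest.After(t.Add(-gracePeriod))
}

func main() {
	now := time.Now()
	// Same shape as the "with unlimited size" test case: 5 tasks, newest 10s old,
	// 1m grace period, 30m max delay, no size limit -> prints false (no set yet).
	fmt.Println(shouldAggregate(now, now.Add(-15*time.Minute), now.Add(-10*time.Second), 5, 0, time.Minute, 30*time.Minute))
}
```

Passing the raw durations plus the check time to the script, rather than precomputed deadline timestamps, is what lets the zero value unambiguously mean "no limit" for GroupMaxDelay and GroupMaxSize.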