diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index c9b5283..f3875d5 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -1001,9 +1001,8 @@ func (r *RDB) ListGroups(qname string) ([]string, error) { // ARGV[1] -> max group size // ARGV[2] -> max group delay in unix time // ARGV[3] -> start time of the grace period -// ARGV[4] -> aggregation set ID -// ARGV[5] -> aggregation set expire time -// ARGV[6] -> current time in unix time +// ARGV[4] -> aggregation set expire time +// ARGV[5] -> current time in unix time // // Output: // Returns 0 if no aggregation set was created @@ -1020,11 +1019,11 @@ if maxSize ~= 0 and size >= maxSize then redis.call("ZADD", KEYS[2], tonumber(res[i+1]), res[i]) end redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1) - redis.call("ZADD", KEYS[3], ARGV[5], ARGV[4]) + redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2]) return 1 end local maxDelay = tonumber(ARGV[2]) -local currentTime = tonumber(ARGV[6]) +local currentTime = tonumber(ARGV[5]) if maxDelay ~= 0 then local oldestEntry = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES") local oldestEntryScore = tonumber(oldestEntry[2]) @@ -1035,7 +1034,7 @@ if maxDelay ~= 0 then redis.call("ZADD", KEYS[2], tonumber(res[i+1]), res[i]) end redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1) - redis.call("ZADD", KEYS[3], ARGV[5], ARGV[4]) + redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2]) return 1 end end @@ -1048,7 +1047,7 @@ if latestEntryScore <= gracePeriodStartTime then redis.call("ZADD", KEYS[2], tonumber(res[i+1]), res[i]) end redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1) - redis.call("ZADD", KEYS[3], ARGV[5], ARGV[4]) + redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2]) return 1 end return 0 @@ -1078,7 +1077,6 @@ func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, ma maxSize, int64(maxDelay.Seconds()), int64(gracePeriod.Seconds()), - aggregationSetID, expireTime.Unix(), t.Unix(), } @@ -1114,10 +1112,11 @@ return msgs func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessage, time.Time, error) { var op errors.Op = "RDB.ReadAggregationSet" ctx := context.Background() + aggSetKey := base.AggregationSetKey(qname, gname, setID) res, err := readAggregationSetCmd.Run(ctx, r.client, - []string{base.AggregationSetKey(qname, gname, setID)}, base.TaskKeyPrefix(qname)).Result() + []string{aggSetKey}, base.TaskKeyPrefix(qname)).Result() if err != nil { - return nil, time.Time{}, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "smembers", Err: err}) + return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) } data, err := cast.ToStringSliceE(res) if err != nil { @@ -1131,7 +1130,7 @@ func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessag } msgs = append(msgs, msg) } - deadlineUnix, err := r.client.ZScore(ctx, base.AllAggregationSets(qname), setID).Result() + deadlineUnix, err := r.client.ZScore(ctx, base.AllAggregationSets(qname), aggSetKey).Result() if err != nil { return nil, time.Time{}, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zscore", Err: err}) } @@ -1156,9 +1155,14 @@ func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID stri return r.runScript(ctx, op, deleteAggregationSetCmd, []string{base.AggregationSetKey(qname, gname, setID)}, base.TaskKeyPrefix(qname)) } +var reclaimStateAggregationSetsCmd = redis.NewScript(` + +`) + // ReclaimStateAggregationSets checks for any stale aggregation sets in the given queue, and // reclaim tasks in the stale aggregation set by putting them back in the group. func (r *RDB) ReclaimStaleAggregationSets(qname string) error { + //now := r.clock.Now() return nil }