2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Store aggregation set *key* in all aggreationsets zset

This commit is contained in:
Ken Hibino 2022-03-11 11:10:44 -08:00
parent 3551d3334c
commit 47a66231b3

View File

@ -1001,9 +1001,8 @@ func (r *RDB) ListGroups(qname string) ([]string, error) {
// ARGV[1] -> max group size
// ARGV[2] -> max group delay in unix time
// ARGV[3] -> start time of the grace period
// ARGV[4] -> aggregation set ID
// ARGV[5] -> aggregation set expire time
// ARGV[6] -> current time in unix time
// ARGV[4] -> aggregation set expire time
// ARGV[5] -> current time in unix time
//
// Output:
// Returns 0 if no aggregation set was created
@ -1020,11 +1019,11 @@ if maxSize ~= 0 and size >= maxSize then
redis.call("ZADD", KEYS[2], tonumber(res[i+1]), res[i])
end
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
redis.call("ZADD", KEYS[3], ARGV[5], ARGV[4])
redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2])
return 1
end
local maxDelay = tonumber(ARGV[2])
local currentTime = tonumber(ARGV[6])
local currentTime = tonumber(ARGV[5])
if maxDelay ~= 0 then
local oldestEntry = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")
local oldestEntryScore = tonumber(oldestEntry[2])
@ -1035,7 +1034,7 @@ if maxDelay ~= 0 then
redis.call("ZADD", KEYS[2], tonumber(res[i+1]), res[i])
end
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
redis.call("ZADD", KEYS[3], ARGV[5], ARGV[4])
redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2])
return 1
end
end
@ -1048,7 +1047,7 @@ if latestEntryScore <= gracePeriodStartTime then
redis.call("ZADD", KEYS[2], tonumber(res[i+1]), res[i])
end
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
redis.call("ZADD", KEYS[3], ARGV[5], ARGV[4])
redis.call("ZADD", KEYS[3], ARGV[4], KEYS[2])
return 1
end
return 0
@ -1078,7 +1077,6 @@ func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, ma
maxSize,
int64(maxDelay.Seconds()),
int64(gracePeriod.Seconds()),
aggregationSetID,
expireTime.Unix(),
t.Unix(),
}
@ -1114,10 +1112,11 @@ return msgs
func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessage, time.Time, error) {
var op errors.Op = "RDB.ReadAggregationSet"
ctx := context.Background()
aggSetKey := base.AggregationSetKey(qname, gname, setID)
res, err := readAggregationSetCmd.Run(ctx, r.client,
[]string{base.AggregationSetKey(qname, gname, setID)}, base.TaskKeyPrefix(qname)).Result()
[]string{aggSetKey}, base.TaskKeyPrefix(qname)).Result()
if err != nil {
return nil, time.Time{}, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "smembers", Err: err})
return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
}
data, err := cast.ToStringSliceE(res)
if err != nil {
@ -1131,7 +1130,7 @@ func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessag
}
msgs = append(msgs, msg)
}
deadlineUnix, err := r.client.ZScore(ctx, base.AllAggregationSets(qname), setID).Result()
deadlineUnix, err := r.client.ZScore(ctx, base.AllAggregationSets(qname), aggSetKey).Result()
if err != nil {
return nil, time.Time{}, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zscore", Err: err})
}
@ -1156,9 +1155,14 @@ func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID stri
return r.runScript(ctx, op, deleteAggregationSetCmd, []string{base.AggregationSetKey(qname, gname, setID)}, base.TaskKeyPrefix(qname))
}
var reclaimStateAggregationSetsCmd = redis.NewScript(`
`)
// ReclaimStateAggregationSets checks for any stale aggregation sets in the given queue, and
// reclaim tasks in the stale aggregation set by putting them back in the group.
func (r *RDB) ReclaimStaleAggregationSets(qname string) error {
//now := r.clock.Now()
return nil
}