2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-22 09:56:12 +08:00

Update RDB.ForwardIfReady to forward to group if groupKey is specified

This commit is contained in:
Ken Hibino
2022-03-06 05:45:33 -08:00
parent a88f6498af
commit 17e9725287
3 changed files with 195 additions and 19 deletions

View File

@@ -515,7 +515,8 @@ if redis.call("EXISTS", KEYS[1]) == 1 then
end
redis.call("HSET", KEYS[1],
"msg", ARGV[1],
"state", "aggregating")
"state", "aggregating",
"group", ARGV[4])
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
redis.call("SADD", KEYS[3], ARGV[4])
return 1
@@ -576,7 +577,8 @@ if redis.call("EXISTS", KEYS[1]) == 1 then
end
redis.call("HSET", KEYS[1],
"msg", ARGV[1],
"state", "aggregating")
"state", "aggregating",
"group", ARGV[4])
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
redis.call("SADD", KEYS[3], ARGV[4])
return 1
@@ -916,24 +918,41 @@ func (r *RDB) ForwardIfReady(qnames ...string) error {
// ARGV[1] -> current unix time in seconds
// ARGV[2] -> task key prefix
// ARGV[3] -> current unix time in nsec
// ARGV[4] -> group key prefix
// Note: Script moves tasks up to 100 at a time to keep the runtime of script short.
var forwardCmd = redis.NewScript(`
local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
for _, id in ipairs(ids) do
redis.call("LPUSH", KEYS[2], id)
redis.call("ZREM", KEYS[1], id)
redis.call("HSET", ARGV[2] .. id,
"state", "pending",
"pending_since", ARGV[3])
local taskKey = ARGV[2] .. id
local group = redis.call("HGET", taskKey, "group")
if group then
redis.call("ZADD", ARGV[4] .. group, ARGV[1], id)
redis.call("ZREM", KEYS[1], id)
redis.call("HSET", taskKey,
"state", "aggregating")
else
redis.call("LPUSH", KEYS[2], id)
redis.call("ZREM", KEYS[1], id)
redis.call("HSET", taskKey,
"state", "pending",
"pending_since", ARGV[3])
end
end
return table.getn(ids)`)
// forward moves tasks with a score less than the current unix time
// from the src zset to the dst list. It returns the number of tasks moved.
func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) {
// forward moves tasks with a score less than the current unix time from the delayed (i.e. scheduled | retry) zset
// to the pending list or group set.
// It returns the number of tasks moved.
func (r *RDB) forward(delayedKey, pendingKey, taskKeyPrefix, groupKeyPrefix string) (int, error) {
now := r.clock.Now()
res, err := forwardCmd.Run(context.Background(), r.client,
[]string{src, dst}, now.Unix(), taskKeyPrefix, now.UnixNano()).Result()
keys := []string{delayedKey, pendingKey}
argv := []interface{}{
now.Unix(),
taskKeyPrefix,
now.UnixNano(),
groupKeyPrefix,
}
res, err := forwardCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
}
@@ -945,15 +964,16 @@ func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) {
}
// forwardAll checks for tasks in scheduled/retry state that are ready to be run, and updates
// their state to "pending".
// their state to "pending" or "aggregating".
func (r *RDB) forwardAll(qname string) (err error) {
sources := []string{base.ScheduledKey(qname), base.RetryKey(qname)}
dst := base.PendingKey(qname)
delayedKeys := []string{base.ScheduledKey(qname), base.RetryKey(qname)}
pendingKey := base.PendingKey(qname)
taskKeyPrefix := base.TaskKeyPrefix(qname)
for _, src := range sources {
groupKeyPrefix := base.GroupKeyPrefix(qname)
for _, delayedKey := range delayedKeys {
n := 1
for n != 0 {
n, err = r.forward(src, dst, taskKeyPrefix)
n, err = r.forward(delayedKey, pendingKey, taskKeyPrefix, groupKeyPrefix)
if err != nil {
return err
}