mirror of https://github.com/hibiken/asynq.git (synced 2025-04-19 23:30:20 +08:00)

commit 476b69d43f
parent 6b98c0bbae

    🔥 update rdb, support proxy redis cluster
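The theme of the commit: Redis Cluster proxies commonly reject or misroute EVAL calls whose scripts build key names at runtime (e.g. `local key = ARGV[2] .. id`), because a proxy can only route and validate keys that are declared in KEYS. The change therefore splits each multi-step script into (1) a small read script that fetches member IDs and (2) a dynamically generated script whose KEYS enumerate every derived key. A minimal sketch of that pattern, assuming github.com/go-redis/redis/v8 (the client asynq uses here); the helper names below are illustrative, not from the commit:

    package example

    import (
        "context"
        "fmt"

        "github.com/go-redis/redis/v8"
    )

    // Phase 1: a static, single-key script fetches the member IDs.
    var fetchIDsCmd = redis.NewScript(`return redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])`)

    // Phase 2: generate a script whose KEYS list every derived key, so a
    // cluster proxy can route and validate the EVAL without parsing Lua.
    func markActive(ctx context.Context, rdb redis.UniversalClient, taskKeyPrefix string, ids []string) error {
        cmd := ""
        keys := make([]string, 0, len(ids))
        for i, id := range ids {
            keys = append(keys, taskKeyPrefix+id) // key derived client-side, not inside Lua
            cmd += fmt.Sprintf(`redis.call("HSET", KEYS[%d], "state", "active") `, i+1)
        }
        cmd += `return "OK"`
        return redis.NewScript(cmd).Run(ctx, rdb, keys).Err()
    }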
@@ -9,6 +9,9 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"regexp"
+	"strconv"
+	"strings"
 	"time"

 	"github.com/google/uuid"
@@ -216,19 +219,45 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
 //
 // Note: dequeueCmd checks whether a queue is paused first, before
 // calling RPOPLPUSH to pop a task from the queue.
-var dequeueCmd = redis.NewScript(`
+// var dequeueCmd = redis.NewScript(`
+// if redis.call("EXISTS", KEYS[2]) == 0 then
+// 	local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
+// 	if id then
+// 		local key = ARGV[2] .. id
+// 		redis.call("HSET", key, "state", "active")
+// 		redis.call("HDEL", key, "pending_since")
+// 		redis.call("ZADD", KEYS[4], ARGV[1], id)
+// 		return redis.call("HGET", key, "msg")
+// 	end
+// end
+// return nil`)
+
+var getDequeueTaskIDCmd = redis.NewScript(`
 if redis.call("EXISTS", KEYS[2]) == 0 then
 	local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
 	if id then
 		local key = ARGV[2] .. id
 		redis.call("HSET", key, "state", "active")
 		redis.call("HDEL", key, "pending_since")
 		redis.call("ZADD", KEYS[4], ARGV[1], id)
 		return redis.call("HGET", key, "msg")
 	end
+	return id
 end
 return nil`)
+
+func getUpdateDequeueCmd(taskID string) *redis.Script {
+	cmd := ""
+	cmd += `redis.call("HSET", KEYS[1], "state", "active") `
+	cmd += `redis.call("HDEL", KEYS[1], "pending_since") `
+	cmd += fmt.Sprintf(`redis.call("ZADD", KEYS[5], ARGV[1], "%s") `, taskID)
+	cmd += `return redis.call("HGET", KEYS[1], "msg")`
+	return redis.NewScript(cmd)
+}
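A trade-off worth noting: go-redis's Script helper runs EVALSHA and falls back to EVAL on NOSCRIPT, so a script generated per task ID is effectively never cached; every ID produces a new script body, a new SHA, and a new entry in the server's script cache. A static variant that passes the task ID via ARGV and the task key via KEYS would keep caching intact. A sketch, not part of this commit:

    // Static alternative (sketch): same effect as getUpdateDequeueCmd, but the
    // script text is constant, so EVALSHA caching still works.
    var updateDequeueCmd = redis.NewScript(`
    redis.call("HSET", KEYS[1], "state", "active")
    redis.call("HDEL", KEYS[1], "pending_since")
    redis.call("ZADD", KEYS[2], ARGV[1], ARGV[2])
    return redis.call("HGET", KEYS[1], "msg")`)

    // Run with KEYS = {taskKey, leaseKey} and ARGV = {leaseExpiration, taskID}.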

 // Dequeue queries the given queues in order and pops a task message
 // off a queue if one exists, and returns the message and its lease expiration time.
 // Dequeue skips a queue if the queue is paused.
@@ -247,12 +276,39 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, leaseExpirationT
 			leaseExpirationTime.Unix(),
 			base.TaskKeyPrefix(qname),
 		}
-		res, err := dequeueCmd.Run(context.Background(), r.client, keys, argv...).Result()
+		tRes, err := getDequeueTaskIDCmd.Run(context.Background(), r.client, keys, argv...).Result()
 		if err == redis.Nil {
 			continue
 		} else if err != nil {
 			return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
 		}
+
+		// proceed only if an id was returned
+		taskID, ok := tRes.(string)
+		if !ok {
+			continue
+		}
+
+		argv2 := base.TaskKeyPrefix(qname)
+		updateCmd := getUpdateDequeueCmd(taskID)
+
+		nKeys := []string{argv2 + taskID}
+		nKeys = append(nKeys, keys...)
+
+		res, err := updateCmd.Run(context.Background(), r.client, nKeys, argv...).Result()
+		if err == redis.Nil {
+			continue
+		} else if err != nil {
+			return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
+		}
+
 		encoded, err := cast.ToStringE(res)
 		if err != nil {
 			return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
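For reference, the key layout the generated update script sees, derived from the call site above (key names assume asynq's standard naming scheme):

    // KEYS[1] -> asynq:{<qname>}:t:<task_id>   (task key, prepended client-side)
    // KEYS[2] -> asynq:{<qname>}:pending
    // KEYS[3] -> asynq:{<qname>}:paused
    // KEYS[4] -> asynq:{<qname>}:active
    // KEYS[5] -> asynq:{<qname>}:lease          (target of the ZADD)
    // ARGV[1] -> lease expiration, as unix time
    // ARGV[2] -> task key prefix (unused by the generated script)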
@@ -941,6 +997,141 @@ for _, id in ipairs(ids) do
 end
 return table.getn(ids)`)
+
+// forward (1) get the task IDs that are due
+var getForwardTasksCmd = redis.NewScript(`return redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)`)
+
+// forward (2) get group info for each task
+func getForwardTaskGroupsCmd(taskIDs []string) *redis.Script {
+	cmd := "local gTable = {} "
+	for i := range taskIDs {
+		// HGET yields false (not nil) for a missing field in Redis Lua.
+		cmd += fmt.Sprintf(
+			`local group = redis.call("HGET", KEYS[%d], "group") if not group then group = '' end gTable[%d] = group `,
+			i+1, i+1)
+	}
+	cmd += "return gTable "
+	return redis.NewScript(cmd)
+}
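For two task IDs the generated script looks like this (whitespace rearranged for readability); the gTable positions line up with the input order, which doForward below relies on:

    // What getForwardTaskGroupsCmd([]string{"id1", "id2"}) generates:
    //
    //	local gTable = {}
    //	local group = redis.call("HGET", KEYS[1], "group") if not group then group = '' end gTable[1] = group
    //	local group = redis.call("HGET", KEYS[2], "group") if not group then group = '' end gTable[2] = group
    //	return gTable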
+// forward (4) scheduled: move tasks that belong to a group into their group sets
+func getScheduledForwardTaskCmd(taskIDs []string, argv1 int64) *redis.Script {
+	cmd := ""
+	for i, taskID := range taskIDs {
+		cmd += fmt.Sprintf(`redis.call("ZADD", KEYS[%d], %d, "%s") `, (i+1)*2, argv1, taskID)
+		cmd += fmt.Sprintf(`redis.call("ZREM", KEYS[1], "%s") `, taskID)
+		cmd += fmt.Sprintf(`redis.call("HSET", KEYS[%d], "state", "aggregating") `, (i+1)*2+1)
+	}
+	cmd += "return '1'"
+	return redis.NewScript(cmd)
+}
+// forward (3) retry: move tasks without a group to the pending list
+func getRetryForwardTaskCmd(taskIDs []string, argv3 int64) *redis.Script {
+	cmd := ""
+	for i, taskID := range taskIDs {
+		cmd += fmt.Sprintf(`redis.call("LPUSH", KEYS[1], "%s") `, taskID)
+		cmd += fmt.Sprintf(`redis.call("ZREM", KEYS[2], "%s") `, taskID)
+		cmd += fmt.Sprintf(`redis.call("HSET", KEYS[%d], "state", "pending", "pending_since", "%d") `, i+3, argv3)
+	}
+	cmd += "return '1'"
+	return redis.NewScript(cmd)
+}
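Since task IDs are spliced verbatim into the Lua source inside double quotes, this is only safe because asynq task IDs are UUIDs. If IDs could ever contain quotes or backslashes, a guard would be prudent; a hypothetical sketch (not in the commit, using the file's existing regexp and fmt imports):

    // safeID rejects anything outside the characters a UUID can contain.
    var safeID = regexp.MustCompile(`^[0-9a-fA-F-]+$`)

    func checkIDs(ids []string) error {
        for _, id := range ids {
            if !safeID.MatchString(id) {
                return fmt.Errorf("refusing to splice unsafe task id into script: %q", id)
            }
        }
        return nil
    }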
+func (r *RDB) doForward(IDs []string, keys []string, argv []interface{}) (int, error) {
+	argv1, _ := argv[0].(int64)  // now, unix seconds (score for the group set)
+	argv2, _ := argv[1].(string) // task key prefix
+	argv3, _ := argv[2].(int64)  // now, unix nanoseconds (pending_since)
+	argv4, _ := argv[3].(string) // group key prefix
+	groupCmd := getForwardTaskGroupsCmd(IDs)
+	nKeys := []string{}
+	for _, item := range IDs {
+		nKeys = append(nKeys, argv2+item)
+	}
+
+	// 1. get group info
+	res, err := groupCmd.Run(context.Background(), r.client, nKeys, argv...).Result()
+	if err != nil {
+		return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
+	}
+
+	// 2. split into retry (no group) and scheduled (grouped) tasks
+	gInfo, ok := res.([]interface{})
+	if !ok {
+		return 0, nil
+	}
+
+	retryIDs := map[string]string{}
+	scheduleIDs := map[string]string{}
+	for i, item := range gInfo {
+		taskGroup, ok := item.(string)
+		if !ok {
+			continue
+		}
+		if taskGroup != "" {
+			scheduleIDs[IDs[i]] = taskGroup
+		} else {
+			retryIDs[IDs[i]] = taskGroup
+		}
+	}
+
+	updateSize := 0
+	if len(retryIDs) > 0 {
+		nKeys := []string{keys[1], keys[0]}
+		rIDs := []string{}
+		for k := range retryIDs {
+			nKeys = append(nKeys, argv2+k)
+			rIDs = append(rIDs, k)
+		}
+		retryCmd := getRetryForwardTaskCmd(rIDs, argv3)
+		if _, err := retryCmd.Run(context.Background(), r.client, nKeys, argv...).Result(); err != nil {
+			return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
+		}
+		updateSize += len(retryIDs)
+	}
+
+	if len(scheduleIDs) > 0 {
+		nKeys := []string{keys[0]}
+		sIDs := []string{}
+		for k, v := range scheduleIDs {
+			nKeys = append(nKeys, argv4+v, argv2+k)
+			sIDs = append(sIDs, k)
+		}
+		sCmd := getScheduledForwardTaskCmd(sIDs, argv1)
+		if _, err := sCmd.Run(context.Background(), r.client, nKeys, argv...).Result(); err != nil {
+			return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
+		}
+		updateSize += len(scheduleIDs)
+	}
+	return updateSize, nil
+}
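Even with every key declared, a multi-key EVAL must still land on a single hash slot in a clustered setup. That holds here because every key embeds the same {<qname>} hash tag (assuming asynq's standard key naming):

    // All of these hash to the slot of "default":
    //	asynq:{default}:scheduled       (delayed zset)
    //	asynq:{default}:pending         (pending list)
    //	asynq:{default}:t:<task_id>     (task hash)
    //	asynq:{default}:g:<group_name>  (group zset)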
 // forward moves tasks with a score less than the current unix time from the delayed (i.e. scheduled | retry) zset
 // to the pending list or group set.
 // It returns the number of tasks moved.
@@ -953,15 +1144,49 @@ func (r *RDB) forward(delayedKey, pendingKey, taskKeyPrefix, groupKeyPrefix stri
 		now.UnixNano(),
 		groupKeyPrefix,
 	}
-	res, err := forwardCmd.Run(context.Background(), r.client, keys, argv...).Result()
+	res, err := getForwardTasksCmd.Run(context.Background(), r.client, keys, argv...).Result()
 	if err != nil {
 		return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
 	}
-	n, err := cast.ToIntE(res)
-	if err != nil {
-		return 0, errors.E(errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", res))
+	taskIDs, ok := res.([]interface{})
+	if !ok {
+		return 0, nil
 	}
-	return n, nil
+
+	IDs := []string{}
+	updateSize := 0
+
+	for _, item := range taskIDs {
+		taskID, ok := item.(string)
+		if !ok {
+			continue
+		}
+		IDs = append(IDs, taskID)
+
+		if len(IDs) == 100 {
+			uCount, err := r.doForward(IDs, keys, argv)
+			if err != nil {
+				return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
+			}
+			updateSize += uCount
+			IDs = []string{}
+		}
+	}
+
+	if len(IDs) > 0 {
+		uCount, err := r.doForward(IDs, keys, argv)
+		if err != nil {
+			return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
+		}
+		updateSize += uCount
+	}
+
+	return updateSize, nil
 }
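The argv slice threaded from forward into doForward keeps the original forwardCmd layout, which is why doForward can recover its parameters by position:

    // argv[0] -> now.Unix()      score used when ZADDing into a group set
    // argv[1] -> taskKeyPrefix   e.g. "asynq:{<qname>}:t:" (assuming standard naming)
    // argv[2] -> now.UnixNano()  stored as the task's pending_since
    // argv[3] -> groupKeyPrefix  e.g. "asynq:{<qname>}:g:"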
 // forwardAll checks for tasks in scheduled/retry state that are ready to be run, and updates
@@ -1138,17 +1363,75 @@ end
 return msgs
 `)
+
+var getAggregationSetTasksCmd = redis.NewScript(`return redis.call("ZRANGE", KEYS[1], 0, -1)`)
+
+func getAggregationMsgCmd(taskIDs []string) *redis.Script {
+	cmd := "local msgs = {} "
+	for i := range taskIDs {
+		cmd += fmt.Sprintf(`table.insert(msgs, redis.call("HGET", KEYS[%d], "msg")) `, i+1)
+	}
+	cmd += "return msgs "
+	return redis.NewScript(cmd)
+}

 // ReadAggregationSet retrieves members of an aggregation set and returns a list of tasks in the set and
 // the deadline for aggregating those tasks.
 func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessage, time.Time, error) {
 	var op errors.Op = "RDB.ReadAggregationSet"
 	ctx := context.Background()
 	aggSetKey := base.AggregationSetKey(qname, gname, setID)
-	res, err := readAggregationSetCmd.Run(ctx, r.client,
-		[]string{aggSetKey}, base.TaskKeyPrefix(qname)).Result()
+	keys := []string{aggSetKey}
+	argv := []interface{}{base.TaskKeyPrefix(qname)}
+	// 1. get the task IDs in the set
+	res, err := getAggregationSetTasksCmd.Run(ctx, r.client, keys, argv...).Result()
 	if err != nil {
 		return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
 	}
+
+	taskIDs, ok := res.([]interface{})
+	if !ok {
+		return nil, time.Time{}, nil
+	}
+
+	IDs := []string{}
+	for _, item := range taskIDs {
+		taskID, ok := item.(string)
+		if !ok {
+			continue
+		}
+		IDs = append(IDs, taskID)
+	}
+
+	if len(IDs) == 0 {
+		return nil, time.Time{}, nil
+	}
+
+	argv1, _ := argv[0].(string)
+
+	// 2. fetch each task's msg field
+	msgCmd := getAggregationMsgCmd(IDs)
+	nKeys := []string{}
+	for _, taskID := range IDs {
+		nKeys = append(nKeys, argv1+taskID)
+	}
+
+	res, err = msgCmd.Run(ctx, r.client, nKeys, argv...).Result()
+	if err != nil {
+		return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
+	}
+
 	data, err := cast.ToStringSliceE(res)
 	if err != nil {
 		return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", res))
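The hunk is cut off here; in the full function each returned string presumably decodes back into a task message, roughly along these lines (a sketch assuming asynq's existing base.DecodeMessage; the surrounding control flow is assumed, not shown in the diff):

    msgs := make([]*base.TaskMessage, 0, len(data))
    for _, s := range data {
        msg, err := base.DecodeMessage([]byte(s))
        if err != nil {
            return nil, time.Time{}, errors.E(op, errors.Internal, "cannot decode message")
        }
        msgs = append(msgs, msg)
    }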
@@ -1189,6 +1472,28 @@ redis.call("ZREM", KEYS[2], KEYS[1])
 return redis.status_reply("OK")
 `)
+
+var getAggregationDeleteTasksCmd = redis.NewScript(`return redis.call("ZRANGE", KEYS[1], 0, -1)`)
+
+func getDeleteAggregationCmd(taskIDs []string) *redis.Script {
+	cmd := ""
+	for i := range taskIDs {
+		cmd += fmt.Sprintf(`redis.call("DEL", KEYS[%d]) `, i+3)
+	}
+	cmd += `redis.call("DEL", KEYS[1]) `
+	cmd += `redis.call("ZREM", KEYS[2], KEYS[1]) `
+	cmd += `return redis.status_reply("OK") `
+	return redis.NewScript(cmd)
+}

 // DeleteAggregationSet deletes the aggregation set and its members identified by the parameters.
 func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID string) error {
 	var op errors.Op = "RDB.DeleteAggregationSet"
@@ -1196,7 +1501,43 @@ func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID stri
 		base.AggregationSetKey(qname, gname, setID),
 		base.AllAggregationSets(qname),
 	}
-	return r.runScript(ctx, op, deleteAggregationSetCmd, keys, base.TaskKeyPrefix(qname))
+	// 1. get the task IDs in the set
+	res, err := getAggregationDeleteTasksCmd.Run(ctx, r.client, keys).Result()
+	if err != nil {
+		return fmt.Errorf("redis eval error: %v", err)
+	}
+
+	taskIDs, ok := res.([]interface{})
+	if !ok {
+		return nil
+	}
+
+	IDs := []string{}
+	for _, item := range taskIDs {
+		taskID, ok := item.(string)
+		if !ok {
+			continue
+		}
+		IDs = append(IDs, taskID)
+	}
+
+	if len(IDs) == 0 {
+		return nil
+	}
+
+	// 2. delete the tasks, the set itself, and its entry in the all-sets zset
+	delCmd := getDeleteAggregationCmd(IDs)
+
+	nKeys := []string{keys[0], keys[1]}
+	argv1 := base.TaskKeyPrefix(qname)
+	for _, taskID := range IDs {
+		nKeys = append(nKeys, argv1+taskID)
+	}
+
+	return r.runScript(ctx, op, delCmd, nKeys, base.TaskKeyPrefix(qname))
 }
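For three tasks the generated script is a flat sequence of DELs followed by the set cleanup. Note that when the set is empty, the early return above skips the cleanup entirely, which the old atomic deleteAggregationSetCmd never did:

    // getDeleteAggregationCmd for 3 task IDs generates (reformatted):
    //
    //	redis.call("DEL", KEYS[3])
    //	redis.call("DEL", KEYS[4])
    //	redis.call("DEL", KEYS[5])
    //	redis.call("DEL", KEYS[1])              -- the aggregation set itself
    //	redis.call("ZREM", KEYS[2], KEYS[1])    -- drop it from the all-sets zset
    //	return redis.status_reply("OK")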
 // KEYS[1] -> asynq:{<qname>}:aggregation_sets
@@ -1217,12 +1558,141 @@ redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
 return redis.status_reply("OK")
 `)
+
+var getReClaimStateKeysCmd = redis.NewScript(`return redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])`)
+
+var getReclaimStateTasksCmd = redis.NewScript(`return redis.call("ZRANGE", KEYS[1], 0, -1, "WITHSCORES")`)
+
+var getDeleteReClaimStateKeyCmd = redis.NewScript(`return redis.call("DEL", KEYS[1])`)
+
+func getUpdateReClaimGroupCmd(taskIDs []string, scores []int64) *redis.Script {
+	cmd := ""
+	for i, taskID := range taskIDs {
+		cmd += fmt.Sprintf(`redis.call("ZADD", KEYS[1], %d, "%s") `, scores[i], taskID)
+	}
+	cmd += `return '1' `
+	return redis.NewScript(cmd)
+}
+
+func getDeleteReClaimStateKeysCmd(argv1 int64) *redis.Script {
+	cmd := fmt.Sprintf(`redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", %d) `, argv1)
+	cmd += `return redis.status_reply("OK") `
+	return redis.NewScript(cmd)
+}
+
+// findKey returns the index of the last ":"-separated segment of key,
+// or -1 if key contains no ":" at all.
+func findKey(key string) int {
+	re := regexp.MustCompile(`:[^:]*$`)
+	match := re.FindStringIndex(key)
+	if match != nil {
+		return match[0]
+	}
+	return -1
+}
+
+// subKey truncates key just before index idx, dropping the final segment.
+func subKey(key string, idx int) string {
+	if idx > 0 && idx <= len(key) {
+		return key[:idx]
+	}
+	return ""
+}
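findKey and subKey together strip the trailing ":<set_id>" from an aggregation-set key to recover the group key it came from. A worked example, with a hypothetical set ID and asynq's standard key shape assumed:

    key := "asynq:{default}:g:notify:abc123" // aggregation-set key
    idx := findKey(key)                      // index of ":abc123"
    groupKey := subKey(key, idx)             // "asynq:{default}:g:notify"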
 // ReclaimStaleAggregationSets checks for any stale aggregation sets in the given queue, and
 // reclaims tasks in the stale aggregation sets by putting them back into their groups.
 func (r *RDB) ReclaimStaleAggregationSets(qname string) error {
 	var op errors.Op = "RDB.ReclaimStaleAggregationSets"
-	return r.runScript(context.Background(), op, reclaimStateAggregationSetsCmd,
-		[]string{base.AllAggregationSets(qname)}, r.clock.Now().Unix())
+	ctx := context.Background()
+	keys := []string{base.AllAggregationSets(qname)}
+	argv := []interface{}{r.clock.Now().Unix()}
+	// 1. get the stale aggregation-set keys
+	res, err := getReClaimStateKeysCmd.Run(ctx, r.client, keys, argv...).Result()
+	if err != nil {
+		return errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
+	}
+
+	sKeys, ok := res.([]interface{})
+	if !ok {
+		return nil
+	}
+
+	cKeys := []string{}
+	for _, item := range sKeys {
+		cKey, ok := item.(string)
+		if !ok {
+			continue
+		}
+		cKeys = append(cKeys, cKey)
+	}
+
+	for _, group := range cKeys {
+		// 2. get the task IDs (with scores) in this stale set
+		nKeys := []string{group}
+		res, err = getReclaimStateTasksCmd.Run(ctx, r.client, nKeys).Result()
+		if err != nil {
+			return errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
+		}
+
+		taskIDs, ok := res.([]interface{})
+		if !ok {
+			continue
+		}
+
+		IDs := []string{}
+		scores := []int64{}
+		for i, item := range taskIDs {
+			if i%2 == 0 { // WITHSCORES: even entries are members...
+				taskID, ok := item.(string)
+				if !ok {
+					continue
+				}
+				IDs = append(IDs, taskID)
+			} else { // ...odd entries are their scores
+				score, ok := item.(string)
+				if !ok {
+					continue
+				}
+				num, err := strconv.ParseInt(score, 10, 64)
+				if err != nil {
+					continue
+				}
+				scores = append(scores, num)
+			}
+		}
+
+		if len(IDs) != len(scores) {
+			continue
+		}
+
+		// 3. put the tasks back into their group
+		updateCmd := getUpdateReClaimGroupCmd(IDs, scores)
+
+		nKeys = []string{}
+		idx := findKey(group)
+		groupKey := subKey(group, idx)
+		nKeys = append(nKeys, groupKey)
+
+		res, err = updateCmd.Run(ctx, r.client, nKeys).Result()
+		if err != nil {
+			return errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
+		}
+
+		// 4. delete the reclaimed aggregation set
+		nKeys = []string{group}
+		res, err = getDeleteReClaimStateKeyCmd.Run(ctx, r.client, nKeys).Result()
+		if err != nil {
+			return errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
+		}
+	}
+
+	// 5. drop the stale entries from the all-aggregation-sets zset
+	argv1, _ := argv[0].(int64)
+	delCmd := getDeleteReClaimStateKeysCmd(argv1)
+	return r.runScript(context.Background(), op, delCmd, keys, argv...)
 }
 // KEYS[1] -> asynq:{<qname>}:completed
@@ -1239,6 +1709,9 @@ for _, id in ipairs(ids) do
 end
 return table.getn(ids)`)
+
+var getExpiredCompletedTasksCmd = redis.NewScript(`
+return redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, tonumber(ARGV[3]))`)

 // DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set,
 // and deletes all expired tasks.
 func (r *RDB) DeleteExpiredCompletedTasks(qname string) error {
@@ -1255,6 +1728,48 @@ func (r *RDB) DeleteExpiredCompletedTasks(qname string) error {
 	}
 }
+
+func getDeleteExpiredCompletedTasksCmd(taskIDs []string) *redis.Script {
+	delStrs := []string{}
+	for i := range taskIDs {
+		delStrs = append(delStrs, fmt.Sprintf("KEYS[%d]", i+2))
+	}
+	cmd := ""
+
+	// was: redis.call("DEL", ARGV[2] .. id) -- the keys are now passed via KEYS
+	cmd += fmt.Sprintf(`redis.call("DEL", %s) `, strings.Join(delStrs, ","))
+
+	// was: redis.call("ZREM", KEYS[1], id)
+	for _, taskID := range taskIDs {
+		cmd += fmt.Sprintf(`redis.call("ZREM", KEYS[1], "%s") `, taskID)
+	}
+
+	cmd += "return 0"
+	return redis.NewScript(cmd)
+}
+
+func (r *RDB) doDeleteExpireTask(IDs []string, keys []string, argv []interface{}) (int64, error) {
+	var op errors.Op = "rdb.DeleteExpiredCompletedTasks"
+	argv2, _ := argv[1].(string) // task key prefix
+	delCmd := getDeleteExpiredCompletedTasksCmd(IDs)
+	nKeys := []string{keys[0]}
+	for _, item := range IDs {
+		nKeys = append(nKeys, argv2+item)
+	}
+	if _, err := delCmd.Run(context.Background(), r.client, nKeys).Result(); err != nil {
+		return 0, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err))
+	}
+	return int64(len(IDs)), nil
+}

 // deleteExpiredCompletedTasks runs the lua script to delete expired completed tasks with the specified
 // batch size. It reports the number of tasks deleted.
 func (r *RDB) deleteExpiredCompletedTasks(qname string, batchSize int) (int64, error) {
@@ -1265,15 +1780,50 @@ func (r *RDB) deleteExpiredCompletedTasks(qname string, batchSize int) (int64, e
 		base.TaskKeyPrefix(qname),
 		batchSize,
 	}
-	res, err := deleteExpiredCompletedTasksCmd.Run(context.Background(), r.client, keys, argv...).Result()
+	res, err := getExpiredCompletedTasksCmd.Run(context.Background(), r.client, keys, argv...).Result()
 	if err != nil {
 		return 0, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err))
 	}
-	n, ok := res.(int64)
+	taskIDs, ok := res.([]interface{})
 	if !ok {
-		return 0, errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", res))
+		return 0, nil
 	}
-	return n, nil
+
+	// batch-delete the expired tasks
+	IDs := []string{}
+	delSize := int64(0)
+
+	for _, item := range taskIDs {
+		taskID, ok := item.(string)
+		if !ok {
+			continue
+		}
+		IDs = append(IDs, taskID)
+
+		if len(IDs) == 100 {
+			dSize, err := r.doDeleteExpireTask(IDs, keys, argv)
+			if err != nil {
+				return 0, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err))
+			}
+			delSize += dSize
+			IDs = []string{}
+		}
+	}
+
+	if len(IDs) > 0 {
+		dSize, err := r.doDeleteExpireTask(IDs, keys, argv)
+		if err != nil {
+			return 0, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err))
+		}
+		delSize += dSize
+	}
+
+	return delSize, nil
 }
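The fetch-then-process-in-batches-of-100 loop now appears in both forward and deleteExpiredCompletedTasks; a shared helper could collapse it. A hypothetical sketch, not part of the commit:

    // inBatches invokes fn over ids in chunks of at most size elements and
    // sums the per-batch counts.
    func inBatches(ids []string, size int, fn func(batch []string) (int64, error)) (int64, error) {
        var total int64
        for start := 0; start < len(ids); start += size {
            end := start + size
            if end > len(ids) {
                end = len(ids)
            }
            n, err := fn(ids[start:end])
            if err != nil {
                return 0, err
            }
            total += n
        }
        return total, nil
    }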
 // KEYS[1] -> asynq:{<qname>}:lease
@@ -1289,17 +1839,71 @@ end
 return res
 `)
+
+var getLeaseExpiredTasksCmd = redis.NewScript(`return redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])`)
+
+func getLeaseExpiredMsgCmd(taskIDs []string) *redis.Script {
+	cmd := "local res = {} "
+	for i := range taskIDs {
+		cmd += fmt.Sprintf(`table.insert(res, redis.call("HGET", KEYS[%d], "msg")) `, i+1)
+	}
+	cmd += "return res "
+	return redis.NewScript(cmd)
+}

 // ListLeaseExpired returns a list of task messages with an expired lease from the given queues.
 func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) {
 	var op errors.Op = "rdb.ListLeaseExpired"
 	var msgs []*base.TaskMessage
 	for _, qname := range qnames {
-		res, err := listLeaseExpiredCmd.Run(context.Background(), r.client,
-			[]string{base.LeaseKey(qname)},
-			cutoff.Unix(), base.TaskKeyPrefix(qname)).Result()
+		keys := []string{base.LeaseKey(qname)}
+		argv := []interface{}{cutoff.Unix(), base.TaskKeyPrefix(qname)}
+		// 1. get the IDs of tasks whose lease has expired
+		res, err := getLeaseExpiredTasksCmd.Run(context.Background(), r.client, keys, argv...).Result()
 		if err != nil {
 			return nil, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err))
 		}
+
+		taskIDs, ok := res.([]interface{})
+		if !ok {
+			return msgs, nil
+		}
+
+		IDs := []string{}
+		for _, item := range taskIDs {
+			taskID, ok := item.(string)
+			if !ok {
+				continue
+			}
+			IDs = append(IDs, taskID)
+		}
+
+		if len(IDs) == 0 {
+			continue
+		}
+
+		argv2, _ := argv[1].(string)
+
+		// 2. fetch each task's msg field
+		msgCmd := getLeaseExpiredMsgCmd(IDs)
+		nKeys := []string{}
+		for _, taskID := range IDs {
+			nKeys = append(nKeys, argv2+taskID)
+		}
+		res, err = msgCmd.Run(context.Background(), r.client, nKeys, argv...).Result()
+		if err != nil {
+			return nil, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err))
+		}
+
+		data, err := cast.ToStringSliceE(res)
+		if err != nil {
+			return nil, errors.E(op, errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", res))