From 476b69d43f0bc49ae8710b5ed4fdf0d2ce78cba4 Mon Sep 17 00:00:00 2001 From: yz <1126978661@qq.com> Date: Wed, 6 Sep 2023 15:01:14 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A5=20update=20rdb,=20support=20proxy?= =?UTF-8?q?=20redis=20cluster?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/rdb/rdb.go | 656 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 630 insertions(+), 26 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index c79a9c5..0a7cd2e 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -9,6 +9,9 @@ import ( "context" "fmt" "math" + "regexp" + "strconv" + "strings" "time" "github.com/google/uuid" @@ -216,19 +219,45 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time // // Note: dequeueCmd checks whether a queue is paused first, before // calling RPOPLPUSH to pop a task from the queue. -var dequeueCmd = redis.NewScript(` +// var dequeueCmd = redis.NewScript(` +// if redis.call("EXISTS", KEYS[2]) == 0 then +// local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3]) +// if id then +// local key = ARGV[2] .. id +// redis.call("HSET", key, "state", "active") +// redis.call("HDEL", key, "pending_since") +// redis.call("ZADD", KEYS[4], ARGV[1], id) +// return redis.call("HGET", key, "msg") +// end +// end +// return nil`) + +var getDequeueTaskIDCmd = redis.NewScript(` if redis.call("EXISTS", KEYS[2]) == 0 then local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3]) - if id then - local key = ARGV[2] .. id - redis.call("HSET", key, "state", "active") - redis.call("HDEL", key, "pending_since") - redis.call("ZADD", KEYS[4], ARGV[1], id) - return redis.call("HGET", key, "msg") - end + return id end return nil`) +func getUpdateDequeueCmd(taskID string) *redis.Script { + + cmd := "" + + hSetCmd := fmt.Sprintf(`redis.call("HSET", KEYS[1], "state", "active") `) + cmd += hSetCmd + + hDelCmd := fmt.Sprintf(`redis.call("HDEL", KEYS[1], "pending_since") `) + cmd += hDelCmd + + zAddCmd := fmt.Sprintf(`redis.call("ZADD", KEYS[5], ARGV[1], "%s") `, taskID) + cmd += zAddCmd + + cmd += fmt.Sprintf(`return redis.call("HGET", KEYS[1], "msg")`) + + return redis.NewScript(cmd) + +} + // Dequeue queries given queues in order and pops a task message // off a queue if one exists and returns the message and its lease expiration time. // Dequeue skips a queue if the queue is paused. @@ -247,12 +276,39 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, leaseExpirationT leaseExpirationTime.Unix(), base.TaskKeyPrefix(qname), } - res, err := dequeueCmd.Run(context.Background(), r.client, keys, argv...).Result() + + tRes, err := getDequeueTaskIDCmd.Run(context.Background(), r.client, keys, argv...).Result() + if err == redis.Nil { continue } else if err != nil { return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) } + + // if id exists + taskID, ok := tRes.(string) + if !ok { + continue + } + + argv2 := base.TaskKeyPrefix(qname) + updateCmd := getUpdateDequeueCmd(taskID) + + nKeys := []string{} + + nKeys = append(nKeys, argv2+taskID) + for _, item := range keys { + nKeys = append(nKeys, item) + } + + res, err := updateCmd.Run(context.Background(), r.client, nKeys, argv...).Result() + + if err == redis.Nil { + continue + } else if err != nil { + return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) + } + encoded, err := cast.ToStringE(res) if err != nil { return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res)) @@ -941,6 +997,141 @@ for _, id in ipairs(ids) do end return table.getn(ids)`) +// forward (1) get taskIDs +var getForwardTasksCmd = redis.NewScript(`return redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)`) + +// forward (2) get group info +func getForwardTaskGroupsCmd(taskIDs []string) *redis.Script { + cmd := "" + gTableCmd := fmt.Sprintf("local gTable = {} ") + + cmd += gTableCmd + + for i := range taskIDs { + groupCmd := fmt.Sprintf( + `local group = redis.call("HGET", KEYS[%d], "group") if group == nil then group = '' end gTable[%d] = group `, + i+1, i+1) + cmd += groupCmd + } + + cmd += fmt.Sprintf("return gTable ") + return redis.NewScript(cmd) +} + +// forward (4) scheduled +func getScheduledForwardTaskCmd(taskIDs []string, argv1 int64) *redis.Script { + + cmd := "" + + for i, taskID := range taskIDs { + zAddCmd := fmt.Sprintf(`redis.call("ZADD", KEYS[%d], %d, "%s") `, (i+1)*2, argv1, taskID) + cmd += zAddCmd + + zRemCmd := fmt.Sprintf(`redis.call("ZREM", KEYS[1], "%s") `, taskID) + cmd += zRemCmd + + hSetCmd := fmt.Sprintf(`redis.call("HSET", KEYS[%d], "state", "aggregating") `, (i+1)*2+1) + cmd += hSetCmd + } + + cmd += "return '1'" + return redis.NewScript(cmd) + +} + +// forward (3) retry +func getRetryForwardTaskCmd(taskIDs []string, argv3 int64) *redis.Script { + + cmd := "" + + for i, taskID := range taskIDs { + lPushCmd := fmt.Sprintf(`redis.call("LPUSH", KEYS[1], "%s") `, taskID) + cmd += lPushCmd + + zRemCmd := fmt.Sprintf(`redis.call("ZREM", KEYS[2], "%s") `, taskID) + cmd += zRemCmd + + hSetCmd := fmt.Sprintf(`redis.call("HSET", KEYS[%d], "state", "pending", "pending_since", "%s") `, i+3, fmt.Sprintf("%d", argv3)) + cmd += hSetCmd + } + + cmd += "return '1'" + return redis.NewScript(cmd) + +} + +func (r *RDB) doForward(IDs []string, keys []string, argv []interface{}) (int, error) { + + argv1, _ := argv[0].(int64) + argv2, _ := argv[1].(string) + argv3, _ := argv[2].(int64) + argv4, _ := argv[3].(string) + groupCmd := getForwardTaskGroupsCmd(IDs) + nKeys := []string{} + for _, item := range IDs { + nKeys = append(nKeys, argv2+item) + } + + // 1. get group info + res, err := groupCmd.Run(context.Background(), r.client, nKeys, argv...).Result() + if err != nil { + return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err)) + } + + // 2. get retry or scheduled + gInfo, ok := res.([]interface{}) + if !ok { + return 0, nil + } + + retryIDs := map[string]string{} + scheduleIDs := map[string]string{} + for i, item := range gInfo { + taskGroup, ok := item.(string) + if !ok { + continue + } + if taskGroup != "" { + scheduleIDs[IDs[i]] = taskGroup + } else { + retryIDs[IDs[i]] = taskGroup + } + } + + updateSize := 0 + if len(retryIDs) > 0 { + nKeys := []string{keys[1], keys[0]} + rIDs := []string{} + for k := range retryIDs { + nKeys = append(nKeys, argv2+k) + rIDs = append(rIDs, k) + } + retryCmd := getRetryForwardTaskCmd(rIDs, argv3) + _, err := retryCmd.Run(context.Background(), r.client, nKeys, argv...).Result() + if err != nil { + return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err)) + } + updateSize += len(retryIDs) + } + + if len(scheduleIDs) > 0 { + nKeys := []string{keys[0]} + sIDs := []string{} + for k, v := range scheduleIDs { + nKeys = append(nKeys, argv4+v) + nKeys = append(nKeys, argv2+k) + sIDs = append(sIDs, k) + } + sCmd := getScheduledForwardTaskCmd(sIDs, argv1) + _, err := sCmd.Run(context.Background(), r.client, nKeys, argv...).Result() + if err != nil { + return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err)) + } + updateSize += len(scheduleIDs) + } + return updateSize, nil +} + // forward moves tasks with a score less than the current unix time from the delayed (i.e. scheduled | retry) zset // to the pending list or group set. // It returns the number of tasks moved. @@ -953,15 +1144,49 @@ func (r *RDB) forward(delayedKey, pendingKey, taskKeyPrefix, groupKeyPrefix stri now.UnixNano(), groupKeyPrefix, } - res, err := forwardCmd.Run(context.Background(), r.client, keys, argv...).Result() + + res, err := getForwardTasksCmd.Run(context.Background(), r.client, keys, argv...).Result() + if err != nil { return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err)) } - n, err := cast.ToIntE(res) - if err != nil { - return 0, errors.E(errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", res)) + + taskIDs, ok := res.([]interface{}) + if !ok { + return 0, nil } - return n, nil + + IDs := []string{} + updateSize := 0 + + for _, item := range taskIDs { + + taskID, ok := item.(string) + if !ok { + continue + } + + IDs = append(IDs, taskID) + + if len(IDs) == 100 { + uCount, err := r.doForward(IDs, keys, argv) + if err != nil { + return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err)) + } + updateSize += uCount + IDs = []string{} + } + } + + if len(IDs) > 0 { + uCount, err := r.doForward(IDs, keys, argv) + if err != nil { + return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err)) + } + updateSize += uCount + } + + return updateSize, nil } // forwardAll checks for tasks in scheduled/retry state that are ready to be run, and updates @@ -1138,17 +1363,75 @@ end return msgs `) +var getAggregationSetTasksCmd = redis.NewScript(`return redis.call("ZRANGE", KEYS[1], 0, -1)`) + +func getAggregationMsgCmd(taskIDs []string) *redis.Script { + + cmd := "" + + cmd += fmt.Sprintf(`local msgs = {} `) + + for i := range taskIDs { + hGetCmd := fmt.Sprintf(`table.insert(msgs, redis.call("HGET", KEYS[%d], "msg")) `, i+1) + cmd += hGetCmd + } + + cmd += fmt.Sprintf(`return msgs `) + return redis.NewScript(cmd) +} + // ReadAggregationSet retrieves members of an aggregation set and returns a list of tasks in the set and // the deadline for aggregating those tasks. func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessage, time.Time, error) { var op errors.Op = "RDB.ReadAggregationSet" ctx := context.Background() aggSetKey := base.AggregationSetKey(qname, gname, setID) - res, err := readAggregationSetCmd.Run(ctx, r.client, - []string{aggSetKey}, base.TaskKeyPrefix(qname)).Result() + keys := []string{aggSetKey} + argv := []interface{}{base.TaskKeyPrefix(qname)} + // 1. get taskIDs + res, err := getAggregationSetTasksCmd.Run(ctx, r.client, keys, argv...).Result() + if err != nil { return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) } + + taskIDs, ok := res.([]interface{}) + if !ok { + return nil, time.Time{}, nil + } + + IDs := []string{} + + for _, item := range taskIDs { + + taskID, ok := item.(string) + if !ok { + continue + } + + IDs = append(IDs, taskID) + } + + if len(IDs) == 0 { + return nil, time.Time{}, nil + } + + argv1, _ := argv[0].(string) + + // 2. get msg + msgCmd := getAggregationMsgCmd(IDs) + + nKeys := []string{} + for _, taskID := range IDs { + nKeys = append(nKeys, argv1+taskID) + } + + res, err = msgCmd.Run(ctx, r.client, nKeys, argv...).Result() + + if err != nil { + return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) + } + data, err := cast.ToStringSliceE(res) if err != nil { return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", res)) @@ -1189,6 +1472,28 @@ redis.call("ZREM", KEYS[2], KEYS[1]) return redis.status_reply("OK") `) +var getAggregationDeleteTasksCmd = redis.NewScript(`return redis.call("ZRANGE", KEYS[1], 0, -1)`) + +func getDeleteAggregationCmd(taskIDs []string) *redis.Script { + + cmd := "" + + for i := range taskIDs { + delCmd := fmt.Sprintf(`redis.call("DEL", KEYS[%d]) `, i+3) + cmd += delCmd + } + + delCmd := fmt.Sprintf(`redis.call("DEL", KEYS[1]) `) + cmd += delCmd + + zRemCmd := fmt.Sprintf(`redis.call("ZREM", KEYS[2], KEYS[1]) `) + cmd += zRemCmd + + cmd += fmt.Sprintf(`return redis.status_reply("OK") `) + + return redis.NewScript(cmd) +} + // DeleteAggregationSet deletes the aggregation set and its members identified by the parameters. func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID string) error { var op errors.Op = "RDB.DeleteAggregationSet" @@ -1196,7 +1501,43 @@ func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID stri base.AggregationSetKey(qname, gname, setID), base.AllAggregationSets(qname), } - return r.runScript(ctx, op, deleteAggregationSetCmd, keys, base.TaskKeyPrefix(qname)) + + // 1. get taskIDs + res, err := getAggregationDeleteTasksCmd.Run(ctx, r.client, keys).Result() + if err != nil { + return fmt.Errorf("redis eval error: %v", err) + } + + taskIDs, ok := res.([]interface{}) + if !ok { + return nil + } + + IDs := []string{} + + for _, item := range taskIDs { + + taskID, ok := item.(string) + if !ok { + continue + } + + IDs = append(IDs, taskID) + } + + if len(IDs) == 0 { + return nil + } + + delCmd := getDeleteAggregationCmd(IDs) + + nKeys := []string{keys[0], keys[1]} + argv1 := base.TaskKeyPrefix(qname) + for _, taskID := range IDs { + nKeys = append(nKeys, argv1+taskID) + } + + return r.runScript(ctx, op, delCmd, nKeys, base.TaskKeyPrefix(qname)) } // KEYS[1] -> asynq:{}:aggregation_sets @@ -1217,12 +1558,141 @@ redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) return redis.status_reply("OK") `) +var getReClaimStateKeysCmd = redis.NewScript(`return redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])`) + +var getReclaimStateTasksCmd = redis.NewScript(`return redis.call("ZRANGE", KEYS[1], 0, -1, "WITHSCORES") `) + +var getDeleteReClaimStateKeyCmd = redis.NewScript(`return redis.call("DEL", KEYS[1])`) + +func getUpdateReClaimGroupCmd(taskIDs []string, indexes []int64) *redis.Script { + cmd := "" + for i, taskID := range taskIDs { + zAddCmd := fmt.Sprintf(`redis.call("ZADD", KEYS[1], %d, "%s") `, indexes[i], taskID) + cmd += zAddCmd + } + cmd += fmt.Sprintf(`return '1' `) + return redis.NewScript(cmd) +} + +func getDeleteReClaimStateKeysCmd(argv1 int64) *redis.Script { + cmd := "" + delCmd := fmt.Sprintf(`redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", %d) `, argv1) + cmd += delCmd + cmd += fmt.Sprintf(`return redis.status_reply("OK") `) + return redis.NewScript(cmd) +} + +func findKey(key string) int { + re := regexp.MustCompile(`:[^:]*$`) + match := re.FindStringIndex(key) + if match != nil { + return match[0] + } + return -1 +} + +func subKey(key string, idx int) string { + if idx > 0 && idx <= len(key) { + return key[:idx] + } + return "" +} + // ReclaimStateAggregationSets checks for any stale aggregation sets in the given queue, and // reclaim tasks in the stale aggregation set by putting them back in the group. func (r *RDB) ReclaimStaleAggregationSets(qname string) error { var op errors.Op = "RDB.ReclaimStaleAggregationSets" - return r.runScript(context.Background(), op, reclaimStateAggregationSetsCmd, - []string{base.AllAggregationSets(qname)}, r.clock.Now().Unix()) + ctx := context.Background() + keys := []string{base.AllAggregationSets(qname)} + argv := []interface{}{r.clock.Now().Unix()} + // 1. get stale keys + res, err := getReClaimStateKeysCmd.Run(ctx, r.client, keys, argv...).Result() + if err != nil { + return errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) + } + + sKeys, ok := res.([]interface{}) + if !ok { + return nil + } + + cKeys := []string{} + + for _, item := range sKeys { + + cKey, ok := item.(string) + if !ok { + continue + } + cKeys = append(cKeys, cKey) + } + + for _, group := range cKeys { + // 1. get taskIDs + nKeys := []string{group} + res, err = getReclaimStateTasksCmd.Run(ctx, r.client, nKeys).Result() + if err != nil { + return errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) + } + + taskIDs, ok := res.([]interface{}) + if !ok { + continue + } + + IDs := []string{} + indexes := []int64{} + + for i, item := range taskIDs { + if i%2 == 0 { + taskID, ok := item.(string) + if !ok { + continue + } + IDs = append(IDs, taskID) + } else { + index, ok := item.(string) + if !ok { + continue + } + num, err := strconv.ParseInt(index, 10, 64) + if err != nil { + continue + } + indexes = append(indexes, num) + } + } + + if len(IDs) != len(indexes) { + continue + } + + // 2. update tasks + updateCmd := getUpdateReClaimGroupCmd(IDs, indexes) + + nKeys = []string{} + idx := findKey(group) + groupKey := subKey(group, idx) + nKeys = append(nKeys, groupKey) + + res, err = updateCmd.Run(ctx, r.client, nKeys).Result() + + if err != nil { + return errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) + } + + // 3. delete tasks + nKeys = []string{group} + res, err = getDeleteReClaimStateKeyCmd.Run(ctx, r.client, nKeys).Result() + if err != nil { + return errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) + } + } + + // delete reclaim groups + argv1, _ := argv[0].(int64) + delCmd := getDeleteReClaimStateKeysCmd(argv1) + return r.runScript(context.Background(), op, delCmd, keys, argv...) } // KEYS[1] -> asynq:{}:completed @@ -1239,6 +1709,9 @@ for _, id in ipairs(ids) do end return table.getn(ids)`) +var getExpiredCompletedTasksCmd = redis.NewScript(` +return redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, tonumber(ARGV[3]))`) + // DeleteExpiredCompletedTasks checks for any expired tasks in the given queue's completed set, // and delete all expired tasks. func (r *RDB) DeleteExpiredCompletedTasks(qname string) error { @@ -1255,6 +1728,48 @@ func (r *RDB) DeleteExpiredCompletedTasks(qname string) error { } } +func getDeleteExpiredCompletedTasksCmd(taskIDs []string) *redis.Script { + delStrs := []string{} + for i := range taskIDs { + delStrs = append(delStrs, fmt.Sprintf("KEYS[%d]", i+2)) + } + cmd := "" + + // redis.call("DEL", ARGV[2] .. id) + delCmd := fmt.Sprintf(`redis.call("DEL", %s) `, strings.Join(delStrs, ",")) + + cmd += delCmd + + // redis.call("ZREM", KEYS[1], id) + for _, taskID := range taskIDs { + zRemCmd := fmt.Sprintf(`redis.call("ZREM", KEYS[1], "%s") `, taskID) + cmd += zRemCmd + } + + cmd += "return 0" + + return redis.NewScript(cmd) + +} + +func (r *RDB) doDeleteExpireTask(IDs []string, keys []string, argv []interface{}) (int64, error) { + var op errors.Op = "rdb.DeleteExpiredCompletedTasks" + argv2, _ := argv[1].(string) + delSize := 0 + delCmd := getDeleteExpiredCompletedTasksCmd(IDs) + nKeys := []string{} + nKeys = append(nKeys, keys[0]) + for _, item := range IDs { + nKeys = append(nKeys, argv2+item) + } + _, err := delCmd.Run(context.Background(), r.client, nKeys).Result() + if err != nil { + return 0, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err)) + } + delSize += len(IDs) + return int64(delSize), nil +} + // deleteExpiredCompletedTasks runs the lua script to delete expired deleted task with the specified // batch size. It reports the number of tasks deleted. func (r *RDB) deleteExpiredCompletedTasks(qname string, batchSize int) (int64, error) { @@ -1265,15 +1780,50 @@ func (r *RDB) deleteExpiredCompletedTasks(qname string, batchSize int) (int64, e base.TaskKeyPrefix(qname), batchSize, } - res, err := deleteExpiredCompletedTasksCmd.Run(context.Background(), r.client, keys, argv...).Result() + + res, err := getExpiredCompletedTasksCmd.Run(context.Background(), r.client, keys, argv...).Result() + if err != nil { return 0, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err)) } - n, ok := res.(int64) + + taskIDs, ok := res.([]interface{}) if !ok { - return 0, errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", res)) + return 0, nil } - return n, nil + + // batch delete keys + IDs := []string{} + delSize := int64(0) + + for _, item := range taskIDs { + + taskID, ok := item.(string) + if !ok { + continue + } + + IDs = append(IDs, taskID) + + if len(IDs) == 100 { + dSize, err := r.doDeleteExpireTask(IDs, keys, argv) + if err != nil { + return 0, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err)) + } + delSize += dSize + IDs = []string{} + } + } + + if len(IDs) > 0 { + dSize, err := r.doDeleteExpireTask(IDs, keys, argv) + if err != nil { + return 0, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err)) + } + delSize += dSize + } + + return delSize, nil } // KEYS[1] -> asynq:{}:lease @@ -1289,17 +1839,71 @@ end return res `) +var getLeaseExpiredTasksCmd = redis.NewScript(`return redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) `) + +func getLeaseExpiredMsgCmd(taskIDs []string) *redis.Script { + cmd := "" + + cmd += fmt.Sprintf(`local res = {} `) + for i := range taskIDs { + hGetCmd := fmt.Sprintf(`table.insert(res, redis.call("HGET", KEYS[%d], "msg")) `, i+1) + cmd += hGetCmd + } + + cmd += fmt.Sprintf(`return res `) + return redis.NewScript(cmd) + +} + // ListLeaseExpired returns a list of task messages with an expired lease from the given queues. func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) { var op errors.Op = "rdb.ListLeaseExpired" var msgs []*base.TaskMessage for _, qname := range qnames { - res, err := listLeaseExpiredCmd.Run(context.Background(), r.client, - []string{base.LeaseKey(qname)}, - cutoff.Unix(), base.TaskKeyPrefix(qname)).Result() + + keys := []string{base.LeaseKey(qname)} + argv := []interface{}{cutoff.Unix(), base.TaskKeyPrefix(qname)} + res, err := getLeaseExpiredTasksCmd.Run(context.Background(), r.client, keys, argv...).Result() + if err != nil { return nil, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err)) } + + taskIDs, ok := res.([]interface{}) + if !ok { + return msgs, nil + } + + IDs := []string{} + + for _, item := range taskIDs { + + taskID, ok := item.(string) + if !ok { + continue + } + + IDs = append(IDs, taskID) + } + + if len(IDs) == 0 { + continue + } + + argv2, _ := argv[1].(string) + + msgCmd := getLeaseExpiredMsgCmd(IDs) + nKeys := []string{} + + for _, taskID := range IDs { + nKeys = append(nKeys, argv2+taskID) + } + res, err = msgCmd.Run(context.Background(), r.client, nKeys, argv...).Result() + + if err != nil { + return nil, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err)) + } + data, err := cast.ToStringSliceE(res) if err != nil { return nil, errors.E(op, errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", res))