2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Implement RDB.ReclaimStaleAggregationSets

This commit is contained in:
Ken Hibino 2022-03-11 12:04:35 -08:00
parent 47a66231b3
commit 1c388baf06
2 changed files with 34 additions and 4 deletions

View File

@ -1155,15 +1155,30 @@ func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID stri
return r.runScript(ctx, op, deleteAggregationSetCmd, []string{base.AggregationSetKey(qname, gname, setID)}, base.TaskKeyPrefix(qname)) return r.runScript(ctx, op, deleteAggregationSetCmd, []string{base.AggregationSetKey(qname, gname, setID)}, base.TaskKeyPrefix(qname))
} }
// KEYS[1] -> asynq:{<qname>}:aggregation_sets
// -------
// ARGV[1] -> current time in unix time
var reclaimStateAggregationSetsCmd = redis.NewScript(` var reclaimStateAggregationSetsCmd = redis.NewScript(`
local staleSetKeys = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, key in ipairs(staleSetKeys) do
local idx = string.find(key, ":[^:]*$")
local groupKey = string.sub(key, 1, idx-1)
local res = redis.call("ZRANGE", key, 0, -1, "WITHSCORES")
for i=1, table.getn(res)-1, 2 do
redis.call("ZADD", groupKey, tonumber(res[i+1]), res[i])
end
redis.call("DEL", key)
end
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
return redis.status_reply("OK")
`) `)
// ReclaimStateAggregationSets checks for any stale aggregation sets in the given queue, and // ReclaimStateAggregationSets checks for any stale aggregation sets in the given queue, and
// reclaim tasks in the stale aggregation set by putting them back in the group. // reclaim tasks in the stale aggregation set by putting them back in the group.
func (r *RDB) ReclaimStaleAggregationSets(qname string) error { func (r *RDB) ReclaimStaleAggregationSets(qname string) error {
//now := r.clock.Now() var op errors.Op = "RDB.ReclaimStaleAggregationSets"
return nil return r.runScript(context.Background(), op, reclaimStateAggregationSetsCmd,
[]string{base.AllAggregationSets(qname)}, r.clock.Now().Unix())
} }
// KEYS[1] -> asynq:{<qname>}:completed // KEYS[1] -> asynq:{<qname>}:completed

View File

@ -9,6 +9,7 @@ import (
"encoding/json" "encoding/json"
"flag" "flag"
"math" "math"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -3513,12 +3514,26 @@ func AssertZSets(t *testing.T, r redis.UniversalClient, wantZSets map[string][]r
if err != nil { if err != nil {
t.Fatalf("Failed to read zset (key=%q): %v", key, err) t.Fatalf("Failed to read zset (key=%q): %v", key, err)
} }
if diff := cmp.Diff(want, got); diff != "" { if diff := cmp.Diff(want, got, SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in zset (key=%q): (-want,+got)\n%s", key, diff) t.Errorf("mismatch found in zset (key=%q): (-want,+got)\n%s", key, diff)
} }
} }
} }
var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []redis.Z) []redis.Z {
out := append([]redis.Z(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
// TODO: If member is a comparable type (int, string, etc) compare by the member
// Use generic comparable type here once update to go1.18
if _, ok := out[i].Member.(string); ok {
// If member is a string, compare the member
return out[i].Member.(string) < out[j].Member.(string)
}
return out[i].Score < out[j].Score
})
return out
})
func TestListGroups(t *testing.T) { func TestListGroups(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()