diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index f3875d5..e31c1d6 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -1155,15 +1155,30 @@ func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID stri return r.runScript(ctx, op, deleteAggregationSetCmd, []string{base.AggregationSetKey(qname, gname, setID)}, base.TaskKeyPrefix(qname)) } +// KEYS[1] -> asynq:{}:aggregation_sets +// ------- +// ARGV[1] -> current time in unix time var reclaimStateAggregationSetsCmd = redis.NewScript(` - +local staleSetKeys = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) +for _, key in ipairs(staleSetKeys) do + local idx = string.find(key, ":[^:]*$") + local groupKey = string.sub(key, 1, idx-1) + local res = redis.call("ZRANGE", key, 0, -1, "WITHSCORES") + for i=1, table.getn(res)-1, 2 do + redis.call("ZADD", groupKey, tonumber(res[i+1]), res[i]) + end + redis.call("DEL", key) +end +redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) +return redis.status_reply("OK") `) // ReclaimStateAggregationSets checks for any stale aggregation sets in the given queue, and // reclaim tasks in the stale aggregation set by putting them back in the group. func (r *RDB) ReclaimStaleAggregationSets(qname string) error { - //now := r.clock.Now() - return nil + var op errors.Op = "RDB.ReclaimStaleAggregationSets" + return r.runScript(context.Background(), op, reclaimStateAggregationSetsCmd, + []string{base.AllAggregationSets(qname)}, r.clock.Now().Unix()) } // KEYS[1] -> asynq:{}:completed diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index d5ceaed..f9c72d3 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -9,6 +9,7 @@ import ( "encoding/json" "flag" "math" + "sort" "strconv" "strings" "sync" @@ -3513,12 +3514,26 @@ func AssertZSets(t *testing.T, r redis.UniversalClient, wantZSets map[string][]r if err != nil { t.Fatalf("Failed to read zset (key=%q): %v", key, err) } - if diff := cmp.Diff(want, got); diff != "" { + if diff := cmp.Diff(want, got, SortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in zset (key=%q): (-want,+got)\n%s", key, diff) } } } +var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []redis.Z) []redis.Z { + out := append([]redis.Z(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + // TODO: If member is a comparable type (int, string, etc) compare by the member + // Use generic comparable type here once update to go1.18 + if _, ok := out[i].Member.(string); ok { + // If member is a string, compare the member + return out[i].Member.(string) < out[j].Member.(string) + } + return out[i].Score < out[j].Score + }) + return out +}) + func TestListGroups(t *testing.T) { r := setup(t) defer r.Close()