From e63f41fb24fd3ada433cc7bb204ca66869366db6 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 11 Mar 2022 15:54:47 -0800 Subject: [PATCH] Fix DeleteAggregationSet --- internal/rdb/rdb.go | 8 +++++++- internal/rdb/rdb_test.go | 12 +++++++++++- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index e31c1d6..16c2ff8 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -1138,6 +1138,7 @@ func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessag } // KEYS[1] -> asynq:{}:g:: +// KEYS[2] -> asynq:{}:aggregation_sets // ------- // ARGV[1] -> task key prefix var deleteAggregationSetCmd = redis.NewScript(` @@ -1146,13 +1147,18 @@ for _, id in ipairs(ids) do redis.call("DEL", ARGV[1] .. id) end redis.call("DEL", KEYS[1]) +redis.call("ZREM", KEYS[2], KEYS[1]) return redis.status_reply("OK") `) // DeleteAggregationSet deletes the aggregation set and its members identified by the parameters. func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID string) error { var op errors.Op = "RDB.DeleteAggregationSet" - return r.runScript(ctx, op, deleteAggregationSetCmd, []string{base.AggregationSetKey(qname, gname, setID)}, base.TaskKeyPrefix(qname)) + keys := []string{ + base.AggregationSetKey(qname, gname, setID), + base.AllAggregationSets(qname), + } + return r.runScript(ctx, op, deleteAggregationSetCmd, keys, base.TaskKeyPrefix(qname)) } // KEYS[1] -> asynq:{}:aggregation_sets diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index f9c72d3..e4f03c6 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -3356,6 +3356,7 @@ func TestAggregationCheck(t *testing.T) { } } +// TODO: Rewrite this test with the new pattern of using redis key-value as data. func TestDeleteAggregationSet(t *testing.T) { r := setup(t) defer r.Close() @@ -3388,11 +3389,16 @@ func TestDeleteAggregationSet(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) h.SeedAggregationSet(t, r.client, tc.aggregationSet, tc.qname, tc.gname, tc.setID) + key := base.AggregationSetKey(tc.qname, tc.gname, tc.setID) + if err := r.client.ZAdd(context.Background(), + base.AllAggregationSets(tc.qname), + &redis.Z{Member: key, Score: float64(now.Add(aggregationTimeout).Unix())}).Err(); err != nil { + t.Fatal(err) + } if err := r.DeleteAggregationSet(ctx, tc.qname, tc.gname, tc.setID); err != nil { t.Fatalf("DeleteAggregationSet returned error: %v", err) } - key := base.AggregationSetKey(tc.qname, tc.gname, tc.setID) // Check if the set is deleted. if r.client.Exists(ctx, key).Val() != 0 { t.Errorf("aggregation set key %q still exists", key) @@ -3405,6 +3411,10 @@ func TestDeleteAggregationSet(t *testing.T) { t.Errorf("task key %q still exists", taskKey) } } + + if _, err := r.client.ZScore(ctx, base.AllAggregationSets(tc.qname), key).Result(); err != redis.Nil { + t.Errorf("aggregation_set key %q is still in key %q", key, base.AllAggregationSets(tc.qname)) + } } }