From 7849b1114c385849ef0028d7009084a3d6adfa58 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 10 Mar 2022 11:00:28 -0800 Subject: [PATCH] Implement RDB.DeleteAggregationSet --- internal/asynqtest/asynqtest.go | 41 +++++++++++++++++++++++++--- internal/rdb/rdb.go | 17 ++++++++++-- internal/rdb/rdb_test.go | 47 +++++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+), 6 deletions(-) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index f36a37c..6c7a35c 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -252,6 +252,12 @@ func SeedGroup(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname, seedRedisZSet(tb, r, base.GroupKey(qname, gname), entries, base.TaskStateAggregating) } +func SeedAggregationSet(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname, gname, setID string) { + tb.Helper() + r.SAdd(context.Background(), base.AllQueues, qname) + seedRedisSet(tb, r, base.AggregationSetKey(qname, gname, setID), msgs, base.TaskStateAggregating) +} + // SeedAllPendingQueues initializes all of the specified queues with the given messages. // // pending maps a queue name to a list of messages. @@ -330,14 +336,14 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, if err := c.LPush(context.Background(), key, msg.ID).Err(); err != nil { tb.Fatal(err) } - key := base.TaskKey(msg.Queue, msg.ID) + taskKey := base.TaskKey(msg.Queue, msg.ID) data := map[string]interface{}{ "msg": encoded, "state": state.String(), "unique_key": msg.UniqueKey, "group": msg.GroupKey, } - if err := c.HSet(context.Background(), key, data).Err(); err != nil { + if err := c.HSet(context.Background(), taskKey, data).Err(); err != nil { tb.Fatal(err) } if len(msg.UniqueKey) > 0 { @@ -359,14 +365,41 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, if err := c.ZAdd(context.Background(), key, z).Err(); err != nil { tb.Fatal(err) } - key := base.TaskKey(msg.Queue, msg.ID) + taskKey := base.TaskKey(msg.Queue, msg.ID) data := map[string]interface{}{ "msg": encoded, "state": state.String(), "unique_key": msg.UniqueKey, "group": msg.GroupKey, } - if err := c.HSet(context.Background(), key, data).Err(); err != nil { + if err := c.HSet(context.Background(), taskKey, data).Err(); err != nil { + tb.Fatal(err) + } + if len(msg.UniqueKey) > 0 { + err := c.SetNX(context.Background(), msg.UniqueKey, msg.ID, 1*time.Minute).Err() + if err != nil { + tb.Fatalf("Failed to set unique lock in redis: %v", err) + } + } + } +} + +func seedRedisSet(tb testing.TB, c redis.UniversalClient, key string, + msgs []*base.TaskMessage, state base.TaskState) { + tb.Helper() + for _, msg := range msgs { + encoded := MustMarshal(tb, msg) + if err := c.SAdd(context.Background(), key, msg.ID).Err(); err != nil { + tb.Fatal(err) + } + taskKey := base.TaskKey(msg.Queue, msg.ID) + data := map[string]interface{}{ + "msg": encoded, + "state": state.String(), + "unique_key": msg.UniqueKey, + "group": msg.GroupKey, + } + if err := c.HSet(context.Background(), taskKey, data).Err(); err != nil { tb.Fatal(err) } if len(msg.UniqueKey) > 0 { diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 0ba9690..cb282c5 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -1124,9 +1124,22 @@ func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessag return msgs, time.Unix(int64(deadlineUnix), 0), nil } -// DeleteAggregationSet deletes the aggregation set identified by the parameters. +// KEYS[1] -> asynq:{}:g:: +// ------- +// ARGV[1] -> task key prefix +var deleteAggregationSetCmd = redis.NewScript(` +local ids = redis.call("SMEMBERS", KEYS[1]) +for _, id in ipairs(ids) do + redis.call("DEL", ARGV[1] .. id) +end +redis.call("DEL", KEYS[1]) +return redis.status_reply("OK") +`) + +// DeleteAggregationSet deletes the aggregation set and its members identified by the parameters. func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID string) error { - return nil + var op errors.Op = "RDB.DeleteAggregationSet" + return r.runScript(ctx, op, deleteAggregationSetCmd, []string{base.AggregationSetKey(qname, gname, setID)}, base.TaskKeyPrefix(qname)) } // KEYS[1] -> asynq:{}:completed diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 7d973ec..903f04f 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -3284,3 +3284,50 @@ func TestAggregationCheck(t *testing.T) { } } } + +func TestDeleteAggregationSet(t *testing.T) { + r := setup(t) + defer r.Close() + + ctx := context.Background() + setID := uuid.NewString() + msg1 := h.NewTaskMessageBuilder().SetType("foo").SetQueue("default").SetGroup("mygroup").Build() + msg2 := h.NewTaskMessageBuilder().SetType("bar").SetQueue("default").SetGroup("mygroup").Build() + msg3 := h.NewTaskMessageBuilder().SetType("baz").SetQueue("default").SetGroup("mygroup").Build() + + tests := []struct { + aggregationSet []*base.TaskMessage + qname string + gname string + setID string + }{ + { + aggregationSet: []*base.TaskMessage{msg1, msg2, msg3}, + qname: "default", + gname: "mygroup", + setID: setID, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + h.SeedAggregationSet(t, r.client, tc.aggregationSet, tc.qname, tc.gname, tc.setID) + + if err := r.DeleteAggregationSet(ctx, tc.qname, tc.gname, tc.setID); err != nil { + t.Fatalf("DeleteAggregationSet returned error: %v", err) + } + key := base.AggregationSetKey(tc.qname, tc.gname, tc.setID) + // Check if the set is deleted. + if r.client.Exists(ctx, key).Val() != 0 { + t.Errorf("aggregation set key %q still exists", key) + } + + // Check all tasks in the set are deleted. + for _, m := range tc.aggregationSet { + taskKey := base.TaskKey(m.Queue, m.ID) + if r.client.Exists(ctx, taskKey).Val() != 0 { + t.Errorf("task key %q still exists", taskKey) + } + } + } +}