From 9023cbf4be463ad0d3ebc1e33627b5a06ca67e92 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 27 Mar 2022 16:25:03 -0700 Subject: [PATCH] Update RDB.DeleteTask to handle aggregating task --- internal/rdb/inspect.go | 17 +++++-- internal/rdb/inspect_test.go | 95 ++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 3 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 5db9572..c96bcae 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -1279,9 +1279,11 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) { // Input: // KEYS[1] -> asynq:{}:t: +// KEYS[2] -> asynq:{}:groups // -- // ARGV[1] -> task ID // ARGV[2] -> queue key prefix +// ARGV[3] -> group key prefix // // Output: // Numeric code indicating the status: @@ -1292,17 +1294,24 @@ var deleteTaskCmd = redis.NewScript(` if redis.call("EXISTS", KEYS[1]) == 0 then return 0 end -local state = redis.call("HGET", KEYS[1], "state") +local state, group = unpack(redis.call("HMGET", KEYS[1], "state", "group")) if state == "active" then return -1 end if state == "pending" then if redis.call("LREM", ARGV[2] .. state, 0, ARGV[1]) == 0 then - return redis.error_reply("task is not found in list: " .. tostring(state)) + return redis.error_reply("task is not found in list: " .. tostring(ARGV[2] .. state)) + end +elseif state == "aggregating" then + if redis.call("ZREM", ARGV[3] .. group, ARGV[1]) == 0 then + return redis.error_reply("task is not found in zset: " .. tostring(ARGV[3] .. group)) + end + if redis.call("ZCARD", ARGV[3] .. group) == 0 then + redis.call("SREM", KEYS[2], group) end else if redis.call("ZREM", ARGV[2] .. state, ARGV[1]) == 0 then - return redis.error_reply("task is not found in zset: " .. tostring(state)) + return redis.error_reply("task is not found in zset: " .. tostring(ARGV[2] .. state)) end end local unique_key = redis.call("HGET", KEYS[1], "unique_key") @@ -1325,10 +1334,12 @@ func (r *RDB) DeleteTask(qname, id string) error { } keys := []string{ base.TaskKey(qname, id), + base.AllGroups(qname), } argv := []interface{}{ id, base.QueueKeyPrefix(qname), + base.GroupKeyPrefix(qname), } res, err := deleteTaskCmd.Run(context.Background(), r.client, keys, argv...).Result() if err != nil { diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index b2039cd..68c3073 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -3671,6 +3671,101 @@ func TestDeleteScheduledTask(t *testing.T) { } } +func TestDeleteAggregatingTask(t *testing.T) { + r := setup(t) + defer r.Close() + now := time.Now() + m1 := h.NewTaskMessageBuilder().SetQueue("default").SetType("task1").SetGroup("group1").Build() + m2 := h.NewTaskMessageBuilder().SetQueue("default").SetType("task2").SetGroup("group1").Build() + m3 := h.NewTaskMessageBuilder().SetQueue("custom").SetType("task3").SetGroup("group1").Build() + + fxt := struct { + tasks []*h.TaskSeedData + allQueues []string + allGroups map[string][]string + groups map[string][]*redis.Z + }{ + tasks: []*h.TaskSeedData{ + {Msg: m1, State: base.TaskStateAggregating}, + {Msg: m2, State: base.TaskStateAggregating}, + {Msg: m3, State: base.TaskStateAggregating}, + }, + allQueues: []string{"default", "custom"}, + allGroups: map[string][]string{ + base.AllGroups("default"): {"group1"}, + base.AllGroups("custom"): {"group1"}, + }, + groups: map[string][]*redis.Z{ + base.GroupKey("default", "group1"): { + {Member: m1.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + {Member: m2.ID, Score: float64(now.Add(-25 * time.Second).Unix())}, + }, + base.GroupKey("custom", "group1"): { + {Member: m3.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + }, + }, + } + + tests := []struct { + desc string + qname string + id string + wantAllGroups map[string][]string + wantGroups map[string][]redis.Z + }{ + { + desc: "deletes a task from group with multiple tasks", + qname: "default", + id: m1.ID, + wantAllGroups: map[string][]string{ + base.AllGroups("default"): {"group1"}, + base.AllGroups("custom"): {"group1"}, + }, + wantGroups: map[string][]redis.Z{ + base.GroupKey("default", "group1"): { + {Member: m2.ID, Score: float64(now.Add(-25 * time.Second).Unix())}, + }, + base.GroupKey("custom", "group1"): { + {Member: m3.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + }, + }, + }, + { + desc: "deletes a task from group with single task", + qname: "custom", + id: m3.ID, + wantAllGroups: map[string][]string{ + base.AllGroups("default"): {"group1"}, + base.AllGroups("custom"): {}, // should be clear out group from all-groups set + }, + wantGroups: map[string][]redis.Z{ + base.GroupKey("default", "group1"): { + {Member: m1.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + {Member: m2.ID, Score: float64(now.Add(-25 * time.Second).Unix())}, + }, + base.GroupKey("custom", "group1"): {}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + h.SeedTasks(t, r.client, fxt.tasks) + h.SeedRedisSet(t, r.client, base.AllQueues, fxt.allQueues) + h.SeedRedisSets(t, r.client, fxt.allGroups) + h.SeedRedisZSets(t, r.client, fxt.groups) + + t.Run(tc.desc, func(t *testing.T) { + err := r.DeleteTask(tc.qname, tc.id) + if err != nil { + t.Fatalf("DeleteTask returned error: %v", err) + } + h.AssertRedisSets(t, r.client, tc.wantAllGroups) + h.AssertRedisZSets(t, r.client, tc.wantGroups) + }) + } +} + func TestDeletePendingTask(t *testing.T) { r := setup(t) defer r.Close()