diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 12614d8..aa02944 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -1193,12 +1193,14 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { // Input: // KEYS[1] -> task key (asynq:{}:t:) // KEYS[2] -> archived key (asynq:{}:archived) +// KEYS[3] -> all groups key (asynq:{}:groups) // -- // ARGV[1] -> id of the task to archive // ARGV[2] -> current timestamp // ARGV[3] -> cutoff timestamp (e.g., 90 days ago) // ARGV[4] -> max number of tasks in archived state (e.g., 100) // ARGV[5] -> queue key prefix (asynq:{}:) +// ARGV[6] -> group key prefix (asynq:{}:g:) // // Output: // Numeric code indicating the status: @@ -1211,7 +1213,7 @@ var archiveTaskCmd = redis.NewScript(` if redis.call("EXISTS", KEYS[1]) == 0 then return 0 end -local state = redis.call("HGET", KEYS[1], "state") +local state, group = unpack(redis.call("HMGET", KEYS[1], "state", "group")) if state == "active" then return -2 end @@ -1220,11 +1222,18 @@ if state == "archived" then end if state == "pending" then if redis.call("LREM", ARGV[5] .. state, 1, ARGV[1]) == 0 then - return redis.error_reply("task id not found in list " .. tostring(state)) + return redis.error_reply("task id not found in list " .. tostring(ARGV[5] .. state)) + end +elseif state == "aggregating" then + if redis.call("ZREM", ARGV[6] .. group, ARGV[1]) == 0 then + return redis.error_reply("task id not found in zset " .. tostring(ARGV[6] .. group)) + end + if redis.call("ZCARD", ARGV[6] .. group) == 0 then + redis.call("SREM", KEYS[3], group) end else if redis.call("ZREM", ARGV[5] .. state, ARGV[1]) == 0 then - return redis.error_reply("task id not found in zset " .. tostring(state)) + return redis.error_reply("task id not found in zset " .. tostring(ARGV[5] .. state)) end end redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1]) @@ -1249,6 +1258,7 @@ func (r *RDB) ArchiveTask(qname, id string) error { keys := []string{ base.TaskKey(qname, id), base.ArchivedKey(qname), + base.AllGroups(qname), } now := r.clock.Now() argv := []interface{}{ @@ -1257,6 +1267,7 @@ func (r *RDB) ArchiveTask(qname, id string) error { now.AddDate(0, 0, -archivedExpirationInDays).Unix(), maxArchiveSize, base.QueueKeyPrefix(qname), + base.GroupKeyPrefix(qname), } res, err := archiveTaskCmd.Run(context.Background(), r.client, keys, argv...).Result() if err != nil { diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 1678b6d..5cb58ea 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2771,6 +2771,115 @@ func TestArchiveScheduledTask(t *testing.T) { } } +func TestArchiveAggregatingTask(t *testing.T) { + r := setup(t) + defer r.Close() + now := time.Now() + r.SetClock(timeutil.NewSimulatedClock(now)) + m1 := h.NewTaskMessageBuilder().SetQueue("default").SetType("task1").SetGroup("group1").Build() + m2 := h.NewTaskMessageBuilder().SetQueue("default").SetType("task2").SetGroup("group1").Build() + m3 := h.NewTaskMessageBuilder().SetQueue("custom").SetType("task3").SetGroup("group1").Build() + + fxt := struct { + tasks []*h.TaskSeedData + allQueues []string + allGroups map[string][]string + groups map[string][]*redis.Z + }{ + tasks: []*h.TaskSeedData{ + {Msg: m1, State: base.TaskStateAggregating}, + {Msg: m2, State: base.TaskStateAggregating}, + {Msg: m3, State: base.TaskStateAggregating}, + }, + allQueues: []string{"default", "custom"}, + allGroups: map[string][]string{ + base.AllGroups("default"): {"group1"}, + base.AllGroups("custom"): {"group1"}, + }, + groups: map[string][]*redis.Z{ + base.GroupKey("default", "group1"): { + {Member: m1.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + {Member: m2.ID, Score: float64(now.Add(-25 * time.Second).Unix())}, + }, + base.GroupKey("custom", "group1"): { + {Member: m3.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + }, + }, + } + + tests := []struct { + desc string + qname string + id string + wantArchived map[string][]redis.Z + wantAllGroups map[string][]string + wantGroups map[string][]redis.Z + }{ + { + desc: "archive task from a group with multiple tasks", + qname: "default", + id: m1.ID, + wantArchived: map[string][]redis.Z{ + base.ArchivedKey("default"): { + {Member: m1.ID, Score: float64(now.Unix())}, + }, + }, + wantAllGroups: map[string][]string{ + base.AllGroups("default"): {"group1"}, + base.AllGroups("custom"): {"group1"}, + }, + wantGroups: map[string][]redis.Z{ + base.GroupKey("default", "group1"): { + {Member: m2.ID, Score: float64(now.Add(-25 * time.Second).Unix())}, + }, + base.GroupKey("custom", "group1"): { + {Member: m3.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + }, + }, + }, + { + desc: "archive task from a group with a single task", + qname: "custom", + id: m3.ID, + wantArchived: map[string][]redis.Z{ + base.ArchivedKey("custom"): { + {Member: m3.ID, Score: float64(now.Unix())}, + }, + }, + wantAllGroups: map[string][]string{ + base.AllGroups("default"): {"group1"}, + base.AllGroups("custom"): {}, + }, + wantGroups: map[string][]redis.Z{ + base.GroupKey("default", "group1"): { + {Member: m1.ID, Score: float64(now.Add(-20 * time.Second).Unix())}, + {Member: m2.ID, Score: float64(now.Add(-25 * time.Second).Unix())}, + }, + base.GroupKey("custom", "group1"): {}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) + h.SeedTasks(t, r.client, fxt.tasks) + h.SeedRedisSet(t, r.client, base.AllQueues, fxt.allQueues) + h.SeedRedisSets(t, r.client, fxt.allGroups) + h.SeedRedisZSets(t, r.client, fxt.groups) + + t.Run(tc.desc, func(t *testing.T) { + err := r.ArchiveTask(tc.qname, tc.id) + if err != nil { + t.Fatalf("ArchiveTask returned error: %v", err) + } + + h.AssertRedisZSets(t, r.client, tc.wantArchived) + h.AssertRedisZSets(t, r.client, tc.wantGroups) + h.AssertRedisSets(t, r.client, tc.wantAllGroups) + }) + } +} + func TestArchivePendingTask(t *testing.T) { r := setup(t) defer r.Close()