diff --git a/inspector.go b/inspector.go index be1675a..1f9f7bc 100644 --- a/inspector.go +++ b/inspector.go @@ -217,17 +217,24 @@ type ArchivedTask struct { score int64 } -// Key returns a key used to delete, run, and archive the task. +// Key returns a key used to delete, and archive the pending task. +func (t *PendingTask) Key() string { + // Note: Pending tasks are stored in redis LIST, therefore no score. + // Use zero for the score to preserve the same key format. + return fmt.Sprintf("p:%v:%v", t.ID, 0) +} + +// Key returns a key used to delete, run, and archive the scheduled task. func (t *ScheduledTask) Key() string { return fmt.Sprintf("s:%v:%v", t.ID, t.score) } -// Key returns a key used to delete, run, and archive the task. +// Key returns a key used to delete, run, and archive the retry task. func (t *RetryTask) Key() string { return fmt.Sprintf("r:%v:%v", t.ID, t.score) } -// Key returns a key used to delete, run, and archive the task. +// Key returns a key used to delete and run the archived task. func (t *ArchivedTask) Key() string { return fmt.Sprintf("a:%v:%v", t.ID, t.score) } @@ -248,7 +255,7 @@ func parseTaskKey(key string) (id uuid.UUID, score int64, state string, err erro return uuid.Nil, 0, "", fmt.Errorf("invalid id") } state = parts[0] - if len(state) != 1 || !strings.Contains("sra", state) { + if len(state) != 1 || !strings.Contains("psra", state) { return uuid.Nil, 0, "", fmt.Errorf("invalid id") } return id, score, state, nil @@ -497,6 +504,8 @@ func (i *Inspector) DeleteTaskByKey(qname, key string) error { return err } switch state { + case "p": + return i.rdb.DeletePendingTask(qname, id) case "s": return i.rdb.DeleteScheduledTask(qname, id, score) case "r": @@ -589,6 +598,8 @@ func (i *Inspector) ArchiveTaskByKey(qname, key string) error { return err } switch state { + case "p": + return i.rdb.ArchivePendingTask(qname, id) case "s": return i.rdb.ArchiveScheduledTask(qname, id, score) case "r": diff --git a/inspector_test.go b/inspector_test.go index 19c117a..910c7ee 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -1037,7 +1037,7 @@ func TestInspectorDeleteAllArchivedTasks(t *testing.T) { } } -func TestInspectorKillAllScheduledTasks(t *testing.T) { +func TestInspectorArchiveAllScheduledTasks(t *testing.T) { r := setup(t) defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) @@ -1145,11 +1145,11 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { got, err := inspector.ArchiveAllScheduledTasks(tc.qname) if err != nil { - t.Errorf("KillAllScheduledTasks(%q) returned error: %v", tc.qname, err) + t.Errorf("ArchiveAllScheduledTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("KillAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want) + t.Errorf("ArchiveAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want) } for qname, want := range tc.wantScheduled { gotScheduled := asynqtest.GetScheduledEntries(t, r, qname) @@ -1170,7 +1170,7 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { } } -func TestInspectorKillAllRetryTasks(t *testing.T) { +func TestInspectorArchiveAllRetryTasks(t *testing.T) { r := setup(t) defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) @@ -1262,11 +1262,11 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { got, err := inspector.ArchiveAllRetryTasks(tc.qname) if err != nil { - t.Errorf("KillAllRetryTasks(%q) returned error: %v", tc.qname, err) + t.Errorf("ArchiveAllRetryTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("KillAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want) + t.Errorf("ArchiveAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want) } for qname, want := range tc.wantRetry { gotRetry := asynqtest.GetRetryEntries(t, r, qname) @@ -1632,6 +1632,67 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) { } } +func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { + r := setup(t) + defer r.Close() + m1 := asynqtest.NewTaskMessage("task1", nil) + m2 := asynqtest.NewTaskMessage("task2", nil) + m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") + inspector := NewInspector(getRedisConnOpt(t)) + + tests := []struct { + pending map[string][]*base.TaskMessage + qname string + key string + wantPending map[string][]*base.TaskMessage + }{ + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {m3}, + }, + qname: "default", + key: createPendingTask(m2).Key(), + wantPending: map[string][]*base.TaskMessage{ + "default": {m1}, + "custom": {m3}, + }, + }, + { + pending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {m3}, + }, + qname: "custom", + key: createPendingTask(m3).Key(), + wantPending: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {}, + }, + }, + } + + for _, tc := range tests { + asynqtest.FlushDB(t, r) + asynqtest.SeedAllPendingQueues(t, r, tc.pending) + + if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { + t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", + tc.qname, tc.key, err) + continue + } + + for qname, want := range tc.wantPending { + got := asynqtest.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, got, asynqtest.SortMsgOpt); diff != "" { + t.Errorf("unspected pending tasks in queue %q: (-want,+got):\n%s", + qname, diff) + continue + } + } + } +} + func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { r := setup(t) defer r.Close() @@ -1994,7 +2055,98 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) { } } -func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { +func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { + r := setup(t) + defer r.Close() + m1 := asynqtest.NewTaskMessage("task1", nil) + m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") + m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") + inspector := NewInspector(getRedisConnOpt(t)) + now := time.Now() + + tests := []struct { + pending map[string][]*base.TaskMessage + archived map[string][]base.Z + qname string + key string + wantPending map[string][]*base.TaskMessage + wantArchived map[string][]base.Z + }{ + { + pending: map[string][]*base.TaskMessage{ + "default": {m1}, + "custom": {m2, m3}, + }, + archived: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "default", + key: createPendingTask(m1).Key(), + wantPending: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {m2, m3}, + }, + wantArchived: map[string][]base.Z{ + "default": { + {Message: m1, Score: now.Unix()}, + }, + "custom": {}, + }, + }, + { + pending: map[string][]*base.TaskMessage{ + "default": {m1}, + "custom": {m2, m3}, + }, + archived: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + qname: "custom", + key: createPendingTask(m2).Key(), + wantPending: map[string][]*base.TaskMessage{ + "default": {m1}, + "custom": {m3}, + }, + wantArchived: map[string][]base.Z{ + "default": {}, + "custom": { + {Message: m2, Score: now.Unix()}, + }, + }, + }, + } + + for _, tc := range tests { + asynqtest.FlushDB(t, r) + asynqtest.SeedAllPendingQueues(t, r, tc.pending) + asynqtest.SeedAllArchivedQueues(t, r, tc.archived) + + if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil { + t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", + tc.qname, tc.key, err) + continue + } + for qname, want := range tc.wantPending { + gotPending := asynqtest.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + t.Errorf("unexpected pending tasks in queue %q: (-want,+got)\n%s", + qname, diff) + } + + } + for qname, want := range tc.wantArchived { + wantArchived := asynqtest.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, wantArchived, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected archived tasks in queue %q: (-want,+got)\n%s", + qname, diff) + } + } + } +} + +func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) { r := setup(t) defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) @@ -2049,7 +2201,7 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { asynqtest.SeedAllArchivedQueues(t, r, tc.archived) if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("KillTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } for qname, want := range tc.wantScheduled { @@ -2070,7 +2222,7 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { } } -func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) { +func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) { r := setup(t) defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) @@ -2124,7 +2276,7 @@ func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) { asynqtest.SeedAllArchivedQueues(t, r, tc.archived) if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("KillTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } for qname, want := range tc.wantRetry { diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index a3298d1..82d3577 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -488,6 +488,66 @@ func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID, score int64) erro return nil } +// KEYS[1] -> asynq:{} +// KEYS[2] -> asynq:{}:archived +// ARGV[1] -> task message to archive +// ARGV[2] -> current timestamp +// ARGV[3] -> cutoff timestamp (e.g., 90 days ago) +// ARGV[4] -> max number of tasks in archive (e.g., 100) +var archivePendingCmd = redis.NewScript(` +local x = redis.call("LREM", KEYS[1], 1, ARGV[1]) +if x == 0 then + return 0 +end +redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1]) +redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[3]) +redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[4]) +return 1 +`) + +func (r *RDB) archivePending(qname, msg string) (int64, error) { + keys := []string{base.QueueKey(qname), base.ArchivedKey(qname)} + now := time.Now() + limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago + args := []interface{}{msg, now.Unix(), limit, maxArchiveSize} + res, err := archivePendingCmd.Run(r.client, keys, args...).Result() + if err != nil { + return 0, err + } + n, ok := res.(int64) + if !ok { + return 0, fmt.Errorf("could not cast %v to int64", res) + } + return n, nil +} + +// ArchivePendingTask finds a pending task that matches the given id from the given queue +// and archives it. If a task that maches the id does not exist, it returns ErrTaskNotFound. +func (r *RDB) ArchivePendingTask(qname string, id uuid.UUID) error { + qkey := base.QueueKey(qname) + data, err := r.client.LRange(qkey, 0, -1).Result() + if err != nil { + return err + } + for _, s := range data { + msg, err := base.DecodeMessage(s) + if err != nil { + return err + } + if msg.ID == id { + n, err := r.archivePending(qname, s) + if err != nil { + return err + } + if n == 0 { + return ErrTaskNotFound + } + return nil + } + } + return ErrTaskNotFound +} + // ArchiveAllRetryTasks archives all retry tasks from the given queue and // returns the number of tasks that were moved. func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error) { @@ -585,6 +645,29 @@ func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID, score int64) error return r.deleteTask(base.ScheduledKey(qname), id.String(), float64(score)) } +// DeletePendingTask deletes a pending tasks that matches the given id from the given queue. +// If a task that matches the id does not exist, it returns ErrTaskNotFound. +func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error { + qkey := base.QueueKey(qname) + data, err := r.client.LRange(qkey, 0, -1).Result() + if err != nil { + return err + } + for _, s := range data { + msg, err := base.DecodeMessage(s) + if err != nil { + return err + } + if msg.ID == id { + if err := r.client.LRem(qkey, 1, s).Err(); err != nil { + return err + } + return nil + } + } + return ErrTaskNotFound +} + var deleteTaskCmd = redis.NewScript(` local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) for _, msg in ipairs(msgs) do