diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index b4ebd75..f5947ee 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -533,19 +533,6 @@ func (r *RDB) removeAndRunAll(zset, qkey string) (int64, error) { return n, nil } -// ArchiveTask finds a task that matches the id from the given queue and archives it. -// If there's no match, it returns ErrTaskNotFound. -func (r *RDB) ArchiveTask(qname string, id uuid.UUID) error { - n, err := r.archiveTask(qname, id.String()) - if err != nil { - return err - } - if n == 0 { - return ErrTaskNotFound - } - return nil -} - // ArchiveAllRetryTasks archives all retry tasks from the given queue and // returns the number of tasks that were moved. func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error) { @@ -594,37 +581,49 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { return n, nil } -// archiveTaskCmd is a Lua script that arhives a task given a task id. +// archiveTaskCmd is a Lua script that archives a task given a task id. // // Input: // KEYS[1] -> task key (asynq:{}:t:) // KEYS[2] -> archived key (asynq:{}:archived) +// KEYS[3] -> all queues key +// -- // ARGV[1] -> id of the task to archive // ARGV[2] -> current timestamp // ARGV[3] -> cutoff timestamp (e.g., 90 days ago) // ARGV[4] -> max number of tasks in archived state (e.g., 100) // ARGV[5] -> queue key prefix (asynq:{}:) +// ARGV[6] -> queue name // // Output: -// TODO: document return value of the script +// Numeric code indicating the status: +// Returns 1 if task is successfully archived. +// Returns 0 if task is not found. +// Returns -1 if task is already archived. +// Returns -2 if task is in active state. +// Returns -3 if queue doesn't exist. +// Returns error reply if unexpected error occurs. var archiveTaskCmd = redis.NewScript(` +if redis.call("SISMEMBER", KEYS[3], ARGV[6]) == 0 then + return -3 +end if redis.call("EXISTS", KEYS[1]) == 0 then return 0 end local state = redis.call("HGET", KEYS[1], "state") if state == "active" then - return redis.error_reply("Cannot archive active task. Use cancel instead.") + return -2 end if state == "archived" then - return redis.error_reply("Task is already archived") + return -1 end if state == "pending" then if redis.call("LREM", ARGV[5] .. state, 1, ARGV[1]) == 0 then - return redis.error_reply("internal error: task id not found in list " .. tostring(state)) + return redis.error_reply("task id not found in list " .. tostring(state)) end else if redis.call("ZREM", ARGV[5] .. state, ARGV[1]) == 0 then - return redis.error_reply("internal error: task id not found in zset " .. tostring(state)) + return redis.error_reply("task id not found in zset " .. tostring(state)) end end redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1]) @@ -634,28 +633,50 @@ redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[4]) return 1 `) -func (r *RDB) archiveTask(qname, id string) (int64, error) { +// ArchiveTask finds a task that matches the id from the given queue and archives it. +// It returns nil if it successfully archived the task. +// +// If a queue with the given name doesn't exist, it returns ErrQueueNotFound. +// If a task with the given id doesn't exist in the queue, it returns ErrTaskNotFound +// If a task is already archived, it returns ErrTaskAlreadyArchived. +// If a task is in active state it returns non-nil error. +func (r *RDB) ArchiveTask(qname string, id uuid.UUID) error { keys := []string{ - base.TaskKey(qname, id), + base.TaskKey(qname, id.String()), base.ArchivedKey(qname), + base.AllQueues, } now := time.Now() argv := []interface{}{ - id, + id.String(), now.Unix(), now.AddDate(0, 0, -archivedExpirationInDays).Unix(), maxArchiveSize, base.QueueKeyPrefix(qname), + qname, } res, err := archiveTaskCmd.Run(r.client, keys, argv...).Result() if err != nil { - return 0, err + return err } n, ok := res.(int64) if !ok { - return 0, fmt.Errorf("internal error: could not cast %v to int64", res) + return fmt.Errorf("%w: could not cast %v to int64", base.ErrInternal, res) + } + switch n { + case 1: + return nil + case 0: + return ErrTaskNotFound + case -1: + return ErrTaskAlreadyArchived + case -2: + return fmt.Errorf("%w: cannot archive task in active state. use CancelTask instead.", base.ErrFailedPrecondition) + case -3: + return ErrQueueNotFound + default: + return fmt.Errorf("%w: unexpected return value from archiveTaskCmd script: %d", base.ErrInternal, n) } - return n, nil } // KEYS[1] -> ZSET to move task from (e.g., asynq:{}:retry) diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index f430e50..d71b3ee 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -5,6 +5,7 @@ package rdb import ( + "errors" "fmt" "testing" "time" @@ -1715,7 +1716,7 @@ func TestArchiveRetryTask(t *testing.T) { h.SeedAllArchivedQueues(t, r.client, tc.archived) got := r.ArchiveTask(tc.qname, tc.id) - if got != tc.want { + if !errors.Is(got, tc.want) { t.Errorf("(*RDB).ArchiveTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) continue @@ -1780,6 +1781,23 @@ func TestArchiveScheduledTask(t *testing.T) { "default": {{Message: m1, Score: time.Now().Unix()}}, }, }, + { + scheduled: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, + }, + archived: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, + }, + qname: "nonexistent", + id: m2.ID, + want: ErrQueueNotFound, + wantScheduled: map[string][]base.Z{ + "default": {{Message: m1, Score: t1.Unix()}}, + }, + wantArchived: map[string][]base.Z{ + "default": {{Message: m2, Score: t2.Unix()}}, + }, + }, { scheduled: map[string][]base.Z{ "default": {{Message: m1, Score: t1.Unix()}}, @@ -1789,7 +1807,7 @@ func TestArchiveScheduledTask(t *testing.T) { }, qname: "default", id: m2.ID, - want: ErrTaskNotFound, + want: ErrTaskAlreadyArchived, wantScheduled: map[string][]base.Z{ "default": {{Message: m1, Score: t1.Unix()}}, }, @@ -1837,7 +1855,7 @@ func TestArchiveScheduledTask(t *testing.T) { h.SeedAllArchivedQueues(t, r.client, tc.archived) got := r.ArchiveTask(tc.qname, tc.id) - if got != tc.want { + if !errors.Is(got, tc.want) { t.Errorf("(*RDB).ArchiveTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) continue @@ -1905,7 +1923,7 @@ func TestArchivePendingTask(t *testing.T) { "default": {{Message: m2, Score: oneHourAgo.Unix()}}, }, qname: "default", - id: m2.ID, + id: uuid.New(), want: ErrTaskNotFound, wantPending: map[string][]*base.TaskMessage{ "default": {m1}, @@ -1943,7 +1961,7 @@ func TestArchivePendingTask(t *testing.T) { h.SeedAllArchivedQueues(t, r.client, tc.archived) got := r.ArchiveTask(tc.qname, tc.id) - if got != tc.want { + if !errors.Is(got, tc.want) { t.Errorf("(*RDB).ArchiveTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) continue diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 01edde6..ec2f5f2 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -20,10 +20,16 @@ var ( ErrNoProcessableTask = errors.New("no tasks are ready for processing") // ErrTaskNotFound indicates that a task that matches the given identifier was not found. - ErrTaskNotFound = errors.New("could not find a task") + ErrTaskNotFound = fmt.Errorf("%w: could not find a task in the queue", base.ErrNotFound) + + // ErrTaskAlreadyArchived indicates that the task in question is already in archive state. + ErrTaskAlreadyArchived = fmt.Errorf("%w: task is already archived", base.ErrFailedPrecondition) // ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock. ErrDuplicateTask = errors.New("task already exists") + + // ErrQueueNotFound indicates that a queue with the given name does not exist. + ErrQueueNotFound = fmt.Errorf("%w: queue does not exist", base.ErrNotFound) ) const statsTTL = 90 * 24 * time.Hour // 90 days