diff --git a/inspeq/inspector.go b/inspeq/inspector.go index d637d22..11c4290 100644 --- a/inspeq/inspector.go +++ b/inspeq/inspector.go @@ -135,6 +135,7 @@ func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) { } // ErrQueueNotFound indicates that the specified queue does not exist. +// TODO: Consider renaming type ErrQueueNotFound struct { qname string } @@ -144,6 +145,7 @@ func (e *ErrQueueNotFound) Error() string { } // ErrQueueNotEmpty indicates that the specified queue is not empty. +// TODO: Consider renaming type ErrQueueNotEmpty struct { qname string } @@ -595,28 +597,13 @@ func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) { return int(n), err } -// DeleteTaskByKey deletes a task with the given key from the given queue. -// TODO: We don't need score any more. Update this to delete task by ID -func (i *Inspector) DeleteTaskByKey(qname, key string) error { +// DeleteTaskByKey deletes a task with the given id from the given queue. +func (i *Inspector) DeleteTask(qname, id string) error { if err := base.ValidateQueueName(qname); err != nil { return err } - prefix, id, _, err := parseTaskKey(key) - if err != nil { - return err - } - switch prefix { - case keyPrefixPending: - return i.rdb.DeletePendingTask(qname, id) - case keyPrefixScheduled: - return i.rdb.DeleteScheduledTask(qname, id) - case keyPrefixRetry: - return i.rdb.DeleteRetryTask(qname, id) - case keyPrefixArchived: - return i.rdb.DeleteArchivedTask(qname, id) - default: - return fmt.Errorf("invalid key") - } + // TODO: Return ErrTaskNotFound or meaningful error + return i.rdb.DeleteTask(qname, id) } // RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue, diff --git a/inspeq/inspector_test.go b/inspeq/inspector_test.go index 0fedc9f..a427b78 100644 --- a/inspeq/inspector_test.go +++ b/inspeq/inspector_test.go @@ -1912,7 +1912,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { tests := []struct { pending map[string][]*base.TaskMessage qname string - key string + id string wantPending map[string][]*base.TaskMessage }{ { @@ -1921,7 +1921,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { "custom": {m3}, }, qname: "default", - key: createPendingTask(m2).Key(), + id: createPendingTask(m2).ID, wantPending: map[string][]*base.TaskMessage{ "default": {m1}, "custom": {m3}, @@ -1933,7 +1933,7 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { "custom": {m3}, }, qname: "custom", - key: createPendingTask(m3).Key(), + id: createPendingTask(m3).ID, wantPending: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {}, @@ -1945,9 +1945,8 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { h.FlushDB(t, r) h.SeedAllPendingQueues(t, r, tc.pending) - if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", - tc.qname, tc.key, err) + if err := inspector.DeleteTask(tc.qname, tc.id); err != nil { + t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err) continue } @@ -1978,7 +1977,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { tests := []struct { scheduled map[string][]base.Z qname string - key string + id string wantScheduled map[string][]base.Z }{ { @@ -1987,7 +1986,7 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { "custom": {z3}, }, qname: "default", - key: createScheduledTask(z2).Key(), + id: createScheduledTask(z2).ID, wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -1999,8 +1998,8 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { h.FlushDB(t, r) h.SeedAllScheduledQueues(t, r, tc.scheduled) - if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.DeleteTask(tc.qname, tc.id); err != nil { + t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err) } for qname, want := range tc.wantScheduled { gotScheduled := h.GetScheduledEntries(t, r, qname) @@ -2028,7 +2027,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { tests := []struct { retry map[string][]base.Z qname string - key string + id string wantRetry map[string][]base.Z }{ { @@ -2037,7 +2036,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { "custom": {z3}, }, qname: "default", - key: createRetryTask(z2).Key(), + id: createRetryTask(z2).ID, wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2049,8 +2048,8 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { h.FlushDB(t, r) h.SeedAllRetryQueues(t, r, tc.retry) - if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.DeleteTask(tc.qname, tc.id); err != nil { + t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err) continue } for qname, want := range tc.wantRetry { @@ -2078,7 +2077,7 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { tests := []struct { archived map[string][]base.Z qname string - key string + id string wantArchived map[string][]base.Z }{ { @@ -2087,7 +2086,7 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { "custom": {z3}, }, qname: "default", - key: createArchivedTask(z2).Key(), + id: createArchivedTask(z2).ID, wantArchived: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2099,8 +2098,8 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { h.FlushDB(t, r) h.SeedAllArchivedQueues(t, r, tc.archived) - if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.DeleteTask(tc.qname, tc.id); err != nil { + t.Errorf("DeleteTask(%q, %q) returned error: %v", tc.qname, tc.id, err) continue } for qname, want := range tc.wantArchived { diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 66831ba..238e8cc 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -9,6 +9,7 @@ import ( "encoding/json" "math" "sort" + "strings" "testing" "github.com/go-redis/redis/v7" @@ -18,6 +19,28 @@ import ( "github.com/hibiken/asynq/internal/base" ) +type taskState int + +const ( + stateActive taskState = iota + statePending + stateScheduled + stateRetry + stateArchived +) + +var taskStateNames = map[taskState]string{ + stateActive: "active", + statePending: "pending", + stateScheduled: "scheduled", + stateRetry: "retry", + stateArchived: "archived", +} + +func (s taskState) String() string { + return taskStateNames[s] +} + // EquateInt64Approx returns a Comparer option that treats int64 values // to be equal if they are within the given margin. func EquateInt64Approx(margin int64) cmp.Option { @@ -182,42 +205,42 @@ func FlushDB(tb testing.TB, r redis.UniversalClient) { func SeedPendingQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisList(tb, r, base.PendingKey(qname), msgs) + seedRedisList(tb, r, qname, msgs, statePending) } // SeedActiveQueue initializes the active queue with the given messages. func SeedActiveQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisList(tb, r, base.ActiveKey(qname), msgs) + seedRedisList(tb, r, qname, msgs, stateActive) } // SeedScheduledQueue initializes the scheduled queue with the given messages. func SeedScheduledQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisZSet(tb, r, base.ScheduledKey(qname), entries) + seedRedisZSet(tb, r, qname, entries, stateScheduled) } // SeedRetryQueue initializes the retry queue with the given messages. func SeedRetryQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisZSet(tb, r, base.RetryKey(qname), entries) + seedRedisZSet(tb, r, qname, entries, stateRetry) } // SeedArchivedQueue initializes the archived queue with the given messages. func SeedArchivedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisZSet(tb, r, base.ArchivedKey(qname), entries) + seedRedisZSet(tb, r, qname, entries, stateArchived) } // SeedDeadlines initializes the deadlines set with the given entries. func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries) + seedRedisZSet(tb, r, qname, entries, stateActive) } // SeedAllPendingQueues initializes all of the specified queues with the given messages. @@ -270,8 +293,17 @@ func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[stri } } -func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*base.TaskMessage) { +func seedRedisList(tb testing.TB, c redis.UniversalClient, qname string, msgs []*base.TaskMessage, state taskState) { tb.Helper() + var key string + switch state { + case statePending: + key = base.PendingKey(qname) + case stateActive: + key = base.ActiveKey(qname) + default: + tb.Fatalf("cannot seed redis LIST with task state %s", state) + } for _, msg := range msgs { encoded := MustMarshal(tb, msg) if err := c.LPush(key, msg.ID.String()).Err(); err != nil { @@ -282,6 +314,7 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*b "msg": encoded, "timeout": msg.Timeout, "deadline": msg.Deadline, + "state": strings.ToUpper(state.String()), } if err := c.HSet(key, data).Err(); err != nil { tb.Fatal(err) @@ -289,8 +322,19 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*b } } -func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []base.Z) { +func seedRedisZSet(tb testing.TB, c redis.UniversalClient, qname string, items []base.Z, state taskState) { tb.Helper() + var key string + switch state { + case stateScheduled: + key = base.ScheduledKey(qname) + case stateRetry: + key = base.RetryKey(qname) + case stateArchived: + key = base.ArchivedKey(qname) + default: + tb.Fatalf("cannot seed redis ZSET with task state %s", state) + } for _, item := range items { msg := item.Message encoded := MustMarshal(tb, msg) @@ -303,6 +347,7 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []b "msg": encoded, "timeout": msg.Timeout, "deadline": msg.Deadline, + "state": strings.ToUpper(state.String()), } if err := c.HSet(key, data).Err(); err != nil { tb.Fatal(err) diff --git a/internal/base/base.go b/internal/base/base.go index 918dae3..e062ad3 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -46,9 +46,14 @@ func ValidateQueueName(qname string) error { return nil } +// QueueKeyPrefix returns a prefix for a redis-key namespaced by queue name. +func QueueKeyPrefix(qname string) string { + return fmt.Sprintf("asynq:{%s}:", qname) +} + // TaskKeyPrefix returns a prefix for task key. func TaskKeyPrefix(qname string) string { - return fmt.Sprintf("asynq:{%s}:t:", qname) + return fmt.Sprintf("%st:", QueueKeyPrefix(qname)) } // TaskKey returns a redis key for the given task message. @@ -58,47 +63,47 @@ func TaskKey(qname, id string) string { // PendingKey returns a redis key for the given queue name. func PendingKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:pending", qname) + return fmt.Sprintf("%spending", QueueKeyPrefix(qname)) } // ActiveKey returns a redis key for the active tasks. func ActiveKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:active", qname) + return fmt.Sprintf("%sactive", QueueKeyPrefix(qname)) } // ScheduledKey returns a redis key for the scheduled tasks. func ScheduledKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:scheduled", qname) + return fmt.Sprintf("%sscheduled", QueueKeyPrefix(qname)) } // RetryKey returns a redis key for the retry tasks. func RetryKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:retry", qname) + return fmt.Sprintf("%sretry", QueueKeyPrefix(qname)) } // ArchivedKey returns a redis key for the archived tasks. func ArchivedKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:archived", qname) + return fmt.Sprintf("%sarchived", QueueKeyPrefix(qname)) } // DeadlinesKey returns a redis key for the deadlines. func DeadlinesKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:deadlines", qname) + return fmt.Sprintf("%sdeadlines", QueueKeyPrefix(qname)) } // PausedKey returns a redis key to indicate that the given queue is paused. func PausedKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:paused", qname) + return fmt.Sprintf("%spaused", QueueKeyPrefix(qname)) } // ProcessedKey returns a redis key for processed count for the given day for the queue. func ProcessedKey(qname string, t time.Time) string { - return fmt.Sprintf("asynq:{%s}:processed:%s", qname, t.UTC().Format("2006-01-02")) + return fmt.Sprintf("%sprocessed:%s", QueueKeyPrefix(qname), t.UTC().Format("2006-01-02")) } // FailedKey returns a redis key for failure count for the given day for the queue. func FailedKey(qname string, t time.Time) string { - return fmt.Sprintf("asynq:{%s}:failed:%s", qname, t.UTC().Format("2006-01-02")) + return fmt.Sprintf("%sfailed:%s", QueueKeyPrefix(qname), t.UTC().Format("2006-01-02")) } // ServerInfoKey returns a redis key for process info. @@ -123,7 +128,7 @@ func SchedulerHistoryKey(entryID string) string { // UniqueKey returns a redis key with the given type, payload, and queue name. func UniqueKey(qname, tasktype string, payload []byte) string { - return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, string(payload)) + return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, string(payload)) } // TaskMessage is the internal representation of a task with additional metadata fields. diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 53d800b..2dda829 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -738,65 +738,37 @@ func (r *RDB) removeAndArchiveAll(src, dst string) (int64, error) { return n, nil } -// DeleteArchivedTask deletes an archived task that matches the given id and score from the given queue. -// If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) DeleteArchivedTask(qname string, id uuid.UUID) error { - return r.deleteTask(base.ArchivedKey(qname), qname, id.String()) -} - -// DeleteRetryTask deletes a retry task that matches the given id and score from the given queue. -// If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) DeleteRetryTask(qname string, id uuid.UUID) error { - return r.deleteTask(base.RetryKey(qname), qname, id.String()) -} - -// DeleteScheduledTask deletes a scheduled task that matches the given id and score from the given queue. -// If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID) error { - return r.deleteTask(base.ScheduledKey(qname), qname, id.String()) -} - -// KEYS[1] -> asynq:{}:pending -// KEYS[2] -> asynq:{}:t: -// ARGV[1] -> task ID -var deletePendingTaskCmd = redis.NewScript(` -if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then - return 0 -end -return redis.call("DEL", KEYS[2]) -`) - -// DeletePendingTask deletes a pending tasks that matches the given id from the given queue. -// If there's no match, it returns ErrTaskNotFound. -func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error { - keys := []string{base.PendingKey(qname), base.TaskKey(qname, id.String())} - res, err := deletePendingTaskCmd.Run(r.client, keys, id.String()).Result() - if err != nil { - return err - } - n, ok := res.(int64) - if !ok { - return fmt.Errorf("command error: unexpected return value %v", res) - } - if n == 0 { - return ErrTaskNotFound - } - return nil -} - -// KEYS[1] -> ZSET key to remove the task from (e.g. asynq:{}:retry) -// KEYS[2] -> asynq:{}:t: +// KEYS[1] -> asynq:{}:t: // ARGV[1] -> task ID +// ARGV[2] -> redis key prefix (asynq:{}:) var deleteTaskCmd = redis.NewScript(` -if redis.call("ZREM", KEYS[1], ARGV[1]) == 0 then +if redis.call("EXISTS", KEYS[1]) == 0 then return 0 end -return redis.call("DEL", KEYS[2]) +local state = redis.call("HGET", KEYS[1], "state") +local n +if state == "PENDING" then + n = redis.call("LREM", (ARGV[2] .. "pending"), 0, ARGV[1]) +elseif state == "SCHEDULED" then + n = redis.call("ZREM", (ARGV[2] .. "scheduled"), ARGV[1]) +elseif state == "RETRY" then + n = redis.call("ZREM", (ARGV[2] .. "retry"), ARGV[1]) +elseif state == "ARCHIVED" then + n = redis.call("ZREM", (ARGV[2] .. "archived"), ARGV[1]) +else + return redis.error_reply("unknown task state: " .. tostring(state)) +end +if n == 0 then + return 0 +end +return redis.call("DEL", KEYS[1]) `) -func (r *RDB) deleteTask(key, qname, id string) error { - keys := []string{key, base.TaskKey(qname, id)} - argv := []interface{}{id} +// DeleteTask deletes a task that matches the given id from the given queue. +// If a task that matches the id does not exist, it returns ErrTaskNotFound. +func (r *RDB) DeleteTask(qname, id string) error { + keys := []string{base.TaskKey(qname, id)} + argv := []interface{}{id, base.QueueKeyPrefix(qname)} res, err := deleteTaskCmd.Run(r.client, keys, argv...).Result() if err != nil { return err diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index b9711fd..ce326e3 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -2472,9 +2472,9 @@ func TestDeleteArchivedTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllArchivedQueues(t, r.client, tc.archived) - got := r.DeleteArchivedTask(tc.qname, tc.id) + got := r.DeleteTask(tc.qname, tc.id.String()) if got != tc.want { - t.Errorf("r.DeleteArchivedTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) + t.Errorf("r.DeleteTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) continue } @@ -2553,9 +2553,9 @@ func TestDeleteRetryTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllRetryQueues(t, r.client, tc.retry) - got := r.DeleteRetryTask(tc.qname, tc.id) + got := r.DeleteTask(tc.qname, tc.id.String()) if got != tc.want { - t.Errorf("r.DeleteRetryTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) + t.Errorf("r.DeleteTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) continue } @@ -2634,9 +2634,9 @@ func TestDeleteScheduledTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllScheduledQueues(t, r.client, tc.scheduled) - got := r.DeleteScheduledTask(tc.qname, tc.id) + got := r.DeleteTask(tc.qname, tc.id.String()) if got != tc.want { - t.Errorf("r.DeleteScheduledTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) + t.Errorf("r.DeleteTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) continue } @@ -2704,9 +2704,9 @@ func TestDeletePendingTask(t *testing.T) { h.FlushDB(t, r.client) h.SeedAllPendingQueues(t, r.client, tc.pending) - got := r.DeletePendingTask(tc.qname, tc.id) + got := r.DeleteTask(tc.qname, tc.id.String()) if got != tc.want { - t.Errorf("r.DeletePendingTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) + t.Errorf("r.DeleteTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want) continue }