diff --git a/client_test.go b/client_test.go index 698efa6..c3207ef 100644 --- a/client_test.go +++ b/client_test.go @@ -120,7 +120,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { for qname, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r, qname) if diff := cmp.Diff(want, gotPending, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" { - t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff) + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.PendingKey(qname), diff) } } for qname, want := range tc.wantScheduled { @@ -379,7 +379,7 @@ func TestClientEnqueue(t *testing.T) { for qname, want := range tc.wantPending { got := h.GetPendingMessages(t, r, qname) if diff := cmp.Diff(want, got, h.IgnoreIDOpt); diff != "" { - t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff) + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.PendingKey(qname), diff) } } } @@ -484,7 +484,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { for qname, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r, qname) if diff := cmp.Diff(want, gotPending, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" { - t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff) + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.PendingKey(qname), diff) } } for qname, want := range tc.wantScheduled { diff --git a/forwarder_test.go b/forwarder_test.go index 27728b3..a197f58 100644 --- a/forwarder_test.go +++ b/forwarder_test.go @@ -130,7 +130,7 @@ func TestForwarder(t *testing.T) { for qname, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r, qname) if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running forwarder: (-want, +got)\n%s", base.QueueKey(qname), diff) + t.Errorf("mismatch found in %q after running forwarder: (-want, +got)\n%s", base.PendingKey(qname), diff) } } } diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index c1ef6f1..5c3e3d1 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -196,7 +196,7 @@ func FlushDB(tb testing.TB, r redis.UniversalClient) { func SeedPendingQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisList(tb, r, base.QueueKey(qname), msgs) + seedRedisList(tb, r, base.PendingKey(qname), msgs) } // SeedActiveQueue initializes the active queue with the given messages. @@ -299,7 +299,7 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []b // GetPendingMessages returns all pending messages in the given queue. func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - return getListMessages(tb, r, base.QueueKey(qname)) + return getListMessages(tb, r, base.PendingKey(qname)) } // GetActiveMessages returns all active messages in the given queue. diff --git a/internal/base/base.go b/internal/base/base.go index 0f900d8..7ca5c23 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -25,7 +25,7 @@ const Version = "0.16.0" const DefaultQueueName = "default" // DefaultQueue is the redis key for the default queue. -var DefaultQueue = QueueKey(DefaultQueueName) +var DefaultQueue = PendingKey(DefaultQueueName) // Global Redis keys. const ( @@ -45,9 +45,9 @@ func ValidateQueueName(qname string) error { return nil } -// QueueKey returns a redis key for the given queue name. -func QueueKey(qname string) string { - return fmt.Sprintf("asynq:{%s}", qname) +// PendingKey returns a redis key for the given queue name. +func PendingKey(qname string) string { + return fmt.Sprintf("asynq:{%s}:pending", qname) } // ActiveKey returns a redis key for the active tasks. diff --git a/internal/base/base_test.go b/internal/base/base_test.go index a75baea..71642f8 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -25,7 +25,7 @@ func TestQueueKey(t *testing.T) { } for _, tc := range tests { - got := QueueKey(tc.qname) + got := PendingKey(tc.qname) if got != tc.want { t.Errorf("QueueKey(%q) = %q, want %q", tc.qname, got, tc.want) } diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 7f1d9ce..9c1cc50 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -110,7 +110,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { } now := time.Now() res, err := currentStatsCmd.Run(r.client, []string{ - base.QueueKey(qname), + base.PendingKey(qname), base.ActiveKey(qname), base.ScheduledKey(qname), base.RetryKey(qname), @@ -135,7 +135,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { key := cast.ToString(data[i]) val := cast.ToInt(data[i+1]) switch key { - case base.QueueKey(qname): + case base.PendingKey(qname): stats.Pending = val size += val case base.ActiveKey(qname): @@ -300,7 +300,7 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er if !r.client.SIsMember(base.AllQueues, qname).Val() { return nil, fmt.Errorf("queue %q does not exist", qname) } - return r.listMessages(base.QueueKey(qname), pgn) + return r.listMessages(base.PendingKey(qname), pgn) } // ListActive returns all tasks that are currently being processed for the given queue. @@ -386,7 +386,7 @@ func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) { // the given queue and enqueues it for processing. // If a task that matches the id and score does not exist, it returns ErrTaskNotFound. func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error { - n, err := r.removeAndRun(base.ArchivedKey(qname), base.QueueKey(qname), id.String(), float64(score)) + n, err := r.removeAndRun(base.ArchivedKey(qname), base.PendingKey(qname), id.String(), float64(score)) if err != nil { return err } @@ -400,7 +400,7 @@ func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error { // the given queue and enqueues it for processing. // If a task that matches the id and score does not exist, it returns ErrTaskNotFound. func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error { - n, err := r.removeAndRun(base.RetryKey(qname), base.QueueKey(qname), id.String(), float64(score)) + n, err := r.removeAndRun(base.RetryKey(qname), base.PendingKey(qname), id.String(), float64(score)) if err != nil { return err } @@ -414,7 +414,7 @@ func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error { // from the given queue and enqueues it for processing. // If a task that matches the id and score does not exist, it returns ErrTaskNotFound. func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error { - n, err := r.removeAndRun(base.ScheduledKey(qname), base.QueueKey(qname), id.String(), float64(score)) + n, err := r.removeAndRun(base.ScheduledKey(qname), base.PendingKey(qname), id.String(), float64(score)) if err != nil { return err } @@ -427,19 +427,19 @@ func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error { // RunAllScheduledTasks enqueues all scheduled tasks from the given queue // and returns the number of tasks enqueued. func (r *RDB) RunAllScheduledTasks(qname string) (int64, error) { - return r.removeAndRunAll(base.ScheduledKey(qname), base.QueueKey(qname)) + return r.removeAndRunAll(base.ScheduledKey(qname), base.PendingKey(qname)) } // RunAllRetryTasks enqueues all retry tasks from the given queue // and returns the number of tasks enqueued. func (r *RDB) RunAllRetryTasks(qname string) (int64, error) { - return r.removeAndRunAll(base.RetryKey(qname), base.QueueKey(qname)) + return r.removeAndRunAll(base.RetryKey(qname), base.PendingKey(qname)) } // RunAllArchivedTasks enqueues all archived tasks from the given queue // and returns the number of tasks enqueued. func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) { - return r.removeAndRunAll(base.ArchivedKey(qname), base.QueueKey(qname)) + return r.removeAndRunAll(base.ArchivedKey(qname), base.PendingKey(qname)) } var removeAndRunCmd = redis.NewScript(` @@ -530,7 +530,7 @@ return 1 `) func (r *RDB) archivePending(qname, msg string) (int64, error) { - keys := []string{base.QueueKey(qname), base.ArchivedKey(qname)} + keys := []string{base.PendingKey(qname), base.ArchivedKey(qname)} now := time.Now() limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago args := []interface{}{msg, now.Unix(), limit, maxArchiveSize} @@ -548,7 +548,7 @@ func (r *RDB) archivePending(qname, msg string) (int64, error) { // ArchivePendingTask finds a pending task that matches the given id from the given queue // and archives it. If a task that maches the id does not exist, it returns ErrTaskNotFound. func (r *RDB) ArchivePendingTask(qname string, id uuid.UUID) error { - qkey := base.QueueKey(qname) + qkey := base.PendingKey(qname) data, err := r.client.LRange(qkey, 0, -1).Result() if err != nil { return err @@ -602,7 +602,7 @@ return table.getn(msgs)`) // ArchiveAllPendingTasks archives all pending tasks from the given queue and // returns the number of tasks that were moved. func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { - keys := []string{base.QueueKey(qname), base.ArchivedKey(qname)} + keys := []string{base.PendingKey(qname), base.ArchivedKey(qname)} now := time.Now() limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago args := []interface{}{now.Unix(), limit, maxArchiveSize} @@ -705,7 +705,7 @@ func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID, score int64) error // DeletePendingTask deletes a pending tasks that matches the given id from the given queue. // If a task that matches the id does not exist, it returns ErrTaskNotFound. func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error { - qkey := base.QueueKey(qname) + qkey := base.PendingKey(qname) data, err := r.client.LRange(qkey, 0, -1).Result() if err != nil { return err @@ -800,7 +800,7 @@ return n`) // DeleteAllPendingTasks deletes all pending tasks from the given queue // and returns the number of tasks deleted. func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) { - res, err := deleteAllPendingCmd.Run(r.client, []string{base.QueueKey(qname)}).Result() + res, err := deleteAllPendingCmd.Run(r.client, []string{base.PendingKey(qname)}).Result() if err != nil { return 0, err } @@ -895,7 +895,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error { script = removeQueueCmd } keys := []string{ - base.QueueKey(qname), + base.PendingKey(qname), base.ActiveKey(qname), base.ScheduledKey(qname), base.RetryKey(qname), @@ -1064,7 +1064,7 @@ func (r *RDB) Unpause(qname string) error { // ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to. func (r *RDB) ClusterKeySlot(qname string) (int64, error) { - key := base.QueueKey(qname) + key := base.PendingKey(qname) return r.client.ClusterKeySlot(key).Result() } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 0e278ef..00355ae 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -1088,7 +1088,7 @@ func TestRunDeadTask(t *testing.T) { for qname, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r.client, qname) if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff) } } @@ -1193,7 +1193,7 @@ func TestRunRetryTask(t *testing.T) { for qname, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r.client, qname) if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff) } } @@ -1298,7 +1298,7 @@ func TestRunScheduledTask(t *testing.T) { for qname, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r.client, qname) if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff) } } @@ -1405,7 +1405,7 @@ func TestRunAllScheduledTasks(t *testing.T) { for qname, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r.client, qname) if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff) + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.PendingKey(qname), diff) } } for qname, want := range tc.wantScheduled { @@ -1511,7 +1511,7 @@ func TestRunAllRetryTasks(t *testing.T) { for qname, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r.client, qname) if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff) + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.PendingKey(qname), diff) } } for qname, want := range tc.wantRetry { @@ -1617,7 +1617,7 @@ func TestRunAllDeadTasks(t *testing.T) { for qname, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r.client, qname) if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff) + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.PendingKey(qname), diff) } } for qname, want := range tc.wantArchived { @@ -2717,7 +2717,7 @@ func TestRemoveQueue(t *testing.T) { } keys := []string{ - base.QueueKey(tc.qname), + base.PendingKey(tc.qname), base.ActiveKey(tc.qname), base.DeadlinesKey(tc.qname), base.ScheduledKey(tc.qname), @@ -2846,7 +2846,7 @@ func TestRemoveQueueError(t *testing.T) { for qname, want := range tc.pending { gotPending := h.GetPendingMessages(t, r.client, qname) if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { - t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.QueueKey(qname), diff) + t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.PendingKey(qname), diff) } } for qname, want := range tc.inProgress { diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index c557a71..650fa13 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -59,7 +59,7 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { return err } - key := base.QueueKey(msg.Queue) + key := base.PendingKey(msg.Queue) return r.client.LPush(key, encoded).Err() } @@ -88,7 +88,7 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { return err } res, err := enqueueUniqueCmd.Run(r.client, - []string{msg.UniqueKey, base.QueueKey(msg.Queue)}, + []string{msg.UniqueKey, base.PendingKey(msg.Queue)}, msg.ID.String(), int(ttl.Seconds()), encoded).Result() if err != nil { return err @@ -154,7 +154,7 @@ return nil`) func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err error) { for _, qname := range qnames { keys := []string{ - base.QueueKey(qname), + base.PendingKey(qname), base.PausedKey(qname), base.ActiveKey(qname), base.DeadlinesKey(qname), @@ -271,7 +271,7 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error { return err } return requeueCmd.Run(r.client, - []string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.QueueKey(msg.Queue)}, + []string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.PendingKey(msg.Queue)}, encoded).Err() } @@ -443,10 +443,10 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error { //and enqueues any tasks that are ready to be processed. func (r *RDB) CheckAndEnqueue(qnames ...string) error { for _, qname := range qnames { - if err := r.forwardAll(base.ScheduledKey(qname), base.QueueKey(qname)); err != nil { + if err := r.forwardAll(base.ScheduledKey(qname), base.PendingKey(qname)); err != nil { return err } - if err := r.forwardAll(base.RetryKey(qname), base.QueueKey(qname)); err != nil { + if err := r.forwardAll(base.RetryKey(qname), base.PendingKey(qname)); err != nil { return err } } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index af06409..1e8a396 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -83,7 +83,7 @@ func TestEnqueue(t *testing.T) { gotPending := h.GetPendingMessages(t, r.client, tc.msg.Queue) if len(gotPending) != 1 { - t.Errorf("%q has length %d, want 1", base.QueueKey(tc.msg.Queue), len(gotPending)) + t.Errorf("%q has length %d, want 1", base.PendingKey(tc.msg.Queue), len(gotPending)) continue } if diff := cmp.Diff(tc.msg, gotPending[0]); diff != "" { @@ -319,7 +319,7 @@ func TestDequeue(t *testing.T) { for queue, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r.client, queue) if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff) + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.PendingKey(queue), diff) } } for queue, want := range tc.wantActive { @@ -438,7 +438,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { for queue, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r.client, queue) if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff) + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.PendingKey(queue), diff) } } for queue, want := range tc.wantActive { @@ -734,7 +734,7 @@ func TestRequeue(t *testing.T) { for qname, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r.client, qname) if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff) } } for qname, want := range tc.wantActive { @@ -1337,7 +1337,7 @@ func TestCheckAndEnqueue(t *testing.T) { for qname, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r.client, qname) if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff) } } for qname, want := range tc.wantScheduled {