2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-26 11:16:12 +08:00

Rename pending key

This commit is contained in:
Ken Hibino
2021-02-21 05:24:04 -08:00
parent 6a9d9fd717
commit fae6c4bdc8
9 changed files with 46 additions and 46 deletions

View File

@@ -120,7 +120,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r, qname)
if diff := cmp.Diff(want, gotPending, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantScheduled {
@@ -379,7 +379,7 @@ func TestClientEnqueue(t *testing.T) {
for qname, want := range tc.wantPending {
got := h.GetPendingMessages(t, r, qname)
if diff := cmp.Diff(want, got, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
}
@@ -484,7 +484,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r, qname)
if diff := cmp.Diff(want, gotPending, h.IgnoreIDOpt, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantScheduled {

View File

@@ -130,7 +130,7 @@ func TestForwarder(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running forwarder: (-want, +got)\n%s", base.QueueKey(qname), diff)
t.Errorf("mismatch found in %q after running forwarder: (-want, +got)\n%s", base.PendingKey(qname), diff)
}
}
}

View File

@@ -196,7 +196,7 @@ func FlushDB(tb testing.TB, r redis.UniversalClient) {
func SeedPendingQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
tb.Helper()
r.SAdd(base.AllQueues, qname)
seedRedisList(tb, r, base.QueueKey(qname), msgs)
seedRedisList(tb, r, base.PendingKey(qname), msgs)
}
// SeedActiveQueue initializes the active queue with the given messages.
@@ -299,7 +299,7 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []b
// GetPendingMessages returns all pending messages in the given queue.
func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper()
return getListMessages(tb, r, base.QueueKey(qname))
return getListMessages(tb, r, base.PendingKey(qname))
}
// GetActiveMessages returns all active messages in the given queue.

View File

@@ -25,7 +25,7 @@ const Version = "0.16.0"
const DefaultQueueName = "default"
// DefaultQueue is the redis key for the default queue.
var DefaultQueue = QueueKey(DefaultQueueName)
var DefaultQueue = PendingKey(DefaultQueueName)
// Global Redis keys.
const (
@@ -45,9 +45,9 @@ func ValidateQueueName(qname string) error {
return nil
}
// QueueKey returns a redis key for the given queue name.
func QueueKey(qname string) string {
return fmt.Sprintf("asynq:{%s}", qname)
// PendingKey returns a redis key for the given queue name.
func PendingKey(qname string) string {
return fmt.Sprintf("asynq:{%s}:pending", qname)
}
// ActiveKey returns a redis key for the active tasks.

View File

@@ -25,7 +25,7 @@ func TestQueueKey(t *testing.T) {
}
for _, tc := range tests {
got := QueueKey(tc.qname)
got := PendingKey(tc.qname)
if got != tc.want {
t.Errorf("QueueKey(%q) = %q, want %q", tc.qname, got, tc.want)
}

View File

@@ -110,7 +110,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
}
now := time.Now()
res, err := currentStatsCmd.Run(r.client, []string{
base.QueueKey(qname),
base.PendingKey(qname),
base.ActiveKey(qname),
base.ScheduledKey(qname),
base.RetryKey(qname),
@@ -135,7 +135,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
key := cast.ToString(data[i])
val := cast.ToInt(data[i+1])
switch key {
case base.QueueKey(qname):
case base.PendingKey(qname):
stats.Pending = val
size += val
case base.ActiveKey(qname):
@@ -300,7 +300,7 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listMessages(base.QueueKey(qname), pgn)
return r.listMessages(base.PendingKey(qname), pgn)
}
// ListActive returns all tasks that are currently being processed for the given queue.
@@ -386,7 +386,7 @@ func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) {
// the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndRun(base.ArchivedKey(qname), base.QueueKey(qname), id.String(), float64(score))
n, err := r.removeAndRun(base.ArchivedKey(qname), base.PendingKey(qname), id.String(), float64(score))
if err != nil {
return err
}
@@ -400,7 +400,7 @@ func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error {
// the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndRun(base.RetryKey(qname), base.QueueKey(qname), id.String(), float64(score))
n, err := r.removeAndRun(base.RetryKey(qname), base.PendingKey(qname), id.String(), float64(score))
if err != nil {
return err
}
@@ -414,7 +414,7 @@ func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error {
// from the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndRun(base.ScheduledKey(qname), base.QueueKey(qname), id.String(), float64(score))
n, err := r.removeAndRun(base.ScheduledKey(qname), base.PendingKey(qname), id.String(), float64(score))
if err != nil {
return err
}
@@ -427,19 +427,19 @@ func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error {
// RunAllScheduledTasks enqueues all scheduled tasks from the given queue
// and returns the number of tasks enqueued.
func (r *RDB) RunAllScheduledTasks(qname string) (int64, error) {
return r.removeAndRunAll(base.ScheduledKey(qname), base.QueueKey(qname))
return r.removeAndRunAll(base.ScheduledKey(qname), base.PendingKey(qname))
}
// RunAllRetryTasks enqueues all retry tasks from the given queue
// and returns the number of tasks enqueued.
func (r *RDB) RunAllRetryTasks(qname string) (int64, error) {
return r.removeAndRunAll(base.RetryKey(qname), base.QueueKey(qname))
return r.removeAndRunAll(base.RetryKey(qname), base.PendingKey(qname))
}
// RunAllArchivedTasks enqueues all archived tasks from the given queue
// and returns the number of tasks enqueued.
func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) {
return r.removeAndRunAll(base.ArchivedKey(qname), base.QueueKey(qname))
return r.removeAndRunAll(base.ArchivedKey(qname), base.PendingKey(qname))
}
var removeAndRunCmd = redis.NewScript(`
@@ -530,7 +530,7 @@ return 1
`)
func (r *RDB) archivePending(qname, msg string) (int64, error) {
keys := []string{base.QueueKey(qname), base.ArchivedKey(qname)}
keys := []string{base.PendingKey(qname), base.ArchivedKey(qname)}
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
args := []interface{}{msg, now.Unix(), limit, maxArchiveSize}
@@ -548,7 +548,7 @@ func (r *RDB) archivePending(qname, msg string) (int64, error) {
// ArchivePendingTask finds a pending task that matches the given id from the given queue
// and archives it. If a task that maches the id does not exist, it returns ErrTaskNotFound.
func (r *RDB) ArchivePendingTask(qname string, id uuid.UUID) error {
qkey := base.QueueKey(qname)
qkey := base.PendingKey(qname)
data, err := r.client.LRange(qkey, 0, -1).Result()
if err != nil {
return err
@@ -602,7 +602,7 @@ return table.getn(msgs)`)
// ArchiveAllPendingTasks archives all pending tasks from the given queue and
// returns the number of tasks that were moved.
func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) {
keys := []string{base.QueueKey(qname), base.ArchivedKey(qname)}
keys := []string{base.PendingKey(qname), base.ArchivedKey(qname)}
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
args := []interface{}{now.Unix(), limit, maxArchiveSize}
@@ -705,7 +705,7 @@ func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID, score int64) error
// DeletePendingTask deletes a pending tasks that matches the given id from the given queue.
// If a task that matches the id does not exist, it returns ErrTaskNotFound.
func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error {
qkey := base.QueueKey(qname)
qkey := base.PendingKey(qname)
data, err := r.client.LRange(qkey, 0, -1).Result()
if err != nil {
return err
@@ -800,7 +800,7 @@ return n`)
// DeleteAllPendingTasks deletes all pending tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) {
res, err := deleteAllPendingCmd.Run(r.client, []string{base.QueueKey(qname)}).Result()
res, err := deleteAllPendingCmd.Run(r.client, []string{base.PendingKey(qname)}).Result()
if err != nil {
return 0, err
}
@@ -895,7 +895,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
script = removeQueueCmd
}
keys := []string{
base.QueueKey(qname),
base.PendingKey(qname),
base.ActiveKey(qname),
base.ScheduledKey(qname),
base.RetryKey(qname),
@@ -1064,7 +1064,7 @@ func (r *RDB) Unpause(qname string) error {
// ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
func (r *RDB) ClusterKeySlot(qname string) (int64, error) {
key := base.QueueKey(qname)
key := base.PendingKey(qname)
return r.client.ClusterKeySlot(key).Result()
}

View File

@@ -1088,7 +1088,7 @@ func TestRunDeadTask(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
}
}
@@ -1193,7 +1193,7 @@ func TestRunRetryTask(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
}
}
@@ -1298,7 +1298,7 @@ func TestRunScheduledTask(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
}
}
@@ -1405,7 +1405,7 @@ func TestRunAllScheduledTasks(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff)
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantScheduled {
@@ -1511,7 +1511,7 @@ func TestRunAllRetryTasks(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff)
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantRetry {
@@ -1617,7 +1617,7 @@ func TestRunAllDeadTasks(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff)
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantArchived {
@@ -2717,7 +2717,7 @@ func TestRemoveQueue(t *testing.T) {
}
keys := []string{
base.QueueKey(tc.qname),
base.PendingKey(tc.qname),
base.ActiveKey(tc.qname),
base.DeadlinesKey(tc.qname),
base.ScheduledKey(tc.qname),
@@ -2846,7 +2846,7 @@ func TestRemoveQueueError(t *testing.T) {
for qname, want := range tc.pending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.QueueKey(qname), diff)
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
for qname, want := range tc.inProgress {

View File

@@ -59,7 +59,7 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
return err
}
key := base.QueueKey(msg.Queue)
key := base.PendingKey(msg.Queue)
return r.client.LPush(key, encoded).Err()
}
@@ -88,7 +88,7 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
return err
}
res, err := enqueueUniqueCmd.Run(r.client,
[]string{msg.UniqueKey, base.QueueKey(msg.Queue)},
[]string{msg.UniqueKey, base.PendingKey(msg.Queue)},
msg.ID.String(), int(ttl.Seconds()), encoded).Result()
if err != nil {
return err
@@ -154,7 +154,7 @@ return nil`)
func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err error) {
for _, qname := range qnames {
keys := []string{
base.QueueKey(qname),
base.PendingKey(qname),
base.PausedKey(qname),
base.ActiveKey(qname),
base.DeadlinesKey(qname),
@@ -271,7 +271,7 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error {
return err
}
return requeueCmd.Run(r.client,
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.QueueKey(msg.Queue)},
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.PendingKey(msg.Queue)},
encoded).Err()
}
@@ -443,10 +443,10 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
//and enqueues any tasks that are ready to be processed.
func (r *RDB) CheckAndEnqueue(qnames ...string) error {
for _, qname := range qnames {
if err := r.forwardAll(base.ScheduledKey(qname), base.QueueKey(qname)); err != nil {
if err := r.forwardAll(base.ScheduledKey(qname), base.PendingKey(qname)); err != nil {
return err
}
if err := r.forwardAll(base.RetryKey(qname), base.QueueKey(qname)); err != nil {
if err := r.forwardAll(base.RetryKey(qname), base.PendingKey(qname)); err != nil {
return err
}
}

View File

@@ -83,7 +83,7 @@ func TestEnqueue(t *testing.T) {
gotPending := h.GetPendingMessages(t, r.client, tc.msg.Queue)
if len(gotPending) != 1 {
t.Errorf("%q has length %d, want 1", base.QueueKey(tc.msg.Queue), len(gotPending))
t.Errorf("%q has length %d, want 1", base.PendingKey(tc.msg.Queue), len(gotPending))
continue
}
if diff := cmp.Diff(tc.msg, gotPending[0]); diff != "" {
@@ -319,7 +319,7 @@ func TestDequeue(t *testing.T) {
for queue, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff)
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.PendingKey(queue), diff)
}
}
for queue, want := range tc.wantActive {
@@ -438,7 +438,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
for queue, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff)
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.PendingKey(queue), diff)
}
}
for queue, want := range tc.wantActive {
@@ -734,7 +734,7 @@ func TestRequeue(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantActive {
@@ -1337,7 +1337,7 @@ func TestCheckAndEnqueue(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantScheduled {