From 437fb03bb38cca514cb194c4f333580ce9aab804 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 4 Dec 2019 17:23:11 -0800 Subject: [PATCH] Unexport redis key name constants from rdb package --- asynq_test.go | 16 +++++++++++ client_test.go | 8 +++--- internal/rdb/rdb.go | 60 ++++++++++++++++++++-------------------- internal/rdb/rdb_test.go | 54 ++++++++++++++++++------------------ poller_test.go | 12 ++++---- processor_test.go | 16 +++++------ retry_test.go | 8 +++--- 7 files changed, 95 insertions(+), 79 deletions(-) diff --git a/asynq_test.go b/asynq_test.go index d4addc5..2af5471 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -2,8 +2,10 @@ package asynq import ( "encoding/json" + "math/rand" "sort" "testing" + "time" "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" @@ -14,6 +16,20 @@ import ( // This file defines test helper functions used by // other test files. +func init() { + rand.Seed(time.Now().UnixNano()) +} + +// Redis keys +const ( + queuePrefix = "asynq:queues:" // LIST - asynq:queues: + defaultQ = queuePrefix + "default" // LIST + scheduledQ = "asynq:scheduled" // ZSET + retryQ = "asynq:retry" // ZSET + deadQ = "asynq:dead" // ZSET + inProgressQ = "asynq:in_progress" // LIST +) + func setup(t *testing.T) *redis.Client { t.Helper() r := redis.NewClient(&redis.Options{ diff --git a/client_test.go b/client_test.go index 7474b5a..f31e10d 100644 --- a/client_test.go +++ b/client_test.go @@ -42,12 +42,12 @@ func TestClient(t *testing.T) { continue } - if l := r.LLen(rdb.DefaultQueue).Val(); l != tc.wantQueueSize { - t.Errorf("%q has length %d, want %d", rdb.DefaultQueue, l, tc.wantQueueSize) + if l := r.LLen(defaultQ).Val(); l != tc.wantQueueSize { + t.Errorf("%q has length %d, want %d", defaultQ, l, tc.wantQueueSize) } - if l := r.ZCard(rdb.Scheduled).Val(); l != tc.wantScheduledSize { - t.Errorf("%q has length %d, want %d", rdb.Scheduled, l, tc.wantScheduledSize) + if l := r.ZCard(scheduledQ).Val(); l != tc.wantScheduledSize { + t.Errorf("%q has length %d, want %d", scheduledQ, l, tc.wantScheduledSize) } } } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index fb99198..8d9ae67 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -14,13 +14,13 @@ import ( // Redis keys const ( - allQueues = "asynq:queues" // SET - queuePrefix = "asynq:queues:" // LIST - asynq:queues: - DefaultQueue = queuePrefix + "default" // LIST - Scheduled = "asynq:scheduled" // ZSET - Retry = "asynq:retry" // ZSET - Dead = "asynq:dead" // ZSET - InProgress = "asynq:in_progress" // SET + allQueues = "asynq:queues" // SET + queuePrefix = "asynq:queues:" // LIST - asynq:queues: + defaultQ = queuePrefix + "default" // LIST + scheduledQ = "asynq:scheduled" // ZSET + retryQ = "asynq:retry" // ZSET + deadQ = "asynq:dead" // ZSET + inProgressQ = "asynq:in_progress" // LIST ) // ErrDequeueTimeout indicates that the blocking dequeue operation timed out. @@ -143,19 +143,19 @@ func (r *RDB) Enqueue(msg *TaskMessage) error { // once a task is available, it adds the task to "in progress" list // and returns the task. func (r *RDB) Dequeue(timeout time.Duration) (*TaskMessage, error) { - data, err := r.client.BRPopLPush(DefaultQueue, InProgress, timeout).Result() + data, err := r.client.BRPopLPush(defaultQ, inProgressQ, timeout).Result() if err == redis.Nil { return nil, ErrDequeueTimeout } if err != nil { - return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", DefaultQueue, InProgress, timeout, err) + return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", defaultQ, inProgressQ, timeout, err) } var msg TaskMessage err = json.Unmarshal([]byte(data), &msg) if err != nil { return nil, fmt.Errorf("could not unmarshal %v to json: %v", data, err) } - fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, DefaultQueue) + fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, defaultQ) return &msg, nil } @@ -166,21 +166,21 @@ func (r *RDB) Done(msg *TaskMessage) error { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } // NOTE: count ZERO means "remove all elements equal to val" - err = r.client.LRem(InProgress, 0, string(bytes)).Err() + err = r.client.LRem(inProgressQ, 0, string(bytes)).Err() if err != nil { - return fmt.Errorf("command `LREM %s 0 %s` failed: %v", InProgress, string(bytes), err) + return fmt.Errorf("command `LREM %s 0 %s` failed: %v", inProgressQ, string(bytes), err) } return nil } // Schedule adds the task to the backlog queue to be processed in the future. func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error { - return r.schedule(Scheduled, processAt, msg) + return r.schedule(scheduledQ, processAt, msg) } // RetryLater adds the task to the backlog queue to be retried in the future. func (r *RDB) RetryLater(msg *TaskMessage, processAt time.Time) error { - return r.schedule(Retry, processAt, msg) + return r.schedule(retryQ, processAt, msg) } // schedule adds the task to the zset to be processd at the specified time. @@ -208,10 +208,10 @@ func (r *RDB) Kill(msg *TaskMessage) error { } now := time.Now() pipe := r.client.Pipeline() - pipe.ZAdd(Dead, &redis.Z{Member: string(bytes), Score: float64(now.Unix())}) + pipe.ZAdd(deadQ, &redis.Z{Member: string(bytes), Score: float64(now.Unix())}) limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago - pipe.ZRemRangeByScore(Dead, "-inf", strconv.Itoa(int(limit))) - pipe.ZRemRangeByRank(Dead, 0, -maxDeadTask) // trim the set to 100 + pipe.ZRemRangeByScore(deadQ, "-inf", strconv.Itoa(int(limit))) + pipe.ZRemRangeByRank(deadQ, 0, -maxDeadTask) // trim the set to 100 _, err = pipe.Exec() return err } @@ -225,14 +225,14 @@ func (r *RDB) RestoreUnfinished() error { end return len `) - _, err := script.Run(r.client, []string{InProgress, DefaultQueue}).Result() + _, err := script.Run(r.client, []string{inProgressQ, defaultQ}).Result() return err } // CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that // have to be processed. func (r *RDB) CheckAndEnqueue() error { - delayed := []string{Scheduled, Retry} + delayed := []string{scheduledQ, retryQ} for _, zset := range delayed { if err := r.forward(zset); err != nil { return err @@ -254,7 +254,7 @@ func (r *RDB) forward(from string) error { return msgs `) now := float64(time.Now().Unix()) - res, err := script.Run(r.client, []string{from, allQueues, DefaultQueue}, now).Result() + res, err := script.Run(r.client, []string{from, allQueues, defaultQ}, now).Result() fmt.Printf("[DEBUG] got %d tasks from %q\n", len(res.([]interface{})), from) return err } @@ -262,11 +262,11 @@ func (r *RDB) forward(from string) error { // CurrentStats returns a current state of the queues. func (r *RDB) CurrentStats() (*Stats, error) { pipe := r.client.Pipeline() - qlen := pipe.LLen(DefaultQueue) - plen := pipe.LLen(InProgress) - slen := pipe.ZCard(Scheduled) - rlen := pipe.ZCard(Retry) - dlen := pipe.ZCard(Dead) + qlen := pipe.LLen(defaultQ) + plen := pipe.LLen(inProgressQ) + slen := pipe.ZCard(scheduledQ) + rlen := pipe.ZCard(retryQ) + dlen := pipe.ZCard(deadQ) _, err := pipe.Exec() if err != nil { return nil, err @@ -283,7 +283,7 @@ func (r *RDB) CurrentStats() (*Stats, error) { // ListEnqueued returns all enqueued tasks that are ready to be processed. func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) { - data, err := r.client.LRange(DefaultQueue, 0, -1).Result() + data, err := r.client.LRange(defaultQ, 0, -1).Result() if err != nil { return nil, err } @@ -305,7 +305,7 @@ func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) { // ListInProgress returns all tasks that are currently being processed. func (r *RDB) ListInProgress() ([]*InProgressTask, error) { - data, err := r.client.LRange(DefaultQueue, 0, -1).Result() + data, err := r.client.LRange(defaultQ, 0, -1).Result() if err != nil { return nil, err } @@ -328,7 +328,7 @@ func (r *RDB) ListInProgress() ([]*InProgressTask, error) { // ListScheduled returns all tasks that are scheduled to be processed // in the future. func (r *RDB) ListScheduled() ([]*ScheduledTask, error) { - data, err := r.client.ZRangeWithScores(Scheduled, 0, -1).Result() + data, err := r.client.ZRangeWithScores(scheduledQ, 0, -1).Result() if err != nil { return nil, err } @@ -357,7 +357,7 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) { // ListRetry returns all tasks that have failed before and willl be retried // in the future. func (r *RDB) ListRetry() ([]*RetryTask, error) { - data, err := r.client.ZRangeWithScores(Retry, 0, -1).Result() + data, err := r.client.ZRangeWithScores(retryQ, 0, -1).Result() if err != nil { return nil, err } @@ -388,7 +388,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) { // ListDead returns all tasks that have exhausted its retry limit. func (r *RDB) ListDead() ([]*DeadTask, error) { - data, err := r.client.ZRangeWithScores(Dead, 0, -1).Result() + data, err := r.client.ZRangeWithScores(deadQ, 0, -1).Result() if err != nil { return nil, err } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index df28f0f..66a2f13 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -107,13 +107,13 @@ func TestEnqueue(t *testing.T) { t.Error(err) continue } - res := r.client.LRange(DefaultQueue, 0, -1).Val() + res := r.client.LRange(defaultQ, 0, -1).Val() if len(res) != 1 { - t.Errorf("LIST %q has length %d, want 1", DefaultQueue, len(res)) + t.Errorf("LIST %q has length %d, want 1", defaultQ, len(res)) continue } - if !r.client.SIsMember(allQueues, DefaultQueue).Val() { - t.Errorf("SISMEMBER %q %q = false, want true", allQueues, DefaultQueue) + if !r.client.SIsMember(allQueues, defaultQ).Val() { + t.Errorf("SISMEMBER %q %q = false, want true", allQueues, defaultQ) } if diff := cmp.Diff(*tc.msg, *mustUnmarshal(t, res[0])); diff != "" { t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff) @@ -148,8 +148,8 @@ func TestDequeue(t *testing.T) { got, err, tc.want, tc.err) continue } - if l := r.client.LLen(InProgress).Val(); l != tc.inProgress { - t.Errorf("LIST %q has length %d, want %d", InProgress, l, tc.inProgress) + if l := r.client.LLen(inProgressQ).Val(); l != tc.inProgress { + t.Errorf("LIST %q has length %d, want %d", inProgressQ, l, tc.inProgress) } } } @@ -188,7 +188,7 @@ func TestDone(t *testing.T) { } // set up initial state for _, task := range tc.initial { - err := r.client.LPush(InProgress, mustMarshal(t, task)).Err() + err := r.client.LPush(inProgressQ, mustMarshal(t, task)).Err() if err != nil { t.Fatal(err) } @@ -201,13 +201,13 @@ func TestDone(t *testing.T) { } var got []*TaskMessage - data := r.client.LRange(InProgress, 0, -1).Val() + data := r.client.LRange(inProgressQ, 0, -1).Val() for _, s := range data { got = append(got, mustUnmarshal(t, s)) } if diff := cmp.Diff(tc.final, got, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after calling (*rdb).remove: (-want, +got):\n%s", DefaultQueue, diff) + t.Errorf("mismatch found in %q after calling (*rdb).remove: (-want, +got):\n%s", defaultQ, diff) continue } } @@ -237,7 +237,7 @@ func TestKill(t *testing.T) { } // set up initial state for _, task := range tc.initial { - err := r.client.ZAdd(Dead, &redis.Z{Member: mustMarshal(t, task), Score: float64(time.Now().Unix())}).Err() + err := r.client.ZAdd(deadQ, &redis.Z{Member: mustMarshal(t, task), Score: float64(time.Now().Unix())}).Err() if err != nil { t.Fatal(err) } @@ -249,10 +249,10 @@ func TestKill(t *testing.T) { continue } - actual := r.client.ZRange(Dead, 0, -1).Val() + actual := r.client.ZRange(deadQ, 0, -1).Val() got := mustUnmarshalSlice(t, actual) if diff := cmp.Diff(tc.want, got, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after calling (*rdb).kill: (-want, +got):\n%s", Dead, diff) + t.Errorf("mismatch found in %q after calling (*rdb).kill: (-want, +got):\n%s", deadQ, diff) continue } } @@ -298,11 +298,11 @@ func TestRestoreUnfinished(t *testing.T) { } // seed src list. for _, msg := range tc.beforeSrc { - r.client.LPush(InProgress, mustMarshal(t, msg)) + r.client.LPush(inProgressQ, mustMarshal(t, msg)) } // seed dst list. for _, msg := range tc.beforeDst { - r.client.LPush(DefaultQueue, mustMarshal(t, msg)) + r.client.LPush(defaultQ, mustMarshal(t, msg)) } if err := r.RestoreUnfinished(); err != nil { @@ -310,15 +310,15 @@ func TestRestoreUnfinished(t *testing.T) { continue } - src := r.client.LRange(InProgress, 0, -1).Val() + src := r.client.LRange(inProgressQ, 0, -1).Val() gotSrc := mustUnmarshalSlice(t, src) if diff := cmp.Diff(tc.afterSrc, gotSrc, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q (-want, +got)\n%s", InProgress, diff) + t.Errorf("mismatch found in %q (-want, +got)\n%s", inProgressQ, diff) } - dst := r.client.LRange(DefaultQueue, 0, -1).Val() + dst := r.client.LRange(defaultQ, 0, -1).Val() gotDst := mustUnmarshalSlice(t, dst) if diff := cmp.Diff(tc.afterDst, gotDst, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q (-want, +got)\n%s", DefaultQueue, diff) + t.Errorf("mismatch found in %q (-want, +got)\n%s", defaultQ, diff) } } } @@ -375,11 +375,11 @@ func TestCheckAndEnqueue(t *testing.T) { if err := r.client.FlushDB().Err(); err != nil { t.Fatal(err) } - if err := r.client.ZAdd(Scheduled, tc.initScheduled...).Err(); err != nil { + if err := r.client.ZAdd(scheduledQ, tc.initScheduled...).Err(); err != nil { t.Error(err) continue } - if err := r.client.ZAdd(Retry, tc.initRetry...).Err(); err != nil { + if err := r.client.ZAdd(retryQ, tc.initRetry...).Err(); err != nil { t.Error(err) continue } @@ -389,12 +389,12 @@ func TestCheckAndEnqueue(t *testing.T) { t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err) continue } - queued := r.client.LRange(DefaultQueue, 0, -1).Val() + queued := r.client.LRange(defaultQ, 0, -1).Val() gotQueued := mustUnmarshalSlice(t, queued) if diff := cmp.Diff(tc.wantQueued, gotQueued, sortMsgOpt); diff != "" { - t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", DefaultQueue, len(gotQueued), len(tc.wantQueued), diff) + t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", defaultQ, len(gotQueued), len(tc.wantQueued), diff) } - scheduled := r.client.ZRangeByScore(Scheduled, &redis.ZRangeBy{Min: "-inf", Max: "+inf"}).Val() + scheduled := r.client.ZRangeByScore(scheduledQ, &redis.ZRangeBy{Min: "-inf", Max: "+inf"}).Val() gotScheduled := mustUnmarshalSlice(t, scheduled) if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", scheduled, len(gotScheduled), len(tc.wantScheduled), diff) @@ -426,7 +426,7 @@ func TestSchedule(t *testing.T) { continue } - res, err := r.client.ZRangeWithScores(Scheduled, 0, -1).Result() + res, err := r.client.ZRangeWithScores(scheduledQ, 0, -1).Result() if err != nil { t.Error(err) continue @@ -434,7 +434,7 @@ func TestSchedule(t *testing.T) { desc := fmt.Sprintf("(*RDB).Schedule(%v, %v)", tc.msg, tc.processAt) if len(res) != 1 { - t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), Scheduled) + t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), scheduledQ) continue } @@ -469,7 +469,7 @@ func TestRetryLater(t *testing.T) { continue } - res, err := r.client.ZRangeWithScores(Retry, 0, -1).Result() + res, err := r.client.ZRangeWithScores(retryQ, 0, -1).Result() if err != nil { t.Error(err) continue @@ -477,7 +477,7 @@ func TestRetryLater(t *testing.T) { desc := fmt.Sprintf("(*RDB).RetryLater(%v, %v)", tc.msg, tc.processAt) if len(res) != 1 { - t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), Retry) + t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), retryQ) continue } diff --git a/poller_test.go b/poller_test.go index b754956..56687d7 100644 --- a/poller_test.go +++ b/poller_test.go @@ -91,22 +91,22 @@ func TestPoller(t *testing.T) { time.Sleep(tc.wait) p.terminate() - gotScheduledRaw := r.ZRange(rdb.Scheduled, 0, -1).Val() + gotScheduledRaw := r.ZRange(scheduledQ, 0, -1).Val() gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw) if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", rdb.Scheduled, diff) + t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", scheduledQ, diff) } - gotRetryRaw := r.ZRange(rdb.Retry, 0, -1).Val() + gotRetryRaw := r.ZRange(retryQ, 0, -1).Val() gotRetry := mustUnmarshalSlice(t, gotRetryRaw) if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", rdb.Retry, diff) + t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", retryQ, diff) } - gotQueueRaw := r.LRange(rdb.DefaultQueue, 0, -1).Val() + gotQueueRaw := r.LRange(defaultQ, 0, -1).Val() gotQueue := mustUnmarshalSlice(t, gotQueueRaw) if diff := cmp.Diff(tc.wantQueue, gotQueue, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", rdb.DefaultQueue, diff) + t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", defaultQ, diff) } } } diff --git a/processor_test.go b/processor_test.go index 68e6b93..fad80bd 100644 --- a/processor_test.go +++ b/processor_test.go @@ -85,8 +85,8 @@ func TestProcessorSuccess(t *testing.T) { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) } - if l := r.LLen(rdb.InProgress).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", rdb.InProgress, l) + if l := r.LLen(inProgressQ).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", inProgressQ, l) } } } @@ -162,20 +162,20 @@ func TestProcessorRetry(t *testing.T) { time.Sleep(tc.wait) p.terminate() - gotRetryRaw := r.ZRange(rdb.Retry, 0, -1).Val() + gotRetryRaw := r.ZRange(retryQ, 0, -1).Val() gotRetry := mustUnmarshalSlice(t, gotRetryRaw) if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", rdb.Retry, diff) + t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", retryQ, diff) } - gotDeadRaw := r.ZRange(rdb.Dead, 0, -1).Val() + gotDeadRaw := r.ZRange(deadQ, 0, -1).Val() gotDead := mustUnmarshalSlice(t, gotDeadRaw) if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", rdb.Dead, diff) + t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", deadQ, diff) } - if l := r.LLen(rdb.InProgress).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", rdb.InProgress, l) + if l := r.LLen(inProgressQ).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", inProgressQ, l) } } } diff --git a/retry_test.go b/retry_test.go index 1d16f6f..a5f0139 100644 --- a/retry_test.go +++ b/retry_test.go @@ -56,16 +56,16 @@ func TestRetry(t *testing.T) { retryTask(rdbClient, tc.msg, tc.err) - deadQueue := r.ZRange(rdb.Dead, 0, -1).Val() + deadQueue := r.ZRange(deadQ, 0, -1).Val() gotDead := mustUnmarshalSlice(t, deadQueue) if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { - t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, rdb.Dead, diff) + t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, deadQ, diff) } - retryQueue := r.ZRange(rdb.Retry, 0, -1).Val() + retryQueue := r.ZRange(retryQ, 0, -1).Val() gotRetry := mustUnmarshalSlice(t, retryQueue) if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, rdb.Dead, diff) + t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, deadQ, diff) } } }