From 691bc7db81056584bcfa97b2ccbf34c4ddb434a5 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 19 Apr 2021 06:28:58 -0700 Subject: [PATCH] Update RDB.EnqueueUnique with task state --- internal/rdb/rdb.go | 6 +++++- internal/rdb/rdb_test.go | 38 ++++++++++++++++++++++++++++++++------ 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index c6c496d..d032043 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -99,7 +99,11 @@ local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) if not ok then return 0 end -redis.call("HSET", KEYS[2], "msg", ARGV[3], "timeout", ARGV[4], "deadline", ARGV[5]) +redis.call("HSET", KEYS[2], + "msg", ARGV[3], + "state", "pending", + "timeout", ARGV[4], + "deadline", ARGV[5]) redis.call("LPUSH", KEYS[3], ARGV[1]) return 1 `) diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 9f6583b..0ca1397 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -150,14 +150,40 @@ func TestEnqueueUnique(t *testing.T) { tc.msg, tc.ttl, err) continue } - gotPending := h.GetPendingMessages(t, r.client, tc.msg.Queue) - if len(gotPending) != 1 { - t.Errorf("%q has length %d, want 1", base.PendingKey(tc.msg.Queue), len(gotPending)) + + // Check Pending list has task ID. + pendingKey := base.PendingKey(tc.msg.Queue) + pendingIDs := r.client.LRange(pendingKey, 0, -1).Val() + if len(pendingIDs) != 1 { + t.Errorf("Redis LIST %q contains %d IDs, want 1", pendingKey, len(pendingIDs)) continue } - if diff := cmp.Diff(tc.msg, gotPending[0]); diff != "" { - t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff) + if pendingIDs[0] != tc.msg.ID.String() { + t.Errorf("Redis LIST %q: got %v, want %v", pendingKey, pendingIDs, []string{tc.msg.ID.String()}) + continue } + + // Check the value under the task key. + taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String()) + encoded := r.client.HGet(taskKey, "msg").Val() // "msg" field + decoded := h.MustUnmarshal(t, encoded) + if diff := cmp.Diff(tc.msg, decoded); diff != "" { + t.Errorf("persisted message was %v, want %v; (-want, +got)\n%s", decoded, tc.msg, diff) + } + state := r.client.HGet(taskKey, "state").Val() // "state" field + if state != "pending" { + t.Errorf("state field under task-key is set to %q, want %q", state, "pending") + } + timeout := r.client.HGet(taskKey, "timeout").Val() // "timeout" field + if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want { + t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want) + } + deadline := r.client.HGet(taskKey, "deadline").Val() // "deadline" field + if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want { + t.Errorf("deadline field under task-ke is set to %v, want %v", deadline, want) + } + + // Check queue is in the AllQueues set. if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues) } @@ -170,7 +196,7 @@ func TestEnqueueUnique(t *testing.T) { continue } gotTTL := r.client.TTL(tc.msg.UniqueKey).Val() - if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { + if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 2)) { t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl) continue }