From 298a420f9fb69a25290e01292e03e9a4a16fa998 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 24 Apr 2021 20:41:44 -0700 Subject: [PATCH] Update RDB.ScheduleUnique with task state --- internal/rdb/rdb.go | 6 ++++- internal/rdb/rdb_test.go | 55 ++++++++++++++++++++++++++++++++-------- 2 files changed, 49 insertions(+), 12 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index c6d1501..04e9f3b 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -379,7 +379,11 @@ local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) if not ok then return 0 end -redis.call("HSET", KEYS[2], "msg", ARGV[4], "timeout", ARGV[5], "deadline", ARGV[6]) +redis.call("HSET", KEYS[2], + "msg", ARGV[4], + "state", "scheduled", + "timeout", ARGV[5], + "deadline", ARGV[6]) redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) return 1 `) diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 38eb752..0f44828 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -915,20 +915,57 @@ func TestScheduleUnique(t *testing.T) { continue } - gotScheduled := h.GetScheduledEntries(t, r.client, tc.msg.Queue) - if len(gotScheduled) != 1 { - t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(gotScheduled), base.ScheduledKey(tc.msg.Queue)) + // Check Scheduled zset has task ID. + scheduledKey := base.ScheduledKey(tc.msg.Queue) + zs := r.client.ZRangeWithScores(scheduledKey, 0, -1).Val() + if n := len(zs); n != 1 { + t.Errorf("Redis ZSET %q contains %d elements, want 1", + scheduledKey, n) continue } - if int64(gotScheduled[0].Score) != tc.processAt.Unix() { - t.Errorf("%s inserted an item with score %d, want %d", desc, int64(gotScheduled[0].Score), tc.processAt.Unix()) + if got := zs[0].Member.(string); got != tc.msg.ID.String() { + t.Errorf("Redis ZSET %q member: got %v, want %v", + scheduledKey, got, tc.msg.ID.String()) + continue + } + if got := int64(zs[0].Score); got != tc.processAt.Unix() { + t.Errorf("Redis ZSET %q score: got %d, want %d", + scheduledKey, got, tc.processAt.Unix()) continue } + // Check the values under the task key. + taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String()) + encoded := r.client.HGet(taskKey, "msg").Val() // "msg" field + decoded := h.MustUnmarshal(t, encoded) + if diff := cmp.Diff(tc.msg, decoded); diff != "" { + t.Errorf("persisted message was %v, want %v; (-want, +got)\n%s", + decoded, tc.msg, diff) + } + state := r.client.HGet(taskKey, "state").Val() // "state" field + if want := "scheduled"; state != want { + t.Errorf("state field under task-key is set to %q, want %q", + state, want) + } + timeout := r.client.HGet(taskKey, "timeout").Val() // "timeout" field + if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want { + t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want) + } + deadline := r.client.HGet(taskKey, "deadline").Val() // "deadline" field + if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want { + t.Errorf("deadline field under task-ke is set to %v, want %v", deadline, want) + } + + // Check queue is in the AllQueues set. + if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { + t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues) + } + + // Enqueue the second message, should fail. got := r.ScheduleUnique(tc.msg, tc.processAt, tc.ttl) if got != ErrDuplicateTask { - t.Errorf("Second task: %s = %v, want %v", - desc, got, ErrDuplicateTask) + t.Errorf("Second task: %s = %v, want %v", desc, got, ErrDuplicateTask) + continue } gotTTL := r.client.TTL(tc.msg.UniqueKey).Val() @@ -936,10 +973,6 @@ func TestScheduleUnique(t *testing.T) { t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl) continue } - if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { - t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues) - continue - } } }