mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-22 16:50:18 +08:00
Add RDB.MarkAsComplete method
This commit is contained in:
parent
1b8ba80a95
commit
a4e25cba75
@ -139,6 +139,12 @@ func TaskMessageWithError(t base.TaskMessage, errMsg string, failedAt time.Time)
|
|||||||
return &t
|
return &t
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TaskMessageWithCompletedAt returns an updated copy of t after completion.
|
||||||
|
func TaskMessageWithCompletedAt(t base.TaskMessage, completedAt time.Time) *base.TaskMessage {
|
||||||
|
t.CompletedAt = completedAt.Unix()
|
||||||
|
return &t
|
||||||
|
}
|
||||||
|
|
||||||
// MustMarshal marshals given task message and returns a json string.
|
// MustMarshal marshals given task message and returns a json string.
|
||||||
// Calling test will fail if marshaling errors out.
|
// Calling test will fail if marshaling errors out.
|
||||||
func MustMarshal(tb testing.TB, msg *base.TaskMessage) string {
|
func MustMarshal(tb testing.TB, msg *base.TaskMessage) string {
|
||||||
@ -224,6 +230,13 @@ func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qna
|
|||||||
seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries, base.TaskStateActive)
|
seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries, base.TaskStateActive)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SeedCompletedQueue initializes the completed set witht the given entries.
|
||||||
|
func SeedCompletedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
|
||||||
|
tb.Helper()
|
||||||
|
r.SAdd(context.Background(), base.AllQueues, qname)
|
||||||
|
seedRedisZSet(tb, r, base.CompletedKey(qname), entries, base.TaskStateCompleted)
|
||||||
|
}
|
||||||
|
|
||||||
// SeedAllPendingQueues initializes all of the specified queues with the given messages.
|
// SeedAllPendingQueues initializes all of the specified queues with the given messages.
|
||||||
//
|
//
|
||||||
// pending maps a queue name to a list of messages.
|
// pending maps a queue name to a list of messages.
|
||||||
@ -274,6 +287,14 @@ func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[stri
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SeedAllCompletedQueues initializes all of the completed queues with the given entries.
|
||||||
|
func SeedAllCompletedQueues(tb testing.TB, r redis.UniversalClient, completed map[string][]base.Z) {
|
||||||
|
tb.Helper()
|
||||||
|
for q, entries := range completed {
|
||||||
|
SeedCompletedQueue(tb, r, entries, q)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func seedRedisList(tb testing.TB, c redis.UniversalClient, key string,
|
func seedRedisList(tb testing.TB, c redis.UniversalClient, key string,
|
||||||
msgs []*base.TaskMessage, state base.TaskState) {
|
msgs []*base.TaskMessage, state base.TaskState) {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
@ -367,6 +388,13 @@ func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) [
|
|||||||
return getMessagesFromZSet(tb, r, qname, base.ArchivedKey, base.TaskStateArchived)
|
return getMessagesFromZSet(tb, r, qname, base.ArchivedKey, base.TaskStateArchived)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetCompletedMessages returns all completed task messages in the given queue.
|
||||||
|
// It also asserts the state field of the task.
|
||||||
|
func GetCompletedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
|
||||||
|
tb.Helper()
|
||||||
|
return getMessagesFromZSet(tb, r, qname, base.CompletedKey, base.TaskStateCompleted)
|
||||||
|
}
|
||||||
|
|
||||||
// GetScheduledEntries returns all scheduled messages and its score in the given queue.
|
// GetScheduledEntries returns all scheduled messages and its score in the given queue.
|
||||||
// It also asserts the state field of the task.
|
// It also asserts the state field of the task.
|
||||||
func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
|
func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
|
||||||
@ -395,6 +423,13 @@ func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) [
|
|||||||
return getMessagesFromZSetWithScores(tb, r, qname, base.DeadlinesKey, base.TaskStateActive)
|
return getMessagesFromZSetWithScores(tb, r, qname, base.DeadlinesKey, base.TaskStateActive)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetCompletedEntries returns all completed messages and its score in the given queue.
|
||||||
|
// It also asserts the state field of the task.
|
||||||
|
func GetCompletedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
|
||||||
|
tb.Helper()
|
||||||
|
return getMessagesFromZSetWithScores(tb, r, qname, base.CompletedKey, base.TaskStateCompleted)
|
||||||
|
}
|
||||||
|
|
||||||
// Retrieves all messages stored under `keyFn(qname)` key in redis list.
|
// Retrieves all messages stored under `keyFn(qname)` key in redis list.
|
||||||
func getMessagesFromList(tb testing.TB, r redis.UniversalClient, qname string,
|
func getMessagesFromList(tb testing.TB, r redis.UniversalClient, qname string,
|
||||||
keyFn func(qname string) string, state base.TaskState) []*base.TaskMessage {
|
keyFn func(qname string) string, state base.TaskState) []*base.TaskMessage {
|
||||||
|
@ -330,7 +330,7 @@ end
|
|||||||
return redis.status_reply("OK")
|
return redis.status_reply("OK")
|
||||||
`)
|
`)
|
||||||
|
|
||||||
// Done removes the task from active queue to mark the task as done.
|
// Done removes the task from active queue and deletes the task.
|
||||||
// It removes a uniqueness lock acquired by the task, if any.
|
// It removes a uniqueness lock acquired by the task, if any.
|
||||||
func (r *RDB) Done(msg *base.TaskMessage) error {
|
func (r *RDB) Done(msg *base.TaskMessage) error {
|
||||||
var op errors.Op = "rdb.Done"
|
var op errors.Op = "rdb.Done"
|
||||||
@ -346,6 +346,7 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
|
|||||||
msg.ID,
|
msg.ID,
|
||||||
expireAt.Unix(),
|
expireAt.Unix(),
|
||||||
}
|
}
|
||||||
|
// Note: We cannot pass empty unique key when running this script in redis-cluster.
|
||||||
if len(msg.UniqueKey) > 0 {
|
if len(msg.UniqueKey) > 0 {
|
||||||
keys = append(keys, msg.UniqueKey)
|
keys = append(keys, msg.UniqueKey)
|
||||||
return r.runScript(op, doneUniqueCmd, keys, argv...)
|
return r.runScript(op, doneUniqueCmd, keys, argv...)
|
||||||
@ -353,6 +354,96 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
|
|||||||
return r.runScript(op, doneCmd, keys, argv...)
|
return r.runScript(op, doneCmd, keys, argv...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// KEYS[1] -> asynq:{<qname>}:active
|
||||||
|
// KEYS[2] -> asynq:{<qname>}:deadlines
|
||||||
|
// KEYS[3] -> asynq:{<qname>}:completed
|
||||||
|
// KEYS[4] -> asynq:{<qname>}:t:<task_id>
|
||||||
|
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
|
||||||
|
// ARGV[1] -> task ID
|
||||||
|
// ARGV[2] -> stats expiration timestamp
|
||||||
|
// ARGV[3] -> task exipration time in unix time
|
||||||
|
// ARGV[4] -> task message data
|
||||||
|
var markAsCompleteCmd = redis.NewScript(`
|
||||||
|
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
||||||
|
return redis.error_reply("NOT FOUND")
|
||||||
|
end
|
||||||
|
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
|
||||||
|
return redis.error_reply("NOT FOUND")
|
||||||
|
end
|
||||||
|
if redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) ~= 1 then
|
||||||
|
redis.redis.error_reply("INTERNAL")
|
||||||
|
end
|
||||||
|
redis.call("HSET", KEYS[4], "msg", ARGV[4], "state", "completed")
|
||||||
|
local n = redis.call("INCR", KEYS[5])
|
||||||
|
if tonumber(n) == 1 then
|
||||||
|
redis.call("EXPIREAT", KEYS[5], ARGV[2])
|
||||||
|
end
|
||||||
|
return redis.status_reply("OK")
|
||||||
|
`)
|
||||||
|
|
||||||
|
// KEYS[1] -> asynq:{<qname>}:active
|
||||||
|
// KEYS[2] -> asynq:{<qname>}:deadlines
|
||||||
|
// KEYS[3] -> asynq:{<qname>}:completed
|
||||||
|
// KEYS[4] -> asynq:{<qname>}:t:<task_id>
|
||||||
|
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
|
||||||
|
// KEYS[6] -> asynq:{<qname>}:unique:{<checksum>}
|
||||||
|
// ARGV[1] -> task ID
|
||||||
|
// ARGV[2] -> stats expiration timestamp
|
||||||
|
// ARGV[3] -> task exipration time in unix time
|
||||||
|
// ARGV[4] -> task message data
|
||||||
|
var markAsCompleteUniqueCmd = redis.NewScript(`
|
||||||
|
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
||||||
|
return redis.error_reply("NOT FOUND")
|
||||||
|
end
|
||||||
|
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
|
||||||
|
return redis.error_reply("NOT FOUND")
|
||||||
|
end
|
||||||
|
if redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) ~= 1 then
|
||||||
|
redis.redis.error_reply("INTERNAL")
|
||||||
|
end
|
||||||
|
redis.call("HSET", KEYS[4], "msg", ARGV[4], "state", "completed")
|
||||||
|
local n = redis.call("INCR", KEYS[5])
|
||||||
|
if tonumber(n) == 1 then
|
||||||
|
redis.call("EXPIREAT", KEYS[5], ARGV[2])
|
||||||
|
end
|
||||||
|
if redis.call("GET", KEYS[6]) == ARGV[1] then
|
||||||
|
redis.call("DEL", KEYS[6])
|
||||||
|
end
|
||||||
|
return redis.status_reply("OK")
|
||||||
|
`)
|
||||||
|
|
||||||
|
// MarkAsComplete removes the task from active queue to mark the task as completed.
|
||||||
|
// It removes a uniqueness lock acquired by the task, if any.
|
||||||
|
func (r *RDB) MarkAsComplete(msg *base.TaskMessage) error {
|
||||||
|
var op errors.Op = "rdb.MarkAsComplete"
|
||||||
|
now := time.Now()
|
||||||
|
statsExpireAt := now.Add(statsTTL)
|
||||||
|
msg.CompletedAt = now.Unix()
|
||||||
|
encoded, err := base.EncodeMessage(msg)
|
||||||
|
if err != nil {
|
||||||
|
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
||||||
|
}
|
||||||
|
keys := []string{
|
||||||
|
base.ActiveKey(msg.Queue),
|
||||||
|
base.DeadlinesKey(msg.Queue),
|
||||||
|
base.CompletedKey(msg.Queue),
|
||||||
|
base.TaskKey(msg.Queue, msg.ID),
|
||||||
|
base.ProcessedKey(msg.Queue, now),
|
||||||
|
}
|
||||||
|
argv := []interface{}{
|
||||||
|
msg.ID,
|
||||||
|
statsExpireAt.Unix(),
|
||||||
|
now.Unix() + msg.ResultTTL,
|
||||||
|
encoded,
|
||||||
|
}
|
||||||
|
// Note: We cannot pass empty unique key when running this script in redis-cluster.
|
||||||
|
if len(msg.UniqueKey) > 0 {
|
||||||
|
keys = append(keys, msg.UniqueKey)
|
||||||
|
return r.runScript(op, markAsCompleteUniqueCmd, keys, argv...)
|
||||||
|
}
|
||||||
|
return r.runScript(op, markAsCompleteCmd, keys, argv...)
|
||||||
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:{<qname>}:active
|
// KEYS[1] -> asynq:{<qname>}:active
|
||||||
// KEYS[2] -> asynq:{<qname>}:deadlines
|
// KEYS[2] -> asynq:{<qname>}:deadlines
|
||||||
// KEYS[3] -> asynq:{<qname>}:pending
|
// KEYS[3] -> asynq:{<qname>}:pending
|
||||||
|
@ -677,17 +677,17 @@ func TestDone(t *testing.T) {
|
|||||||
Payload: nil,
|
Payload: nil,
|
||||||
Timeout: 1800,
|
Timeout: 1800,
|
||||||
Deadline: 0,
|
Deadline: 0,
|
||||||
UniqueKey: "asynq:{default}:unique:reindex:nil",
|
UniqueKey: "asynq:{default}:unique:b0804ec967f48520697662a204f5fe72",
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
}
|
}
|
||||||
t1Deadline := now.Unix() + t1.Timeout
|
t1Deadline := now.Unix() + t1.Timeout
|
||||||
t2Deadline := t2.Deadline
|
t2Deadline := t2.Deadline
|
||||||
t3Deadline := now.Unix() + t3.Deadline
|
t3Deadline := now.Unix() + t3.Timeout
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
desc string
|
desc string
|
||||||
active map[string][]*base.TaskMessage // initial state of the active list
|
active map[string][]*base.TaskMessage // initial state of the active list
|
||||||
deadlines map[string][]base.Z // initial state of deadlines set
|
deadlines map[string][]base.Z // initial state of the deadlines set
|
||||||
target *base.TaskMessage // task to remove
|
target *base.TaskMessage // task to remove
|
||||||
wantActive map[string][]*base.TaskMessage // final state of the active list
|
wantActive map[string][]*base.TaskMessage // final state of the active list
|
||||||
wantDeadlines map[string][]base.Z // final state of the deadline set
|
wantDeadlines map[string][]base.Z // final state of the deadline set
|
||||||
@ -804,6 +804,201 @@ func TestDone(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMarkAsComplete(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
defer r.Close()
|
||||||
|
now := time.Now()
|
||||||
|
t1 := &base.TaskMessage{
|
||||||
|
ID: uuid.NewString(),
|
||||||
|
Type: "send_email",
|
||||||
|
Payload: nil,
|
||||||
|
Timeout: 1800,
|
||||||
|
Deadline: 0,
|
||||||
|
Queue: "default",
|
||||||
|
ResultTTL: 3600,
|
||||||
|
}
|
||||||
|
t2 := &base.TaskMessage{
|
||||||
|
ID: uuid.NewString(),
|
||||||
|
Type: "export_csv",
|
||||||
|
Payload: nil,
|
||||||
|
Timeout: 0,
|
||||||
|
Deadline: now.Add(2 * time.Hour).Unix(),
|
||||||
|
Queue: "custom",
|
||||||
|
ResultTTL: 7200,
|
||||||
|
}
|
||||||
|
t3 := &base.TaskMessage{
|
||||||
|
ID: uuid.NewString(),
|
||||||
|
Type: "reindex",
|
||||||
|
Payload: nil,
|
||||||
|
Timeout: 1800,
|
||||||
|
Deadline: 0,
|
||||||
|
UniqueKey: "asynq:{default}:unique:b0804ec967f48520697662a204f5fe72",
|
||||||
|
Queue: "default",
|
||||||
|
ResultTTL: 1800,
|
||||||
|
}
|
||||||
|
t1Deadline := now.Unix() + t1.Timeout
|
||||||
|
t2Deadline := t2.Deadline
|
||||||
|
t3Deadline := now.Unix() + t3.Timeout
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
desc string
|
||||||
|
active map[string][]*base.TaskMessage // initial state of the active list
|
||||||
|
deadlines map[string][]base.Z // initial state of the deadlines set
|
||||||
|
completed map[string][]base.Z // initial state of the completed set
|
||||||
|
target *base.TaskMessage // task to mark as completed
|
||||||
|
wantActive map[string][]*base.TaskMessage // final state of the active list
|
||||||
|
wantDeadlines map[string][]base.Z // final state of the deadline set
|
||||||
|
wantCompleted func(completedAt time.Time) map[string][]base.Z // final state of the completed set
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "select a message from the correct queue",
|
||||||
|
active: map[string][]*base.TaskMessage{
|
||||||
|
"default": {t1},
|
||||||
|
"custom": {t2},
|
||||||
|
},
|
||||||
|
deadlines: map[string][]base.Z{
|
||||||
|
"default": {{Message: t1, Score: t1Deadline}},
|
||||||
|
"custom": {{Message: t2, Score: t2Deadline}},
|
||||||
|
},
|
||||||
|
completed: map[string][]base.Z{
|
||||||
|
"default": {},
|
||||||
|
"custom": {},
|
||||||
|
},
|
||||||
|
target: t1,
|
||||||
|
wantActive: map[string][]*base.TaskMessage{
|
||||||
|
"default": {},
|
||||||
|
"custom": {t2},
|
||||||
|
},
|
||||||
|
wantDeadlines: map[string][]base.Z{
|
||||||
|
"default": {},
|
||||||
|
"custom": {{Message: t2, Score: t2Deadline}},
|
||||||
|
},
|
||||||
|
wantCompleted: func(completedAt time.Time) map[string][]base.Z {
|
||||||
|
return map[string][]base.Z{
|
||||||
|
"default": {{Message: h.TaskMessageWithCompletedAt(*t1, completedAt), Score: completedAt.Unix() + t1.ResultTTL}},
|
||||||
|
"custom": {},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "with one queue",
|
||||||
|
active: map[string][]*base.TaskMessage{
|
||||||
|
"default": {t1},
|
||||||
|
},
|
||||||
|
deadlines: map[string][]base.Z{
|
||||||
|
"default": {{Message: t1, Score: t1Deadline}},
|
||||||
|
},
|
||||||
|
completed: map[string][]base.Z{
|
||||||
|
"default": {},
|
||||||
|
},
|
||||||
|
target: t1,
|
||||||
|
wantActive: map[string][]*base.TaskMessage{
|
||||||
|
"default": {},
|
||||||
|
},
|
||||||
|
wantDeadlines: map[string][]base.Z{
|
||||||
|
"default": {},
|
||||||
|
},
|
||||||
|
wantCompleted: func(completedAt time.Time) map[string][]base.Z {
|
||||||
|
return map[string][]base.Z{
|
||||||
|
"default": {{Message: h.TaskMessageWithCompletedAt(*t1, completedAt), Score: completedAt.Unix() + t1.ResultTTL}},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "with multiple messages in a queue",
|
||||||
|
active: map[string][]*base.TaskMessage{
|
||||||
|
"default": {t1, t3},
|
||||||
|
"custom": {t2},
|
||||||
|
},
|
||||||
|
deadlines: map[string][]base.Z{
|
||||||
|
"default": {{Message: t1, Score: t1Deadline}, {Message: t3, Score: t3Deadline}},
|
||||||
|
"custom": {{Message: t2, Score: t2Deadline}},
|
||||||
|
},
|
||||||
|
completed: map[string][]base.Z{
|
||||||
|
"default": {},
|
||||||
|
"custom": {},
|
||||||
|
},
|
||||||
|
target: t3,
|
||||||
|
wantActive: map[string][]*base.TaskMessage{
|
||||||
|
"default": {t1},
|
||||||
|
"custom": {t2},
|
||||||
|
},
|
||||||
|
wantDeadlines: map[string][]base.Z{
|
||||||
|
"default": {{Message: t1, Score: t1Deadline}},
|
||||||
|
"custom": {{Message: t2, Score: t2Deadline}},
|
||||||
|
},
|
||||||
|
wantCompleted: func(completedAt time.Time) map[string][]base.Z {
|
||||||
|
return map[string][]base.Z{
|
||||||
|
"default": {{Message: h.TaskMessageWithCompletedAt(*t3, completedAt), Score: completedAt.Unix() + t3.ResultTTL}},
|
||||||
|
"custom": {},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
h.FlushDB(t, r.client) // clean up db before each test case
|
||||||
|
h.SeedAllDeadlines(t, r.client, tc.deadlines)
|
||||||
|
h.SeedAllActiveQueues(t, r.client, tc.active)
|
||||||
|
h.SeedAllCompletedQueues(t, r.client, tc.completed)
|
||||||
|
for _, msgs := range tc.active {
|
||||||
|
for _, msg := range msgs {
|
||||||
|
// Set uniqueness lock if unique key is present.
|
||||||
|
if len(msg.UniqueKey) > 0 {
|
||||||
|
err := r.client.SetNX(context.Background(), msg.UniqueKey, msg.ID, time.Minute).Err()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
completedAt := time.Now()
|
||||||
|
err := r.MarkAsComplete(tc.target)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("%s; (*RDB).MarkAsCompleted(task) = %v, want nil", tc.desc, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for queue, want := range tc.wantActive {
|
||||||
|
gotActive := h.GetActiveMessages(t, r.client, queue)
|
||||||
|
if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" {
|
||||||
|
t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.ActiveKey(queue), diff)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for queue, want := range tc.wantDeadlines {
|
||||||
|
gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
|
||||||
|
if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
|
||||||
|
t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.DeadlinesKey(queue), diff)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for queue, want := range tc.wantCompleted(completedAt) {
|
||||||
|
gotCompleted := h.GetCompletedEntries(t, r.client, queue)
|
||||||
|
if diff := cmp.Diff(want, gotCompleted, h.SortZSetEntryOpt); diff != "" {
|
||||||
|
t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.CompletedKey(queue), diff)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
processedKey := base.ProcessedKey(tc.target.Queue, time.Now())
|
||||||
|
gotProcessed := r.client.Get(context.Background(), processedKey).Val()
|
||||||
|
if gotProcessed != "1" {
|
||||||
|
t.Errorf("%s; GET %q = %q, want 1", tc.desc, processedKey, gotProcessed)
|
||||||
|
}
|
||||||
|
|
||||||
|
gotTTL := r.client.TTL(context.Background(), processedKey).Val()
|
||||||
|
if gotTTL > statsTTL {
|
||||||
|
t.Errorf("%s; TTL %q = %v, want less than or equal to %v", tc.desc, processedKey, gotTTL, statsTTL)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(tc.target.UniqueKey) > 0 && r.client.Exists(context.Background(), tc.target.UniqueKey).Val() != 0 {
|
||||||
|
t.Errorf("%s; Uniqueness lock %q still exists", tc.desc, tc.target.UniqueKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestRequeue(t *testing.T) {
|
func TestRequeue(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user