2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-14 11:31:18 +08:00

Update RDB.EnqueueUnique with task state

This commit is contained in:
Ken Hibino 2021-04-19 06:28:58 -07:00
parent 5ec41e388b
commit 56e5762eea
2 changed files with 37 additions and 7 deletions

View File

@ -99,7 +99,11 @@ local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then if not ok then
return 0 return 0
end end
redis.call("HSET", KEYS[2], "msg", ARGV[3], "timeout", ARGV[4], "deadline", ARGV[5]) redis.call("HSET", KEYS[2],
"msg", ARGV[3],
"state", "pending",
"timeout", ARGV[4],
"deadline", ARGV[5])
redis.call("LPUSH", KEYS[3], ARGV[1]) redis.call("LPUSH", KEYS[3], ARGV[1])
return 1 return 1
`) `)

View File

@ -150,14 +150,40 @@ func TestEnqueueUnique(t *testing.T) {
tc.msg, tc.ttl, err) tc.msg, tc.ttl, err)
continue continue
} }
gotPending := h.GetPendingMessages(t, r.client, tc.msg.Queue)
if len(gotPending) != 1 { // Check Pending list has task ID.
t.Errorf("%q has length %d, want 1", base.PendingKey(tc.msg.Queue), len(gotPending)) pendingKey := base.PendingKey(tc.msg.Queue)
pendingIDs := r.client.LRange(pendingKey, 0, -1).Val()
if len(pendingIDs) != 1 {
t.Errorf("Redis LIST %q contains %d IDs, want 1", pendingKey, len(pendingIDs))
continue continue
} }
if diff := cmp.Diff(tc.msg, gotPending[0]); diff != "" { if pendingIDs[0] != tc.msg.ID.String() {
t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff) t.Errorf("Redis LIST %q: got %v, want %v", pendingKey, pendingIDs, []string{tc.msg.ID.String()})
continue
} }
// Check the value under the task key.
taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String())
encoded := r.client.HGet(taskKey, "msg").Val() // "msg" field
decoded := h.MustUnmarshal(t, encoded)
if diff := cmp.Diff(tc.msg, decoded); diff != "" {
t.Errorf("persisted message was %v, want %v; (-want, +got)\n%s", decoded, tc.msg, diff)
}
state := r.client.HGet(taskKey, "state").Val() // "state" field
if state != "pending" {
t.Errorf("state field under task-key is set to %q, want %q", state, "pending")
}
timeout := r.client.HGet(taskKey, "timeout").Val() // "timeout" field
if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want {
t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want)
}
deadline := r.client.HGet(taskKey, "deadline").Val() // "deadline" field
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
t.Errorf("deadline field under task-ke is set to %v, want %v", deadline, want)
}
// Check queue is in the AllQueues set.
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues) t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues)
} }
@ -170,7 +196,7 @@ func TestEnqueueUnique(t *testing.T) {
continue continue
} }
gotTTL := r.client.TTL(tc.msg.UniqueKey).Val() gotTTL := r.client.TTL(tc.msg.UniqueKey).Val()
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 2)) {
t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl) t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl)
continue continue
} }