mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Update RDB.ScheduleUnique with task state
This commit is contained in:
parent
b1d717c842
commit
298a420f9f
@ -379,7 +379,11 @@ local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
|
||||
if not ok then
|
||||
return 0
|
||||
end
|
||||
redis.call("HSET", KEYS[2], "msg", ARGV[4], "timeout", ARGV[5], "deadline", ARGV[6])
|
||||
redis.call("HSET", KEYS[2],
|
||||
"msg", ARGV[4],
|
||||
"state", "scheduled",
|
||||
"timeout", ARGV[5],
|
||||
"deadline", ARGV[6])
|
||||
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1])
|
||||
return 1
|
||||
`)
|
||||
|
@ -915,20 +915,57 @@ func TestScheduleUnique(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
|
||||
gotScheduled := h.GetScheduledEntries(t, r.client, tc.msg.Queue)
|
||||
if len(gotScheduled) != 1 {
|
||||
t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(gotScheduled), base.ScheduledKey(tc.msg.Queue))
|
||||
// Check Scheduled zset has task ID.
|
||||
scheduledKey := base.ScheduledKey(tc.msg.Queue)
|
||||
zs := r.client.ZRangeWithScores(scheduledKey, 0, -1).Val()
|
||||
if n := len(zs); n != 1 {
|
||||
t.Errorf("Redis ZSET %q contains %d elements, want 1",
|
||||
scheduledKey, n)
|
||||
continue
|
||||
}
|
||||
if int64(gotScheduled[0].Score) != tc.processAt.Unix() {
|
||||
t.Errorf("%s inserted an item with score %d, want %d", desc, int64(gotScheduled[0].Score), tc.processAt.Unix())
|
||||
if got := zs[0].Member.(string); got != tc.msg.ID.String() {
|
||||
t.Errorf("Redis ZSET %q member: got %v, want %v",
|
||||
scheduledKey, got, tc.msg.ID.String())
|
||||
continue
|
||||
}
|
||||
if got := int64(zs[0].Score); got != tc.processAt.Unix() {
|
||||
t.Errorf("Redis ZSET %q score: got %d, want %d",
|
||||
scheduledKey, got, tc.processAt.Unix())
|
||||
continue
|
||||
}
|
||||
|
||||
// Check the values under the task key.
|
||||
taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String())
|
||||
encoded := r.client.HGet(taskKey, "msg").Val() // "msg" field
|
||||
decoded := h.MustUnmarshal(t, encoded)
|
||||
if diff := cmp.Diff(tc.msg, decoded); diff != "" {
|
||||
t.Errorf("persisted message was %v, want %v; (-want, +got)\n%s",
|
||||
decoded, tc.msg, diff)
|
||||
}
|
||||
state := r.client.HGet(taskKey, "state").Val() // "state" field
|
||||
if want := "scheduled"; state != want {
|
||||
t.Errorf("state field under task-key is set to %q, want %q",
|
||||
state, want)
|
||||
}
|
||||
timeout := r.client.HGet(taskKey, "timeout").Val() // "timeout" field
|
||||
if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want {
|
||||
t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want)
|
||||
}
|
||||
deadline := r.client.HGet(taskKey, "deadline").Val() // "deadline" field
|
||||
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
|
||||
t.Errorf("deadline field under task-ke is set to %v, want %v", deadline, want)
|
||||
}
|
||||
|
||||
// Check queue is in the AllQueues set.
|
||||
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
|
||||
t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues)
|
||||
}
|
||||
|
||||
// Enqueue the second message, should fail.
|
||||
got := r.ScheduleUnique(tc.msg, tc.processAt, tc.ttl)
|
||||
if got != ErrDuplicateTask {
|
||||
t.Errorf("Second task: %s = %v, want %v",
|
||||
desc, got, ErrDuplicateTask)
|
||||
t.Errorf("Second task: %s = %v, want %v", desc, got, ErrDuplicateTask)
|
||||
continue
|
||||
}
|
||||
|
||||
gotTTL := r.client.TTL(tc.msg.UniqueKey).Val()
|
||||
@ -936,10 +973,6 @@ func TestScheduleUnique(t *testing.T) {
|
||||
t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl)
|
||||
continue
|
||||
}
|
||||
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
|
||||
t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user