2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Update RDB.Schedule with task state

This commit is contained in:
Ken Hibino 2021-04-24 06:44:44 -07:00
parent 56e5762eea
commit b1d717c842
2 changed files with 47 additions and 12 deletions

View File

@ -333,7 +333,11 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error {
// ARGV[4] -> task timeout in seconds (0 if not timeout)
// ARGV[5] -> task deadline in unix time (0 if no deadline)
var scheduleCmd = redis.NewScript(`
redis.call("HSET", KEYS[1], "msg", ARGV[1], "timeout", ARGV[4], "deadline", ARGV[5])
redis.call("HSET", KEYS[1],
"msg", ARGV[1],
"state", "scheduled",
"timeout", ARGV[4],
"deadline", ARGV[5])
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3])
return 1
`)

View File

@ -86,8 +86,8 @@ func TestEnqueue(t *testing.T) {
// Check Pending list has task ID.
pendingKey := base.PendingKey(tc.msg.Queue)
pendingIDs := r.client.LRange(pendingKey, 0, -1).Val()
if len(pendingIDs) != 1 {
t.Errorf("Redis LIST %q contains %d IDs, want 1", pendingKey, len(pendingIDs))
if n := len(pendingIDs); n != 1 {
t.Errorf("Redis LIST %q contains %d IDs, want 1", pendingKey, n)
continue
}
if pendingIDs[0] != tc.msg.ID.String() {
@ -831,24 +831,55 @@ func TestSchedule(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
desc := fmt.Sprintf("(*RDB).Schedule(%v, %v)", tc.msg, tc.processAt)
err := r.Schedule(tc.msg, tc.processAt)
if err != nil {
t.Errorf("%s = %v, want nil", desc, err)
t.Errorf("(*RDB).Schedule(%v, %v) = %v, want nil",
tc.msg, tc.processAt, err)
continue
}
gotScheduled := h.GetScheduledEntries(t, r.client, tc.msg.Queue)
if len(gotScheduled) != 1 {
t.Errorf("%s inserted %d items to %q, want 1 items inserted",
desc, len(gotScheduled), base.ScheduledKey(tc.msg.Queue))
// Check Scheduled zset has task ID.
scheduledKey := base.ScheduledKey(tc.msg.Queue)
zs := r.client.ZRangeWithScores(scheduledKey, 0, -1).Val()
if n := len(zs); n != 1 {
t.Errorf("Redis ZSET %q contains %d elements, want 1",
scheduledKey, n)
continue
}
if int64(gotScheduled[0].Score) != tc.processAt.Unix() {
t.Errorf("%s inserted an item with score %d, want %d",
desc, int64(gotScheduled[0].Score), tc.processAt.Unix())
if got := zs[0].Member.(string); got != tc.msg.ID.String() {
t.Errorf("Redis ZSET %q member: got %v, want %v",
scheduledKey, got, tc.msg.ID.String())
continue
}
if got := int64(zs[0].Score); got != tc.processAt.Unix() {
t.Errorf("Redis ZSET %q score: got %d, want %d",
scheduledKey, got, tc.processAt.Unix())
continue
}
// Check the values under the task key.
taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String())
encoded := r.client.HGet(taskKey, "msg").Val() // "msg" field
decoded := h.MustUnmarshal(t, encoded)
if diff := cmp.Diff(tc.msg, decoded); diff != "" {
t.Errorf("persisted message was %v, want %v; (-want, +got)\n%s",
decoded, tc.msg, diff)
}
state := r.client.HGet(taskKey, "state").Val() // "state" field
if want := "scheduled"; state != want {
t.Errorf("state field under task-key is set to %q, want %q",
state, want)
}
timeout := r.client.HGet(taskKey, "timeout").Val() // "timeout" field
if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want {
t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want)
}
deadline := r.client.HGet(taskKey, "deadline").Val() // "deadline" field
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
t.Errorf("deadline field under task-ke is set to %v, want %v", deadline, want)
}
// Check queue is in the AllQueues set.
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues)
}