mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-14 11:31:18 +08:00
Update RDB.Enqueue with task state
This commit is contained in:
parent
9c95c41651
commit
5ec41e388b
@ -55,7 +55,11 @@ func (r *RDB) Ping() error {
|
|||||||
// ARGV[3] -> task timeout in seconds (0 if not timeout)
|
// ARGV[3] -> task timeout in seconds (0 if not timeout)
|
||||||
// ARGV[4] -> task deadline in unix time (0 if no deadline)
|
// ARGV[4] -> task deadline in unix time (0 if no deadline)
|
||||||
var enqueueCmd = redis.NewScript(`
|
var enqueueCmd = redis.NewScript(`
|
||||||
redis.call("HSET", KEYS[1], "msg", ARGV[1], "timeout", ARGV[3], "deadline", ARGV[4])
|
redis.call("HSET", KEYS[1],
|
||||||
|
"msg", ARGV[1],
|
||||||
|
"state", "pending",
|
||||||
|
"timeout", ARGV[3],
|
||||||
|
"deadline", ARGV[4])
|
||||||
redis.call("LPUSH", KEYS[2], ARGV[2])
|
redis.call("LPUSH", KEYS[2], ARGV[2])
|
||||||
return 1
|
return 1
|
||||||
`)
|
`)
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
@ -79,16 +80,42 @@ func TestEnqueue(t *testing.T) {
|
|||||||
err := r.Enqueue(tc.msg)
|
err := r.Enqueue(tc.msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("(*RDB).Enqueue(msg) = %v, want nil", err)
|
t.Errorf("(*RDB).Enqueue(msg) = %v, want nil", err)
|
||||||
}
|
|
||||||
|
|
||||||
gotPending := h.GetPendingMessages(t, r.client, tc.msg.Queue)
|
|
||||||
if len(gotPending) != 1 {
|
|
||||||
t.Errorf("%q has length %d, want 1", base.PendingKey(tc.msg.Queue), len(gotPending))
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if diff := cmp.Diff(tc.msg, gotPending[0]); diff != "" {
|
|
||||||
t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff)
|
// Check Pending list has task ID.
|
||||||
|
pendingKey := base.PendingKey(tc.msg.Queue)
|
||||||
|
pendingIDs := r.client.LRange(pendingKey, 0, -1).Val()
|
||||||
|
if len(pendingIDs) != 1 {
|
||||||
|
t.Errorf("Redis LIST %q contains %d IDs, want 1", pendingKey, len(pendingIDs))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
if pendingIDs[0] != tc.msg.ID.String() {
|
||||||
|
t.Errorf("Redis LIST %q: got %v, want %v", pendingKey, pendingIDs, []string{tc.msg.ID.String()})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the value under the task key.
|
||||||
|
taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String())
|
||||||
|
encoded := r.client.HGet(taskKey, "msg").Val() // "msg" field
|
||||||
|
decoded := h.MustUnmarshal(t, encoded)
|
||||||
|
if diff := cmp.Diff(tc.msg, decoded); diff != "" {
|
||||||
|
t.Errorf("persisted message was %v, want %v; (-want, +got)\n%s", decoded, tc.msg, diff)
|
||||||
|
}
|
||||||
|
state := r.client.HGet(taskKey, "state").Val() // "state" field
|
||||||
|
if state != "pending" {
|
||||||
|
t.Errorf("state field under task-key is set to %q, want %q", state, "pending")
|
||||||
|
}
|
||||||
|
timeout := r.client.HGet(taskKey, "timeout").Val() // "timeout" field
|
||||||
|
if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want {
|
||||||
|
t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want)
|
||||||
|
}
|
||||||
|
deadline := r.client.HGet(taskKey, "deadline").Val() // "deadline" field
|
||||||
|
if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want {
|
||||||
|
t.Errorf("deadline field under task-ke is set to %v, want %v", deadline, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check queue is in the AllQueues set.
|
||||||
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
|
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
|
||||||
t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues)
|
t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user