mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-27 00:02:19 +08:00
Refactor rdb_test.go
This commit is contained in:
parent
4aa5078dc4
commit
737de898eb
68
rdb_test.go
68
rdb_test.go
@ -19,7 +19,8 @@ func init() {
|
|||||||
|
|
||||||
// setup connects to a redis database and flush all keys
|
// setup connects to a redis database and flush all keys
|
||||||
// before returning an instance of rdb.
|
// before returning an instance of rdb.
|
||||||
func setup() *rdb {
|
func setup(t *testing.T) *rdb {
|
||||||
|
t.Helper()
|
||||||
client = redis.NewClient(&redis.Options{
|
client = redis.NewClient(&redis.Options{
|
||||||
Addr: "localhost:6379",
|
Addr: "localhost:6379",
|
||||||
DB: 15, // use database 15 to separate from other applications
|
DB: 15, // use database 15 to separate from other applications
|
||||||
@ -31,40 +32,57 @@ func setup() *rdb {
|
|||||||
return newRDB(client)
|
return newRDB(client)
|
||||||
}
|
}
|
||||||
|
|
||||||
func randomTask(taskType, qname string) *taskMessage {
|
func randomTask(taskType, qname string, payload map[string]interface{}) *taskMessage {
|
||||||
return &taskMessage{
|
return &taskMessage{
|
||||||
ID: uuid.New(),
|
ID: uuid.New(),
|
||||||
Type: taskType,
|
Type: taskType,
|
||||||
Queue: qname,
|
Queue: qname,
|
||||||
Retry: rand.Intn(100),
|
Retry: rand.Intn(100),
|
||||||
|
Payload: make(map[string]interface{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPush(t *testing.T) {
|
func TestPush(t *testing.T) {
|
||||||
r := setup()
|
r := setup(t)
|
||||||
msg := randomTask("send_email", "default")
|
tests := []struct {
|
||||||
|
msg *taskMessage
|
||||||
err := r.push(msg)
|
}{
|
||||||
if err != nil {
|
{msg: randomTask("send_email", "default", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})},
|
||||||
t.Fatalf("could not push message to queue: %v", err)
|
{msg: randomTask("generate_csv", "default", map[string]interface{}{})},
|
||||||
|
{msg: randomTask("sync", "default", nil)},
|
||||||
}
|
}
|
||||||
|
|
||||||
res := client.LRange("asynq:queues:default", 0, -1).Val()
|
for _, tc := range tests {
|
||||||
|
err := r.push(tc.msg)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
res := client.LRange(defaultQueue, 0, -1).Val()
|
||||||
if len(res) != 1 {
|
if len(res) != 1 {
|
||||||
t.Fatalf("len(res) = %d, want %d", len(res), 1)
|
t.Errorf("LIST %q has length %d, want 1", defaultQueue, len(res))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
bytes, err := json.Marshal(msg)
|
if !client.SIsMember(allQueues, defaultQueue).Val() {
|
||||||
if err != nil {
|
t.Errorf("SISMEMBER %q %q = false, want true", allQueues, defaultQueue)
|
||||||
t.Fatalf("json.Marshal(msg) failed: %v", err)
|
}
|
||||||
|
var persisted taskMessage
|
||||||
|
if err := json.Unmarshal([]byte(res[0]), &persisted); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if diff := cmp.Diff(*tc.msg, persisted); diff != "" {
|
||||||
|
t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff)
|
||||||
|
}
|
||||||
|
// clean up before the next test case.
|
||||||
|
if err := client.FlushDB().Err(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if res[0] != string(bytes) {
|
|
||||||
t.Fatalf("res[0] = %s, want %s", res[0], string(bytes))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDequeueImmediateReturn(t *testing.T) {
|
func TestDequeueImmediateReturn(t *testing.T) {
|
||||||
r := setup()
|
r := setup(t)
|
||||||
msg := randomTask("export_csv", "csv")
|
msg := randomTask("export_csv", "csv", nil)
|
||||||
r.push(msg)
|
r.push(msg)
|
||||||
|
|
||||||
res, err := r.dequeue("asynq:queues:csv", time.Second)
|
res, err := r.dequeue("asynq:queues:csv", time.Second)
|
||||||
@ -89,7 +107,7 @@ func TestDequeueImmediateReturn(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestDequeueTimeout(t *testing.T) {
|
func TestDequeueTimeout(t *testing.T) {
|
||||||
r := setup()
|
r := setup(t)
|
||||||
|
|
||||||
_, err := r.dequeue("asynq:queues:default", time.Second)
|
_, err := r.dequeue("asynq:queues:default", time.Second)
|
||||||
if err != errQueuePopTimeout {
|
if err != errQueuePopTimeout {
|
||||||
@ -98,11 +116,11 @@ func TestDequeueTimeout(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestMoveAll(t *testing.T) {
|
func TestMoveAll(t *testing.T) {
|
||||||
r := setup()
|
r := setup(t)
|
||||||
seed := []*taskMessage{
|
seed := []*taskMessage{
|
||||||
randomTask("send_email", "default"),
|
randomTask("send_email", "default", nil),
|
||||||
randomTask("export_csv", "csv"),
|
randomTask("export_csv", "csv", nil),
|
||||||
randomTask("sync_stuff", "sync"),
|
randomTask("sync_stuff", "sync", nil),
|
||||||
}
|
}
|
||||||
for _, task := range seed {
|
for _, task := range seed {
|
||||||
bytes, err := json.Marshal(task)
|
bytes, err := json.Marshal(task)
|
||||||
@ -128,9 +146,9 @@ func TestMoveAll(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestForward(t *testing.T) {
|
func TestForward(t *testing.T) {
|
||||||
r := setup()
|
r := setup(t)
|
||||||
t1 := randomTask("send_email", defaultQueue)
|
t1 := randomTask("send_email", defaultQueue, nil)
|
||||||
t2 := randomTask("generate_csv", defaultQueue)
|
t2 := randomTask("generate_csv", defaultQueue, nil)
|
||||||
secondAgo := time.Now().Add(-time.Second) // use timestamp for the past to avoid advancing time
|
secondAgo := time.Now().Add(-time.Second) // use timestamp for the past to avoid advancing time
|
||||||
json1, err := json.Marshal(t1)
|
json1, err := json.Marshal(t1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user