2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Refactor dequeue test

This commit is contained in:
Ken Hibino 2019-11-25 20:57:53 -08:00
parent 199dcf8fdb
commit 810c40bd54

View File

@ -47,12 +47,18 @@ func TestEnqueue(t *testing.T) {
tests := []struct { tests := []struct {
msg *taskMessage msg *taskMessage
}{ }{
{msg: randomTask("send_email", "default", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})}, {msg: randomTask("send_email", "default",
{msg: randomTask("generate_csv", "default", map[string]interface{}{})}, map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})},
{msg: randomTask("generate_csv", "default",
map[string]interface{}{})},
{msg: randomTask("sync", "default", nil)}, {msg: randomTask("sync", "default", nil)},
} }
for _, tc := range tests { for _, tc := range tests {
// clean up db before each test case.
if err := client.FlushDB().Err(); err != nil {
t.Fatal(err)
}
err := r.enqueue(tc.msg) err := r.enqueue(tc.msg)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
@ -73,45 +79,39 @@ func TestEnqueue(t *testing.T) {
if diff := cmp.Diff(*tc.msg, persisted); diff != "" { if diff := cmp.Diff(*tc.msg, persisted); diff != "" {
t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff) t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff)
} }
// clean up before the next test case. }
}
func TestDequeue(t *testing.T) {
r := setup(t)
t1 := randomTask("send_email", "default", map[string]interface{}{"subject": "hello!"})
tests := []struct {
queued []*taskMessage
want *taskMessage
err error
inProgress int64 // length of "in-progress" tasks after dequeue
}{
{queued: []*taskMessage{t1}, want: t1, err: nil, inProgress: 1},
{queued: []*taskMessage{}, want: nil, err: errQueuePopTimeout, inProgress: 0},
}
for _, tc := range tests {
// clean up db before each test case.
if err := client.FlushDB().Err(); err != nil { if err := client.FlushDB().Err(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} for _, m := range tc.queued {
} r.enqueue(m)
}
func TestDequeueImmediateReturn(t *testing.T) { got, err := r.dequeue(defaultQueue, time.Second)
r := setup(t) if !cmp.Equal(got, tc.want) || err != tc.err {
msg := randomTask("export_csv", "csv", nil) t.Errorf("(*rdb).dequeue(%q, time.Second) = %v, %v; want %v, %v",
r.enqueue(msg) defaultQueue, got, err, tc.want, tc.err)
continue
res, err := r.dequeue("asynq:queues:csv", time.Second) }
if err != nil { if l := client.LLen(inProgress).Val(); l != tc.inProgress {
t.Fatalf("r.bpop() failed: %v", err) t.Errorf("LIST %q has length %d, want %d", inProgress, l, tc.inProgress)
} }
if !cmp.Equal(res, msg) {
t.Errorf("cmp.Equal(res, msg) = %t, want %t", false, true)
}
jobs := client.LRange(inProgress, 0, -1).Val()
if len(jobs) != 1 {
t.Fatalf("len(jobs) = %d, want %d", len(jobs), 1)
}
var tm taskMessage
if err := json.Unmarshal([]byte(jobs[0]), &tm); err != nil {
t.Fatalf("json.Marshal() failed: %v", err)
}
if diff := cmp.Diff(res, &tm); diff != "" {
t.Errorf("cmp.Diff(res, tm) = %s", diff)
}
}
func TestDequeueTimeout(t *testing.T) {
r := setup(t)
_, err := r.dequeue("asynq:queues:default", time.Second)
if err != errQueuePopTimeout {
t.Errorf("err = %v, want %v", err, errQueuePopTimeout)
} }
} }
@ -125,16 +125,16 @@ func TestMoveAll(t *testing.T) {
for _, task := range seed { for _, task := range seed {
bytes, err := json.Marshal(task) bytes, err := json.Marshal(task)
if err != nil { if err != nil {
t.Errorf("json.Marhsal() failed: %v", err) t.Fatal(err)
} }
if err := client.LPush(inProgress, string(bytes)).Err(); err != nil { if err := client.LPush(inProgress, string(bytes)).Err(); err != nil {
t.Errorf("LPUSH %q %s failed: %v", inProgress, string(bytes), err) t.Fatal(err)
} }
} }
err := r.moveAll(inProgress, defaultQueue) err := r.moveAll(inProgress, defaultQueue)
if err != nil { if err != nil {
t.Errorf("moveAll failed: %v", err) t.Errorf("moveAll(%q, %q) = %v, want nil", inProgress, defaultQueue, err)
} }
if l := client.LLen(inProgress).Val(); l != 0 { if l := client.LLen(inProgress).Val(); l != 0 {
@ -152,11 +152,11 @@ func TestForward(t *testing.T) {
secondAgo := time.Now().Add(-time.Second) // use timestamp for the past to avoid advancing time secondAgo := time.Now().Add(-time.Second) // use timestamp for the past to avoid advancing time
json1, err := json.Marshal(t1) json1, err := json.Marshal(t1)
if err != nil { if err != nil {
t.Fatalf("json.Marshal() failed: %v", err) t.Fatal(err)
} }
json2, err := json.Marshal(t2) json2, err := json.Marshal(t2)
if err != nil { if err != nil {
t.Fatalf("json.Marshal() failed: %v", err) t.Fatal(err)
} }
client.ZAdd(scheduled, &redis.Z{ client.ZAdd(scheduled, &redis.Z{
Member: string(json1), Member: string(json1),
@ -168,7 +168,7 @@ func TestForward(t *testing.T) {
err = r.forward(scheduled) err = r.forward(scheduled)
if err != nil { if err != nil {
t.Fatalf("r.forward() failed: %v", err) t.Errorf("(*rdb).forward(%q) = %v, want nil", scheduled, err)
} }
if c := client.ZCard(scheduled).Val(); c != 0 { if c := client.ZCard(scheduled).Val(); c != 0 {