2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-24 10:36:12 +08:00

Clean up tests

This commit is contained in:
Ken Hibino
2020-05-01 07:22:11 -07:00
parent 0c998a8e17
commit 5161b9368a
2 changed files with 13 additions and 21 deletions

View File

@@ -82,7 +82,8 @@ func TestServerInfoKey(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
got := ServerInfoKey(tc.hostname, tc.pid, tc.sid) got := ServerInfoKey(tc.hostname, tc.pid, tc.sid)
if got != tc.want { if got != tc.want {
t.Errorf("ServerInfoKey(%q, %d) = %q, want %q", tc.hostname, tc.pid, got, tc.want) t.Errorf("ServerInfoKey(%q, %d, %q) = %q, want %q",
tc.hostname, tc.pid, tc.sid, got, tc.want)
} }
} }
} }
@@ -101,7 +102,8 @@ func TestWorkersKey(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
got := WorkersKey(tc.hostname, tc.pid, tc.sid) got := WorkersKey(tc.hostname, tc.pid, tc.sid)
if got != tc.want { if got != tc.want {
t.Errorf("WorkersKey(%q, %d) = %q, want = %q", tc.hostname, tc.pid, got, tc.want) t.Errorf("WorkersKey(%q, %d, %q) = %q, want = %q",
tc.hostname, tc.pid, tc.sid, got, tc.want)
} }
} }
} }

View File

@@ -37,19 +37,16 @@ func TestProcessorSuccess(t *testing.T) {
tests := []struct { tests := []struct {
enqueued []*base.TaskMessage // initial default queue state enqueued []*base.TaskMessage // initial default queue state
incoming []*base.TaskMessage // tasks to be enqueued during run incoming []*base.TaskMessage // tasks to be enqueued during run
wait time.Duration // wait duration between starting and stopping processor for this test case
wantProcessed []*Task // tasks to be processed at the end wantProcessed []*Task // tasks to be processed at the end
}{ }{
{ {
enqueued: []*base.TaskMessage{m1}, enqueued: []*base.TaskMessage{m1},
incoming: []*base.TaskMessage{m2, m3, m4}, incoming: []*base.TaskMessage{m2, m3, m4},
wait: time.Second,
wantProcessed: []*Task{t1, t2, t3, t4}, wantProcessed: []*Task{t1, t2, t3, t4},
}, },
{ {
enqueued: []*base.TaskMessage{}, enqueued: []*base.TaskMessage{},
incoming: []*base.TaskMessage{m1}, incoming: []*base.TaskMessage{m1},
wait: time.Second,
wantProcessed: []*Task{t1}, wantProcessed: []*Task{t1},
}, },
} }
@@ -68,21 +65,19 @@ func TestProcessorSuccess(t *testing.T) {
return nil return nil
} }
ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false) ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations()
p := newProcessor(newProcessorParams{ p := newProcessor(newProcessorParams{
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
ss: ss, ss: ss,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: defaultDelayFunc,
syncCh: nil, syncCh: nil,
cancelations: cancelations, cancelations: base.NewCancelations(),
errHandler: nil, errHandler: nil,
shutdownTimeout: defaultShutdownTimeout, shutdownTimeout: defaultShutdownTimeout,
}) })
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)
var wg sync.WaitGroup p.start(&sync.WaitGroup{})
p.start(&wg)
for _, msg := range tc.incoming { for _, msg := range tc.incoming {
err := rdbClient.Enqueue(msg) err := rdbClient.Enqueue(msg)
if err != nil { if err != nil {
@@ -90,7 +85,7 @@ func TestProcessorSuccess(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
} }
time.Sleep(tc.wait) time.Sleep(time.Second) // wait for one second to allow all enqueued tasks to be processed.
p.terminate() p.terminate()
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" { if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" {
@@ -175,21 +170,19 @@ func TestProcessorRetry(t *testing.T) {
n++ n++
} }
ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false) ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations()
p := newProcessor(newProcessorParams{ p := newProcessor(newProcessorParams{
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
ss: ss, ss: ss,
retryDelayFunc: delayFunc, retryDelayFunc: delayFunc,
syncCh: nil, syncCh: nil,
cancelations: cancelations, cancelations: base.NewCancelations(),
errHandler: ErrorHandlerFunc(errHandler), errHandler: ErrorHandlerFunc(errHandler),
shutdownTimeout: defaultShutdownTimeout, shutdownTimeout: defaultShutdownTimeout,
}) })
p.handler = tc.handler p.handler = tc.handler
var wg sync.WaitGroup p.start(&sync.WaitGroup{})
p.start(&wg)
for _, msg := range tc.incoming { for _, msg := range tc.incoming {
err := rdbClient.Enqueue(msg) err := rdbClient.Enqueue(msg)
if err != nil { if err != nil {
@@ -200,7 +193,7 @@ func TestProcessorRetry(t *testing.T) {
time.Sleep(tc.wait) time.Sleep(tc.wait)
p.terminate() p.terminate()
cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to second difference in zset score cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to a second difference in zset score
gotRetry := h.GetRetryEntries(t, r) gotRetry := h.GetRetryEntries(t, r)
if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" { if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" {
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryQueue, diff) t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryQueue, diff)
@@ -249,7 +242,6 @@ func TestProcessorQueues(t *testing.T) {
} }
for _, tc := range tests { for _, tc := range tests {
cancelations := base.NewCancelations()
ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false) ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false)
p := newProcessor(newProcessorParams{ p := newProcessor(newProcessorParams{
logger: testLogger, logger: testLogger,
@@ -257,7 +249,7 @@ func TestProcessorQueues(t *testing.T) {
ss: ss, ss: ss,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: defaultDelayFunc,
syncCh: nil, syncCh: nil,
cancelations: cancelations, cancelations: base.NewCancelations(),
errHandler: nil, errHandler: nil,
shutdownTimeout: defaultShutdownTimeout, shutdownTimeout: defaultShutdownTimeout,
}) })
@@ -326,7 +318,6 @@ func TestProcessorWithStrictPriority(t *testing.T) {
"low": 1, "low": 1,
} }
// Note: Set concurrency to 1 to make sure tasks are processed one at a time. // Note: Set concurrency to 1 to make sure tasks are processed one at a time.
cancelations := base.NewCancelations()
ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/) ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
p := newProcessor(newProcessorParams{ p := newProcessor(newProcessorParams{
logger: testLogger, logger: testLogger,
@@ -334,14 +325,13 @@ func TestProcessorWithStrictPriority(t *testing.T) {
ss: ss, ss: ss,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: defaultDelayFunc,
syncCh: nil, syncCh: nil,
cancelations: cancelations, cancelations: base.NewCancelations(),
errHandler: nil, errHandler: nil,
shutdownTimeout: defaultShutdownTimeout, shutdownTimeout: defaultShutdownTimeout,
}) })
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)
var wg sync.WaitGroup p.start(&sync.WaitGroup{})
p.start(&wg)
time.Sleep(tc.wait) time.Sleep(tc.wait)
p.terminate() p.terminate()