2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

Clean up processor test

This commit is contained in:
Ken Hibino 2020-02-29 11:16:46 -08:00
parent 8d3248e850
commit 95b7dcaad4

View File

@ -66,8 +66,6 @@ func TestProcessorSuccess(t *testing.T) {
processed = append(processed, task) processed = append(processed, task)
return nil return nil
} }
workerCh := make(chan int)
go fakeHeartbeater(workerCh)
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false) ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations) p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations)
@ -84,7 +82,6 @@ func TestProcessorSuccess(t *testing.T) {
} }
time.Sleep(tc.wait) time.Sleep(tc.wait)
p.terminate() p.terminate()
close(workerCh)
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" { if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" {
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
@ -155,8 +152,6 @@ func TestProcessorRetry(t *testing.T) {
handler := func(ctx context.Context, task *Task) error { handler := func(ctx context.Context, task *Task) error {
return fmt.Errorf(errMsg) return fmt.Errorf(errMsg)
} }
workerCh := make(chan int)
go fakeHeartbeater(workerCh)
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false) ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations) p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations)
@ -173,7 +168,6 @@ func TestProcessorRetry(t *testing.T) {
} }
time.Sleep(tc.wait) time.Sleep(tc.wait)
p.terminate() p.terminate()
close(workerCh)
cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to second difference in zset score cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to second difference in zset score
gotRetry := h.GetRetryEntries(t, r) gotRetry := h.GetRetryEntries(t, r)
@ -288,8 +282,6 @@ func TestProcessorWithStrictPriority(t *testing.T) {
"low": 1, "low": 1,
} }
// Note: Set concurrency to 1 to make sure tasks are processed one at a time. // Note: Set concurrency to 1 to make sure tasks are processed one at a time.
workerCh := make(chan int)
go fakeHeartbeater(workerCh)
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/) ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations) p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations)
@ -299,7 +291,6 @@ func TestProcessorWithStrictPriority(t *testing.T) {
p.start(&wg) p.start(&wg)
time.Sleep(tc.wait) time.Sleep(tc.wait)
p.terminate() p.terminate()
close(workerCh)
if diff := cmp.Diff(tc.wantProcessed, processed, cmp.AllowUnexported(Payload{})); diff != "" { if diff := cmp.Diff(tc.wantProcessed, processed, cmp.AllowUnexported(Payload{})); diff != "" {
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
@ -356,9 +347,3 @@ func TestPerform(t *testing.T) {
} }
} }
} }
// fake heartbeater to receive sends from the worker channel.
func fakeHeartbeater(ch <-chan int) {
for range ch {
}
}