2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

Clean up heartbeater test

This commit is contained in:
Ken Hibino 2020-02-16 09:45:44 -08:00
parent 3d9a222bb3
commit 39f237899b

View File

@ -16,7 +16,6 @@ import (
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
) )
// FIXME: Make this test better.
func TestHeartbeater(t *testing.T) { func TestHeartbeater(t *testing.T) {
r := setup(t) r := setup(t)
rdbClient := rdb.NewRDB(r) rdbClient := rdb.NewRDB(r)
@ -28,7 +27,7 @@ func TestHeartbeater(t *testing.T) {
queues map[string]int queues map[string]int
concurrency int concurrency int
}{ }{
{time.Second, "some.address.ec2.aws.com", 45678, map[string]int{"default": 1}, 10}, {time.Second, "localhost", 45678, map[string]int{"default": 1}, 10},
} }
timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Millisecond) timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Millisecond)
@ -40,6 +39,9 @@ func TestHeartbeater(t *testing.T) {
workerCh := make(chan int) workerCh := make(chan int)
hb := newHeartbeater(rdbClient, tc.host, tc.pid, tc.concurrency, tc.queues, false, tc.interval, stateCh, workerCh) hb := newHeartbeater(rdbClient, tc.host, tc.pid, tc.concurrency, tc.queues, false, tc.interval, stateCh, workerCh)
var wg sync.WaitGroup
hb.start(&wg)
want := &base.ProcessInfo{ want := &base.ProcessInfo{
Host: tc.host, Host: tc.host,
PID: tc.pid, PID: tc.pid,
@ -48,21 +50,25 @@ func TestHeartbeater(t *testing.T) {
Started: time.Now(), Started: time.Now(),
State: "running", State: "running",
} }
var wg sync.WaitGroup
hb.start(&wg)
// allow for heartbeater to write to redis // allow for heartbeater to write to redis
time.Sleep(tc.interval * 2) time.Sleep(tc.interval * 2)
got, err := rdbClient.ReadProcessInfo(tc.host, tc.pid) ps, err := rdbClient.ListProcesses()
if err != nil { if err != nil {
t.Errorf("could not read process status from redis: %v", err) t.Errorf("could not read process status from redis: %v", err)
hb.terminate() hb.terminate()
continue continue
} }
if diff := cmp.Diff(want, got, timeCmpOpt, ignoreOpt); diff != "" { if len(ps) != 1 {
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", got, want, diff) t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ps))
hb.terminate()
continue
}
if diff := cmp.Diff(want, ps[0], timeCmpOpt, ignoreOpt); diff != "" {
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ps[0], want, diff)
hb.terminate() hb.terminate()
continue continue
} }
@ -74,15 +80,21 @@ func TestHeartbeater(t *testing.T) {
time.Sleep(tc.interval * 2) time.Sleep(tc.interval * 2)
want.State = "stopped" want.State = "stopped"
got, err = rdbClient.ReadProcessInfo(tc.host, tc.pid) ps, err = rdbClient.ListProcesses()
if err != nil { if err != nil {
t.Errorf("could not read process status from redis: %v", err) t.Errorf("could not read process status from redis: %v", err)
hb.terminate() hb.terminate()
continue continue
} }
if diff := cmp.Diff(want, got, timeCmpOpt, ignoreOpt); diff != "" { if len(ps) != 1 {
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", got, want, diff) t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ps))
hb.terminate()
continue
}
if diff := cmp.Diff(want, ps[0], timeCmpOpt, ignoreOpt); diff != "" {
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ps[0], want, diff)
hb.terminate() hb.terminate()
continue continue
} }