From 871474f220fe5d843955894c03f0331fad9c8ad1 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 12 Feb 2022 09:48:07 -0800 Subject: [PATCH] Update heartbeat goroutine to call ExtendLease on active tasks --- heartbeat.go | 9 ++ heartbeat_test.go | 165 +++++++++++++++++++++++++---- internal/asynqtest/asynqtest.go | 4 +- internal/testbroker/testbroker.go | 9 ++ internal/timeutil/timeutil.go | 6 +- internal/timeutil/timeutil_test.go | 48 +++++++++ 6 files changed, 215 insertions(+), 26 deletions(-) create mode 100644 internal/timeutil/timeutil_test.go diff --git a/heartbeat.go b/heartbeat.go index 4921031..3a63cb7 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -134,6 +134,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) { }() } +// beat extends lease for workers and writes server/worker info to redis. func (h *heartbeater) beat() { h.state.mu.Lock() srvStatus := h.state.value.String() @@ -152,6 +153,7 @@ func (h *heartbeater) beat() { } var ws []*base.WorkerInfo + idsByQueue := make(map[string][]string) for id, w := range h.workers { ws = append(ws, &base.WorkerInfo{ Host: h.host, @@ -164,6 +166,7 @@ func (h *heartbeater) beat() { Started: w.started, Deadline: w.deadline, }) + idsByQueue[w.msg.Queue] = append(idsByQueue[w.msg.Queue], id) } // Note: Set TTL to be long enough so that it won't expire before we write again @@ -171,4 +174,10 @@ func (h *heartbeater) beat() { if err := h.broker.WriteServerState(&info, ws, h.interval*2); err != nil { h.logger.Errorf("could not write server state data: %v", err) } + + for qname, ids := range idsByQueue { + if err := h.broker.ExtendLease(qname, ids...); err != nil { + h.logger.Errorf("could not extend lease for tasks %v: %v", ids, err) + } + } } diff --git a/heartbeat_test.go b/heartbeat_test.go index 4004072..97bd481 100644 --- a/heartbeat_test.go +++ b/heartbeat_test.go @@ -15,21 +15,79 @@ import ( "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/testbroker" + "github.com/hibiken/asynq/internal/timeutil" ) +// Test goes through a few phases. +// +// Phase1: Simulate Server startup; Simulate starting tasks listed in startedTasks +// Phase2: Simluate finishing tasks listed in finishedTasks +// Phase3: Simulate Server shutdown; func TestHeartbeater(t *testing.T) { r := setup(t) defer r.Close() rdbClient := rdb.NewRDB(r) + now := time.Now() + const elapsedTime = 42 * time.Second // simulated time elapsed between phase1 and phase2 + + t1 := h.NewTaskMessageWithQueue("task1", nil, "default") + t2 := h.NewTaskMessageWithQueue("task2", nil, "default") + t3 := h.NewTaskMessageWithQueue("task3", nil, "default") + tests := []struct { - interval time.Duration + // Interval between heartbeats. + interval time.Duration + + // Server info. host string pid int queues map[string]int concurrency int + + active map[string][]*base.TaskMessage // initial active set state + lease map[string][]base.Z // initial lease set state + wantLease1 map[string][]base.Z // expected lease set state after starting all startedTasks + wantLease2 map[string][]base.Z // expected lease set state after finishing all finishedTasks + startedTasks []*base.TaskMessage // tasks to send via the started channel + finishedTasks []*base.TaskMessage // tasks to send via the finished channel + + startTime time.Time // simulated start time + elapsedTime time.Duration // simulated time elapsed between starting and finishing processing tasks }{ - {2 * time.Second, "localhost", 45678, map[string]int{"default": 1}, 10}, + { + interval: 2 * time.Second, + host: "localhost", + pid: 45678, + queues: map[string]int{"default": 1}, // TODO: Test with multple queues + concurrency: 10, + active: map[string][]*base.TaskMessage{ + "default": {t1, t2, t3}, + }, + lease: map[string][]base.Z{ + "default": { + {Message: t1, Score: now.Add(10 * time.Second).Unix()}, + {Message: t2, Score: now.Add(10 * time.Second).Unix()}, + {Message: t3, Score: now.Add(10 * time.Second).Unix()}, + }, + }, + startedTasks: []*base.TaskMessage{t1, t2, t3}, + finishedTasks: []*base.TaskMessage{t1, t2}, + wantLease1: map[string][]base.Z{ + "default": { + {Message: t1, Score: now.Add(rdb.LeaseDuration).Unix()}, + {Message: t2, Score: now.Add(rdb.LeaseDuration).Unix()}, + {Message: t3, Score: now.Add(rdb.LeaseDuration).Unix()}, + }, + }, + wantLease2: map[string][]base.Z{ + "default": { + {Message: t3, Score: now.Add(elapsedTime).Add(rdb.LeaseDuration).Unix()}, + }, + }, + startTime: now, + elapsedTime: elapsedTime, + }, } timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Millisecond) @@ -37,8 +95,15 @@ func TestHeartbeater(t *testing.T) { ignoreFieldOpt := cmpopts.IgnoreFields(base.ServerInfo{}, "ServerID") for _, tc := range tests { h.FlushDB(t, r) + h.SeedAllActiveQueues(t, r, tc.active) + h.SeedAllLease(t, r, tc.lease) + + clock := timeutil.NewSimulatedClock(tc.startTime) + rdbClient.SetClock(clock) srvState := &serverState{} + startingCh := make(chan *workerInfo) + finishedCh := make(chan *base.TaskMessage) hb := newHeartbeater(heartbeaterParams{ logger: testLogger, broker: rdbClient, @@ -47,14 +112,18 @@ func TestHeartbeater(t *testing.T) { queues: tc.queues, strictPriority: false, state: srvState, - starting: make(chan *workerInfo), - finished: make(chan *base.TaskMessage), + starting: startingCh, + finished: finishedCh, }) // Change host and pid fields for testing purpose. hb.host = tc.host hb.pid = tc.pid + //=================== + // Start Phase1 + //=================== + srvState.mu.Lock() srvState.value = srvStateActive // simulating Server.Start srvState.mu.Unlock() @@ -62,17 +131,17 @@ func TestHeartbeater(t *testing.T) { var wg sync.WaitGroup hb.start(&wg) - want := &base.ServerInfo{ - Host: tc.host, - PID: tc.pid, - Queues: tc.queues, - Concurrency: tc.concurrency, - Started: time.Now(), - Status: "active", + // Simulate processor starting to work on tasks. + for _, msg := range tc.startedTasks { + startingCh <- &workerInfo{ + msg: msg, + started: now, + deadline: now.Add(30 * time.Minute), + } } - // allow for heartbeater to write to redis - time.Sleep(tc.interval) + // Wait for heartbeater to write to redis + time.Sleep(tc.interval * 2) ss, err := rdbClient.ListServers() if err != nil { @@ -82,41 +151,91 @@ func TestHeartbeater(t *testing.T) { } if len(ss) != 1 { - t.Errorf("(*RDB).ListServers returned %d process info, want 1", len(ss)) + t.Errorf("(*RDB).ListServers returned %d server info, want 1", len(ss)) hb.shutdown() continue } - if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" { - t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff) + wantInfo := &base.ServerInfo{ + Host: tc.host, + PID: tc.pid, + Queues: tc.queues, + Concurrency: tc.concurrency, + Started: now, + Status: "active", + ActiveWorkerCount: len(tc.startedTasks), + } + if diff := cmp.Diff(wantInfo, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" { + t.Errorf("redis stored server status %+v, want %+v; (-want, +got)\n%s", ss[0], wantInfo, diff) hb.shutdown() continue } - // server state change; simulating Server.Shutdown + for qname, wantLease := range tc.wantLease1 { + gotLease := h.GetLeaseEntries(t, r, qname) + if diff := cmp.Diff(wantLease, gotLease, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.LeaseKey(qname), diff) + } + } + + //=================== + // Start Phase2 + //=================== + + clock.AdvanceTime(tc.elapsedTime) + // Simulate processor finished processing tasks. + for _, msg := range tc.finishedTasks { + if err := rdbClient.Done(msg); err != nil { + t.Fatalf("RDB.Done failed: %v", err) + } + finishedCh <- msg + } + // Wait for heartbeater to write to redis + time.Sleep(tc.interval * 2) + + for qname, wantLease := range tc.wantLease2 { + gotLease := h.GetLeaseEntries(t, r, qname) + if diff := cmp.Diff(wantLease, gotLease, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.LeaseKey(qname), diff) + } + } + + //=================== + // Start Phase3 + //=================== + + // Server state change; simulating Server.Shutdown srvState.mu.Lock() srvState.value = srvStateClosed srvState.mu.Unlock() - // allow for heartbeater to write to redis + // Wait for heartbeater to write to redis time.Sleep(tc.interval * 2) - want.Status = "closed" + wantInfo = &base.ServerInfo{ + Host: tc.host, + PID: tc.pid, + Queues: tc.queues, + Concurrency: tc.concurrency, + Started: now, + Status: "closed", + ActiveWorkerCount: len(tc.startedTasks) - len(tc.finishedTasks), + } ss, err = rdbClient.ListServers() if err != nil { - t.Errorf("could not read process status from redis: %v", err) + t.Errorf("could not read server status from redis: %v", err) hb.shutdown() continue } if len(ss) != 1 { - t.Errorf("(*RDB).ListProcesses returned %d process info, want 1", len(ss)) + t.Errorf("(*RDB).ListServers returned %d server info, want 1", len(ss)) hb.shutdown() continue } - if diff := cmp.Diff(want, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" { - t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], want, diff) + if diff := cmp.Diff(wantInfo, ss[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" { + t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ss[0], wantInfo, diff) hb.shutdown() continue } diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 1678fa9..5c5a15a 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -295,9 +295,9 @@ func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[stri } // SeedAllLease initializes all of the lease sets with the given entries. -func SeedAllLease(tb testing.TB, r redis.UniversalClient, deadlines map[string][]base.Z) { +func SeedAllLease(tb testing.TB, r redis.UniversalClient, lease map[string][]base.Z) { tb.Helper() - for q, entries := range deadlines { + for q, entries := range lease { SeedLease(tb, r, entries, q) } } diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go index 3633efa..c3be81a 100644 --- a/internal/testbroker/testbroker.go +++ b/internal/testbroker/testbroker.go @@ -163,6 +163,15 @@ func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*b return tb.real.ListLeaseExpired(cutoff, qnames...) } +func (tb *TestBroker) ExtendLease(qname string, ids ...string) error { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return errRedisDown + } + return tb.real.ExtendLease(qname, ids...) +} + func (tb *TestBroker) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error { tb.mu.Lock() defer tb.mu.Unlock() diff --git a/internal/timeutil/timeutil.go b/internal/timeutil/timeutil.go index 65691ad..04081df 100644 --- a/internal/timeutil/timeutil.go +++ b/internal/timeutil/timeutil.go @@ -1,3 +1,7 @@ +// Copyright 2022 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + // Package timeutil exports functions and types related to time and date. package timeutil @@ -35,4 +39,4 @@ func (c *SimulatedClock) Now() time.Time { return c.t } func (c *SimulatedClock) SetTime(t time.Time) { c.t = t } -func (c *SimulatedClock) AdvanceTime(d time.Duration) { c.t.Add(d) } +func (c *SimulatedClock) AdvanceTime(d time.Duration) { c.t = c.t.Add(d) } diff --git a/internal/timeutil/timeutil_test.go b/internal/timeutil/timeutil_test.go new file mode 100644 index 0000000..d1ee41e --- /dev/null +++ b/internal/timeutil/timeutil_test.go @@ -0,0 +1,48 @@ +// Copyright 2022 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package timeutil + +import ( + "testing" + "time" +) + +func TestSimulatedClock(t *testing.T) { + now := time.Now() + + tests := []struct { + desc string + initTime time.Time + advanceBy time.Duration + wantTime time.Time + }{ + { + desc: "advance time forward", + initTime: now, + advanceBy: 30 * time.Second, + wantTime: now.Add(30 * time.Second), + }, + { + desc: "advance time backward", + initTime: now, + advanceBy: -10 * time.Second, + wantTime: now.Add(-10 * time.Second), + }, + } + + for _, tc := range tests { + c := NewSimulatedClock(tc.initTime) + + if c.Now() != tc.initTime { + t.Errorf("%s: Before Advance; SimulatedClock.Now() = %v, want %v", tc.desc, c.Now(), tc.initTime) + } + + c.AdvanceTime(tc.advanceBy) + + if c.Now() != tc.wantTime { + t.Errorf("%s: After Advance; SimulatedClock.Now() = %v, want %v", tc.desc, c.Now(), tc.wantTime) + } + } +}