mirror of
https://github.com/hibiken/asynq.git
synced 2025-07-04 12:23:39 +08:00
Fix memory leaks caused by time.After
This commit is contained in:
parent
d04888e748
commit
b7cb257b5a
@ -58,12 +58,13 @@ func (s *subscriber) start(wg *sync.WaitGroup) {
|
||||
err error
|
||||
)
|
||||
// Try until successfully connect to Redis.
|
||||
timer := time.NewTimer(s.retryTimeout)
|
||||
for {
|
||||
pubsub, err = s.broker.CancelationPubSub()
|
||||
if err != nil {
|
||||
s.logger.Errorf("cannot subscribe to cancelation channel: %v", err)
|
||||
select {
|
||||
case <-time.After(s.retryTimeout):
|
||||
case <-timer.C:
|
||||
continue
|
||||
case <-s.done:
|
||||
s.logger.Debug("Subscriber done")
|
||||
|
@ -57,6 +57,7 @@ func (s *syncer) start(wg *sync.WaitGroup) {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
var requests []*syncRequest
|
||||
timer := time.NewTimer(s.interval)
|
||||
for {
|
||||
select {
|
||||
case <-s.done:
|
||||
@ -70,7 +71,7 @@ func (s *syncer) start(wg *sync.WaitGroup) {
|
||||
return
|
||||
case req := <-s.requestsCh:
|
||||
requests = append(requests, req)
|
||||
case <-time.After(s.interval):
|
||||
case <-timer.C:
|
||||
var temp []*syncRequest
|
||||
for _, req := range requests {
|
||||
if req.deadline.Before(time.Now()) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user