From 44c657bec6eb5d33177580a7b6bb0aa92763aa5b Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 22 Jan 2020 06:28:47 -0800 Subject: [PATCH] Rate limit error logs --- go.mod | 1 + go.sum | 1 + processor.go | 9 ++++++++- 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 6084911..8e32621 100644 --- a/go.mod +++ b/go.mod @@ -17,5 +17,6 @@ require ( go.uber.org/goleak v0.10.0 golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e // indirect golang.org/x/text v0.3.2 // indirect + golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 gopkg.in/yaml.v2 v2.2.7 // indirect ) diff --git a/go.sum b/go.sum index 2bc98ac..df8254a 100644 --- a/go.sum +++ b/go.sum @@ -179,6 +179,7 @@ golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/processor.go b/processor.go index 8212e44..de3496b 100644 --- a/processor.go +++ b/processor.go @@ -13,6 +13,7 @@ import ( "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" + "golang.org/x/time/rate" ) type processor struct { @@ -30,6 +31,9 @@ type processor struct { // channel via which to send sync requests to syncer. syncRequestCh chan<- *syncRequest + // rate limiter to prevent spamming logs with a bunch of errors. + errLogLimiter *rate.Limiter + // sema is a counting semaphore to ensure the number of active workers // does not exceed the limit. sema chan struct{} @@ -66,6 +70,7 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retry orderedQueues: orderedQueues, retryDelayFunc: fn, syncRequestCh: syncRequestCh, + errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1), sema: make(chan struct{}, n), done: make(chan struct{}), abort: make(chan struct{}), @@ -136,7 +141,9 @@ func (p *processor) exec() { return } if err != nil { - logger.error("Dequeue error: %v", err) + if p.errLogLimiter.Allow() { + logger.error("Dequeue error: %v", err) + } return }