From 06c4a1c7f86d739eb084ee94bf70cbafea27a45b Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 7 Jun 2020 13:04:27 -0700 Subject: [PATCH] Limit the number of tasks moved by CheckAndEnqueue to prevent a long running script --- .travis.yml | 1 + .travis/benchcmp.sh | 7 ++- benchmark_test.go | 17 ++++-- internal/base/base.go | 2 +- internal/rdb/inspect.go | 11 +++- internal/rdb/inspect_test.go | 42 +++++++-------- internal/rdb/rdb.go | 88 +++++++++++++------------------ internal/rdb/rdb_test.go | 7 +-- internal/testbroker/testbroker.go | 2 +- processor.go | 2 +- scheduler.go | 11 +--- scheduler_test.go | 1 - server.go | 5 +- 13 files changed, 91 insertions(+), 105 deletions(-) diff --git a/.travis.yml b/.travis.yml index 9d47394..1d76634 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,6 +5,7 @@ git: go: [1.13.x, 1.14.x] script: - go test -race -v -coverprofile=coverage.txt -covermode=atomic ./... + - go test -run=XXX -bench=. -loglevel=debug ./... services: - redis-server after_success: diff --git a/.travis/benchcmp.sh b/.travis/benchcmp.sh index b66801c..374efb4 100755 --- a/.travis/benchcmp.sh +++ b/.travis/benchcmp.sh @@ -3,13 +3,16 @@ if [ "${TRAVIS_PULL_REQUEST_BRANCH:-$TRAVIS_BRANCH}" != "master" ]; then cd ${TRAVIS_BUILD_DIR}/.. && \ git clone ${REMOTE_URL} "${TRAVIS_REPO_SLUG}-bench" && \ cd "${TRAVIS_REPO_SLUG}-bench" && \ + # Benchmark master git checkout master && \ go test -run=XXX -bench=. ./... > master.txt && \ + # Benchmark feature branch git checkout ${TRAVIS_COMMIT} && \ go test -run=XXX -bench=. ./... 
> feature.txt && \ - go get -u golang.org/x/tools/cmd/benchcmp && \ + # compare two benchmarks + go get -u golang.org/x/tools/cmd/benchcmp && \ benchcmp master.txt feature.txt; -fi \ No newline at end of file +fi diff --git a/benchmark_test.go b/benchmark_test.go index 8f4913c..d7d309a 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -7,7 +7,6 @@ package asynq import ( "context" "fmt" - "math/rand" "sync" "testing" "time" @@ -29,6 +28,7 @@ func BenchmarkEndToEndSimple(b *testing.B) { RetryDelayFunc: func(n int, err error, t *Task) time.Duration { return time.Second }, + LogLevel: testLogLevel, }) // Create a bunch of tasks for i := 0; i < count; i++ { @@ -60,7 +60,6 @@ func BenchmarkEndToEnd(b *testing.B) { const count = 100000 for n := 0; n < b.N; n++ { b.StopTimer() // begin setup - rand.Seed(time.Now().UnixNano()) setup(b) redis := &RedisClientOpt{ Addr: redisAddr, @@ -72,6 +71,7 @@ func BenchmarkEndToEnd(b *testing.B) { RetryDelayFunc: func(n int, err error, t *Task) time.Duration { return time.Second }, + LogLevel: testLogLevel, }) // Create a bunch of tasks for i := 0; i < count; i++ { @@ -90,8 +90,16 @@ func BenchmarkEndToEnd(b *testing.B) { var wg sync.WaitGroup wg.Add(count * 2) handler := func(ctx context.Context, t *Task) error { - // randomly fail 1% of tasks - if rand.Intn(100) == 1 { + n, err := t.Payload.GetInt("data") + if err != nil { + b.Logf("internal error: %v", err) + } + retried, ok := GetRetryCount(ctx) + if !ok { + b.Logf("internal error: could not get retry count from context") + } + // Fail 1% of tasks for the first attempt. 
+ if retried == 0 && n%100 == 0 { return fmt.Errorf(":(") } wg.Done() @@ -131,6 +139,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { "default": 3, "low": 1, }, + LogLevel: testLogLevel, }) // Create a bunch of tasks for i := 0; i < highCount; i++ { diff --git a/internal/base/base.go b/internal/base/base.go index 0bb8465..2210410 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -251,7 +251,7 @@ type Broker interface { Retry(msg *TaskMessage, processAt time.Time, errMsg string) error Kill(msg *TaskMessage, errMsg string) error RequeueAll() (int64, error) - CheckAndEnqueue(qnames ...string) error + CheckAndEnqueue() error WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error ClearServerState(host string, pid int, serverID string) error CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index af46a77..0ba139a 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -32,9 +32,16 @@ type Stats struct { // Queue represents a task queue. type Queue struct { - Name string + // Name of the queue (e.g. "default", "critical"). + // Note: It doesn't include the prefix "asynq:queues:". + Name string + + // Paused indicates whether the queue is paused. + // If true, tasks in the queue should not be processed. Paused bool - Size int // number of tasks in the queue + + // Size is the number of tasks in the queue. + Size int } // DailyStats holds aggregate data for a given day. 
diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index c9384df..530df64 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -100,11 +100,7 @@ func TestCurrentStats(t *testing.T) { Failed: 10, Timestamp: now, Queues: []*Queue{ - { - Name: base.DefaultQueueName, - Paused: false, - Size: 0, - }, + {Name: base.DefaultQueueName, Paused: false, Size: 0}, }, }, }, @@ -709,12 +705,14 @@ func TestListRetry(t *testing.T) { func TestListRetryPagination(t *testing.T) { r := setup(t) // create 100 tasks with an increasing number of wait time. + now := time.Now() + var seed []h.ZSetEntry for i := 0; i < 100; i++ { msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) - if err := r.Retry(msg, time.Now().Add(time.Duration(i)*time.Second), "error"); err != nil { - t.Fatal(err) - } + processAt := now.Add(time.Duration(i) * time.Second) + seed = append(seed, h.ZSetEntry{Msg: msg, Score: float64(processAt.Unix())}) } + h.SeedRetryQueue(t, r.client, seed) tests := []struct { desc string @@ -2212,9 +2210,9 @@ func TestPause(t *testing.T) { r := setup(t) tests := []struct { - initial []string // initial queue keys in the set - qname string // queue name to pause - want []string // expected queue keys in the set + initial []string // initial keys in the paused set + qname string // name of the queue to pause + want []string // expected keys in the paused set }{ {[]string{}, "default", []string{"asynq:queues:default"}}, {[]string{"asynq:queues:default"}, "critical", []string{"asynq:queues:default", "asynq:queues:critical"}}, @@ -2233,7 +2231,6 @@ func TestPause(t *testing.T) { err := r.Pause(tc.qname) if err != nil { t.Errorf("Pause(%q) returned error: %v", tc.qname, err) - continue } got, err := r.client.SMembers(base.PausedQueues).Result() @@ -2253,9 +2250,9 @@ func TestPauseError(t *testing.T) { tests := []struct { desc string // test case description - initial []string // initial queue keys in the set - qname string // queue name to 
pause - want []string // expected queue keys in the set + initial []string // initial keys in the paused set + qname string // name of the queue to pause + want []string // expected keys in the paused set }{ {"queue already paused", []string{"asynq:queues:default"}, "default", []string{"asynq:queues:default"}}, } @@ -2273,7 +2270,6 @@ func TestPauseError(t *testing.T) { err := r.Pause(tc.qname) if err == nil { t.Errorf("%s; Pause(%q) returned nil: want error", tc.desc, tc.qname) - continue } got, err := r.client.SMembers(base.PausedQueues).Result() @@ -2292,9 +2288,9 @@ func TestUnpause(t *testing.T) { r := setup(t) tests := []struct { - initial []string // initial queue keys in the set - qname string // queue name to unpause - want []string // expected queue keys in the set + initial []string // initial keys in the paused set + qname string // name of the queue to unpause + want []string // expected keys in the paused set }{ {[]string{"asynq:queues:default"}, "default", []string{}}, {[]string{"asynq:queues:default", "asynq:queues:low"}, "low", []string{"asynq:queues:default"}}, @@ -2313,7 +2309,6 @@ func TestUnpause(t *testing.T) { err := r.Unpause(tc.qname) if err != nil { t.Errorf("Unpause(%q) returned error: %v", tc.qname, err) - continue } got, err := r.client.SMembers(base.PausedQueues).Result() @@ -2333,9 +2328,9 @@ func TestUnpauseError(t *testing.T) { tests := []struct { desc string // test case description - initial []string // initial queue keys in the set - qname string // queue name to unpause - want []string // expected queue keys in the set + initial []string // initial keys in the paused set + qname string // name of the queue to unpause + want []string // expected keys in the paused set }{ {"set is empty", []string{}, "default", []string{}}, {"queue is not in the set", []string{"asynq:queues:default"}, "low", []string{"asynq:queues:default"}}, @@ -2354,7 +2349,6 @@ func TestUnpauseError(t *testing.T) { err := r.Unpause(tc.qname) if err == nil { 
t.Errorf("%s; Unpause(%q) returned nil: want error", tc.desc, tc.qname) - continue } got, err := r.client.SMembers(base.PausedQueues).Result() diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index ec9593d..e3a493e 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -106,11 +106,11 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { // Dequeue skips a queue if the queue is paused. // If all queues are empty, ErrNoProcessableTask error is returned. func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) { - var keys []string + var qkeys []interface{} for _, q := range qnames { - keys = append(keys, base.QueueKey(q)) + qkeys = append(qkeys, base.QueueKey(q)) } - data, err := r.dequeue(keys...) + data, err := r.dequeue(qkeys...) if err == redis.Nil { return nil, ErrNoProcessableTask } @@ -142,13 +142,9 @@ for _, qkey in ipairs(ARGV) do end return nil`) -func (r *RDB) dequeue(queues ...string) (data string, err error) { - var args []interface{} - for _, qkey := range queues { - args = append(args, qkey) - } +func (r *RDB) dequeue(qkeys ...interface{}) (data string, err error) { res, err := dequeueCmd.Run(r.client, - []string{base.InProgressQueue, base.PausedQueues}, args...).Result() + []string{base.InProgressQueue, base.PausedQueues}, qkeys...).Result() if err != nil { return "", err } @@ -163,7 +159,10 @@ func (r *RDB) dequeue(queues ...string) (data string, err error) { // ARGV[3] -> task ID // Note: LREM count ZERO means "remove all elements equal to val" var doneCmd = redis.NewScript(` -redis.call("LREM", KEYS[1], 0, ARGV[1]) +local x = redis.call("LREM", KEYS[1], 0, ARGV[1]) +if x == 0 then + return redis.error_reply("NOT FOUND") +end local n = redis.call("INCR", KEYS[2]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[2], ARGV[2]) @@ -285,7 +284,10 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim // ARGV[3] -> retry_at UNIX timestamp // ARGV[4] -> stats expiration 
timestamp var retryCmd = redis.NewScript(` -redis.call("LREM", KEYS[1], 0, ARGV[1]) +local x = redis.call("LREM", KEYS[1], 0, ARGV[1]) +if x == 0 then + return redis.error_reply("NOT FOUND") +end redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) local n = redis.call("INCR", KEYS[3]) if tonumber(n) == 1 then @@ -336,7 +338,10 @@ const ( // ARGV[5] -> max number of tasks in dead queue (e.g., 100) // ARGV[6] -> stats expiration timestamp var killCmd = redis.NewScript(` -redis.call("LREM", KEYS[1], 0, ARGV[1]) +local x = redis.call("LREM", KEYS[1], 0, ARGV[1]) +if x == 0 then + return redis.error_reply("NOT FOUND") +end redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4]) redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5]) @@ -400,21 +405,17 @@ func (r *RDB) RequeueAll() (int64, error) { return n, nil } -// CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that -// have to be processed. -// -// qnames specifies to which queues to send tasks. -func (r *RDB) CheckAndEnqueue(qnames ...string) error { +// CheckAndEnqueue checks for all scheduled/retry tasks and enqueues any tasks that +// are ready to be processed. +func (r *RDB) CheckAndEnqueue() (err error) { delayed := []string{base.ScheduledQueue, base.RetryQueue} for _, zset := range delayed { - var err error - if len(qnames) == 1 { - err = r.forwardSingle(zset, base.QueueKey(qnames[0])) - } else { - err = r.forward(zset) - } - if err != nil { - return err + n := 1 + for n != 0 { + n, err = r.forward(zset) + if err != nil { + return err + } } } return nil @@ -423,40 +424,27 @@ func (r *RDB) CheckAndEnqueue(qnames ...string) error { // KEYS[1] -> source queue (e.g. scheduled or retry queue) // ARGV[1] -> current unix time // ARGV[2] -> queue prefix +// Note: Script moves tasks up to 100 at a time to keep the runtime of script short. 
var forwardCmd = redis.NewScript(` -local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) +local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100) for _, msg in ipairs(msgs) do local decoded = cjson.decode(msg) local qkey = ARGV[2] .. decoded["Queue"] redis.call("LPUSH", qkey, msg) redis.call("ZREM", KEYS[1], msg) end -return msgs`) +return table.getn(msgs)`) -// forward moves all tasks with a score less than the current unix time -// from the src zset. -func (r *RDB) forward(src string) error { +// forward moves tasks with a score less than the current unix time +// from the src zset. It returns the number of tasks moved. +func (r *RDB) forward(src string) (int, error) { now := float64(time.Now().Unix()) - return forwardCmd.Run(r.client, - []string{src}, now, base.QueuePrefix).Err() -} - -// KEYS[1] -> source queue (e.g. scheduled or retry queue) -// KEYS[2] -> destination queue -var forwardSingleCmd = redis.NewScript(` -local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) -for _, msg in ipairs(msgs) do - redis.call("LPUSH", KEYS[2], msg) - redis.call("ZREM", KEYS[1], msg) -end -return msgs`) - -// forwardSingle moves all tasks with a score less than the current unix time -// from the src zset to dst list. 
-func (r *RDB) forwardSingle(src, dst string) error { - now := float64(time.Now().Unix()) - return forwardSingleCmd.Run(r.client, - []string{src, dst}, now).Err() + res, err := forwardCmd.Run(r.client, + []string{src}, now, base.QueuePrefix).Result() + if err != nil { + return 0, err + } + return cast.ToInt(res), nil } // KEYS[1] -> asynq:servers: diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 27aebe3..417dd17 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -860,7 +860,6 @@ func TestCheckAndEnqueue(t *testing.T) { tests := []struct { scheduled []h.ZSetEntry retry []h.ZSetEntry - qnames []string wantEnqueued map[string][]*base.TaskMessage wantScheduled []*base.TaskMessage wantRetry []*base.TaskMessage @@ -872,7 +871,6 @@ func TestCheckAndEnqueue(t *testing.T) { }, retry: []h.ZSetEntry{ {Msg: t3, Score: float64(secondAgo.Unix())}}, - qnames: []string{"default"}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {t1, t2, t3}, }, @@ -885,7 +883,6 @@ func TestCheckAndEnqueue(t *testing.T) { {Msg: t2, Score: float64(secondAgo.Unix())}}, retry: []h.ZSetEntry{ {Msg: t3, Score: float64(secondAgo.Unix())}}, - qnames: []string{"default"}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {t2, t3}, }, @@ -898,7 +895,6 @@ func TestCheckAndEnqueue(t *testing.T) { {Msg: t2, Score: float64(hourFromNow.Unix())}}, retry: []h.ZSetEntry{ {Msg: t3, Score: float64(hourFromNow.Unix())}}, - qnames: []string{"default"}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, }, @@ -912,7 +908,6 @@ func TestCheckAndEnqueue(t *testing.T) { }, retry: []h.ZSetEntry{ {Msg: t5, Score: float64(secondAgo.Unix())}}, - qnames: []string{"default", "critical", "low"}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {t4}, @@ -928,7 +923,7 @@ func TestCheckAndEnqueue(t *testing.T) { h.SeedScheduledQueue(t, r.client, tc.scheduled) h.SeedRetryQueue(t, r.client, tc.retry) - err := r.CheckAndEnqueue(tc.qnames...) 
+ err := r.CheckAndEnqueue() if err != nil { t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err) continue diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go index 8bcba61..9044e17 100644 --- a/internal/testbroker/testbroker.go +++ b/internal/testbroker/testbroker.go @@ -132,7 +132,7 @@ func (tb *TestBroker) RequeueAll() (int64, error) { return tb.real.RequeueAll() } -func (tb *TestBroker) CheckAndEnqueue(qnames ...string) error { +func (tb *TestBroker) CheckAndEnqueue() error { tb.mu.Lock() defer tb.mu.Unlock() if tb.sleeping { diff --git a/processor.go b/processor.go index 8b02551..f2bb1af 100644 --- a/processor.go +++ b/processor.go @@ -253,7 +253,7 @@ func (p *processor) requeue(msg *base.TaskMessage) { func (p *processor) markAsDone(msg *base.TaskMessage) { err := p.broker.Done(msg) if err != nil { - errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue) + errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q", msg.ID, msg.Type, base.InProgressQueue) p.logger.Warnf("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { diff --git a/scheduler.go b/scheduler.go index 21dea52..16fd5cc 100644 --- a/scheduler.go +++ b/scheduler.go @@ -21,29 +21,20 @@ type scheduler struct { // poll interval on average avgInterval time.Duration - - // list of queues to move the tasks into. 
- qnames []string } type schedulerParams struct { logger *log.Logger broker base.Broker interval time.Duration - queues map[string]int } func newScheduler(params schedulerParams) *scheduler { - var qnames []string - for q := range params.queues { - qnames = append(qnames, q) - } return &scheduler{ logger: params.logger, broker: params.broker, done: make(chan struct{}), avgInterval: params.interval, - qnames: qnames, } } @@ -71,7 +62,7 @@ func (s *scheduler) start(wg *sync.WaitGroup) { } func (s *scheduler) exec() { - if err := s.broker.CheckAndEnqueue(s.qnames...); err != nil { + if err := s.broker.CheckAndEnqueue(); err != nil { s.logger.Errorf("Could not enqueue scheduled tasks: %v", err) } } diff --git a/scheduler_test.go b/scheduler_test.go index ee82250..34cd9fb 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -23,7 +23,6 @@ func TestScheduler(t *testing.T) { logger: testLogger, broker: rdbClient, interval: pollInterval, - queues: defaultQueueConfig, }) t1 := h.NewTaskMessage("gen_thumbnail", nil) t2 := h.NewTaskMessage("send_email", nil) diff --git a/server.go b/server.go index 06ca84a..896c03c 100644 --- a/server.go +++ b/server.go @@ -283,8 +283,8 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { logger.SetLevel(toInternalLogLevel(loglevel)) rdb := rdb.NewRDB(createRedisClient(r)) - starting := make(chan *base.TaskMessage, n) - finished := make(chan *base.TaskMessage, n) + starting := make(chan *base.TaskMessage) + finished := make(chan *base.TaskMessage) syncCh := make(chan *syncRequest) status := base.NewServerStatus(base.StatusIdle) cancels := base.NewCancelations() @@ -309,7 +309,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { logger: logger, broker: rdb, interval: 5 * time.Second, - queues: queues, }) subscriber := newSubscriber(subscriberParams{ logger: logger,