From cb2ebf18ac094ba51a8a04da7e02f6512e4dc961 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 14 Jan 2020 07:26:41 -0800 Subject: [PATCH] [performance] Skip the overhead of json decoding when scheduling to one queue --- background.go | 6 ++++-- internal/rdb/rdb.go | 34 +++++++++++++++++++++++++++++----- internal/rdb/rdb_test.go | 7 ++++++- scheduler.go | 12 ++++++++++-- scheduler_test.go | 2 +- 5 files changed, 50 insertions(+), 11 deletions(-) diff --git a/background.go b/background.go index 6c10406..4f30d1d 100644 --- a/background.go +++ b/background.go @@ -108,9 +108,11 @@ func NewBackground(r *redis.Client, cfg *Config) *Background { if queues == nil || len(queues) == 0 { queues = defaultQueueConfig } + qcfg := normalizeQueueCfg(queues) + rdb := rdb.NewRDB(r) - scheduler := newScheduler(rdb, 5*time.Second) - processor := newProcessor(rdb, n, normalizeQueueCfg(queues), cfg.StrictPriority, delayFunc) + scheduler := newScheduler(rdb, 5*time.Second, qcfg) + processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc) return &Background{ rdb: rdb, scheduler: scheduler, diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index a35adcf..80c16eb 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -292,10 +292,18 @@ func (r *RDB) RestoreUnfinished() (int64, error) { // CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that // have to be processed. -func (r *RDB) CheckAndEnqueue() error { +// +// qnames specifies to which queues to send tasks. +func (r *RDB) CheckAndEnqueue(qnames ...string) error { delayed := []string{base.ScheduledQueue, base.RetryQueue} for _, zset := range delayed { - if err := r.forward(zset); err != nil { + var err error + if len(qnames) == 1 { + err = r.forwardSingle(zset, base.QueueKey(qnames[0])) + } else { + err = r.forward(zset) + } + if err != nil { return err } } @@ -303,8 +311,8 @@ func (r *RDB) CheckAndEnqueue() error { } // forward moves all tasks with a score less than the current unix time -// from the given zset to the default queue. -func (r *RDB) forward(from string) error { +// from the src zset. +func (r *RDB) forward(src string) error { script := redis.NewScript(` local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) for _, msg in ipairs(msgs) do @@ -317,5 +325,21 @@ func (r *RDB) forward(from string) error { `) now := float64(time.Now().Unix()) return script.Run(r.client, - []string{from, base.DefaultQueue}, now, base.QueuePrefix).Err() + []string{src}, now, base.QueuePrefix).Err() +} + +// forwardSingle moves all tasks with a score less than the current unix time +// from the src zset to dst list. +func (r *RDB) forwardSingle(src, dst string) error { + script := redis.NewScript(` + local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) + for _, msg in ipairs(msgs) do + redis.call("ZREM", KEYS[1], msg) + redis.call("LPUSH", KEYS[2], msg) + end + return msgs + `) + now := float64(time.Now().Unix()) + return script.Run(r.client, + []string{src, dst}, now).Err() } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 7ed0284..332f86c 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -587,6 +587,7 @@ func TestCheckAndEnqueue(t *testing.T) { tests := []struct { scheduled []h.ZSetEntry retry []h.ZSetEntry + qnames []string wantEnqueued map[string][]*base.TaskMessage wantScheduled []*base.TaskMessage wantRetry []*base.TaskMessage @@ -598,6 +599,7 @@ func TestCheckAndEnqueue(t *testing.T) { }, retry: []h.ZSetEntry{ {Msg: t3, Score: float64(secondAgo.Unix())}}, + qnames: []string{"default"}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {t1, t2, t3}, }, @@ -610,6 +612,7 @@ func TestCheckAndEnqueue(t *testing.T) { {Msg: t2, Score: float64(secondAgo.Unix())}}, retry: []h.ZSetEntry{ {Msg: t3, Score: float64(secondAgo.Unix())}}, + qnames: []string{"default"}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {t2, t3}, }, @@ -622,6 +625,7 @@ func TestCheckAndEnqueue(t *testing.T) { {Msg: t2, Score: float64(hourFromNow.Unix())}}, retry: []h.ZSetEntry{ {Msg: t3, Score: float64(hourFromNow.Unix())}}, + qnames: []string{"default"}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, }, @@ -635,6 +639,7 @@ func TestCheckAndEnqueue(t *testing.T) { }, retry: []h.ZSetEntry{ {Msg: t5, Score: float64(secondAgo.Unix())}}, + qnames: []string{"default", "critical", "low"}, wantEnqueued: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {t4}, @@ -650,7 +655,7 @@ func TestCheckAndEnqueue(t *testing.T) { h.SeedScheduledQueue(t, r.client, tc.scheduled) h.SeedRetryQueue(t, r.client, tc.retry) - err := r.CheckAndEnqueue() + err := r.CheckAndEnqueue(tc.qnames...) if err != nil { t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err) continue diff --git a/scheduler.go b/scheduler.go index 36d4cbf..d452637 100644 --- a/scheduler.go +++ b/scheduler.go @@ -19,13 +19,21 @@ type scheduler struct { // poll interval on average avgInterval time.Duration + + // list of queues to move the tasks into. + qnames []string } -func newScheduler(r *rdb.RDB, avgInterval time.Duration) *scheduler { +func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]uint) *scheduler { + var qnames []string + for q := range qcfg { + qnames = append(qnames, q) + } return &scheduler{ rdb: r, done: make(chan struct{}), avgInterval: avgInterval, + qnames: qnames, } } @@ -51,7 +59,7 @@ func (s *scheduler) start() { } func (s *scheduler) exec() { - if err := s.rdb.CheckAndEnqueue(); err != nil { + if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil { log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err) } } diff --git a/scheduler_test.go b/scheduler_test.go index 45dd33f..b16ee04 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -18,7 +18,7 @@ func TestScheduler(t *testing.T) { r := setup(t) rdbClient := rdb.NewRDB(r) const pollInterval = time.Second - s := newScheduler(rdbClient, pollInterval) + s := newScheduler(rdbClient, pollInterval, defaultQueueConfig) t1 := h.NewTaskMessage("gen_thumbnail", nil) t2 := h.NewTaskMessage("send_email", nil) t3 := h.NewTaskMessage("reindex", nil)