2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 07:12:17 +08:00

[performance] Skip the overhead of json decoding when scheduling to one

queue
This commit is contained in:
Ken Hibino 2020-01-14 07:26:41 -08:00
parent 5a6f737589
commit cb2ebf18ac
5 changed files with 50 additions and 11 deletions

View File

@ -108,9 +108,11 @@ func NewBackground(r *redis.Client, cfg *Config) *Background {
if queues == nil || len(queues) == 0 { if queues == nil || len(queues) == 0 {
queues = defaultQueueConfig queues = defaultQueueConfig
} }
qcfg := normalizeQueueCfg(queues)
rdb := rdb.NewRDB(r) rdb := rdb.NewRDB(r)
scheduler := newScheduler(rdb, 5*time.Second) scheduler := newScheduler(rdb, 5*time.Second, qcfg)
processor := newProcessor(rdb, n, normalizeQueueCfg(queues), cfg.StrictPriority, delayFunc) processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc)
return &Background{ return &Background{
rdb: rdb, rdb: rdb,
scheduler: scheduler, scheduler: scheduler,

View File

@ -292,10 +292,18 @@ func (r *RDB) RestoreUnfinished() (int64, error) {
// CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that // CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that
// have to be processed. // have to be processed.
func (r *RDB) CheckAndEnqueue() error { //
// qnames specifies to which queues to send tasks.
func (r *RDB) CheckAndEnqueue(qnames ...string) error {
delayed := []string{base.ScheduledQueue, base.RetryQueue} delayed := []string{base.ScheduledQueue, base.RetryQueue}
for _, zset := range delayed { for _, zset := range delayed {
if err := r.forward(zset); err != nil { var err error
if len(qnames) == 1 {
err = r.forwardSingle(zset, base.QueueKey(qnames[0]))
} else {
err = r.forward(zset)
}
if err != nil {
return err return err
} }
} }
@ -303,8 +311,8 @@ func (r *RDB) CheckAndEnqueue() error {
} }
// forward moves all tasks with a score less than the current unix time // forward moves all tasks with a score less than the current unix time
// from the given zset to the default queue. // from the src zset.
func (r *RDB) forward(from string) error { func (r *RDB) forward(src string) error {
script := redis.NewScript(` script := redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, msg in ipairs(msgs) do for _, msg in ipairs(msgs) do
@ -317,5 +325,21 @@ func (r *RDB) forward(from string) error {
`) `)
now := float64(time.Now().Unix()) now := float64(time.Now().Unix())
return script.Run(r.client, return script.Run(r.client,
[]string{from, base.DefaultQueue}, now, base.QueuePrefix).Err() []string{src}, now, base.QueuePrefix).Err()
}
// forwardSingle moves all tasks with a score less than the current unix time
// from the src zset to dst list.
func (r *RDB) forwardSingle(src, dst string) error {
script := redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, msg in ipairs(msgs) do
redis.call("ZREM", KEYS[1], msg)
redis.call("LPUSH", KEYS[2], msg)
end
return msgs
`)
now := float64(time.Now().Unix())
return script.Run(r.client,
[]string{src, dst}, now).Err()
} }

View File

@ -587,6 +587,7 @@ func TestCheckAndEnqueue(t *testing.T) {
tests := []struct { tests := []struct {
scheduled []h.ZSetEntry scheduled []h.ZSetEntry
retry []h.ZSetEntry retry []h.ZSetEntry
qnames []string
wantEnqueued map[string][]*base.TaskMessage wantEnqueued map[string][]*base.TaskMessage
wantScheduled []*base.TaskMessage wantScheduled []*base.TaskMessage
wantRetry []*base.TaskMessage wantRetry []*base.TaskMessage
@ -598,6 +599,7 @@ func TestCheckAndEnqueue(t *testing.T) {
}, },
retry: []h.ZSetEntry{ retry: []h.ZSetEntry{
{Msg: t3, Score: float64(secondAgo.Unix())}}, {Msg: t3, Score: float64(secondAgo.Unix())}},
qnames: []string{"default"},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1, t2, t3}, "default": {t1, t2, t3},
}, },
@ -610,6 +612,7 @@ func TestCheckAndEnqueue(t *testing.T) {
{Msg: t2, Score: float64(secondAgo.Unix())}}, {Msg: t2, Score: float64(secondAgo.Unix())}},
retry: []h.ZSetEntry{ retry: []h.ZSetEntry{
{Msg: t3, Score: float64(secondAgo.Unix())}}, {Msg: t3, Score: float64(secondAgo.Unix())}},
qnames: []string{"default"},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t2, t3}, "default": {t2, t3},
}, },
@ -622,6 +625,7 @@ func TestCheckAndEnqueue(t *testing.T) {
{Msg: t2, Score: float64(hourFromNow.Unix())}}, {Msg: t2, Score: float64(hourFromNow.Unix())}},
retry: []h.ZSetEntry{ retry: []h.ZSetEntry{
{Msg: t3, Score: float64(hourFromNow.Unix())}}, {Msg: t3, Score: float64(hourFromNow.Unix())}},
qnames: []string{"default"},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {}, "default": {},
}, },
@ -635,6 +639,7 @@ func TestCheckAndEnqueue(t *testing.T) {
}, },
retry: []h.ZSetEntry{ retry: []h.ZSetEntry{
{Msg: t5, Score: float64(secondAgo.Unix())}}, {Msg: t5, Score: float64(secondAgo.Unix())}},
qnames: []string{"default", "critical", "low"},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
"critical": {t4}, "critical": {t4},
@ -650,7 +655,7 @@ func TestCheckAndEnqueue(t *testing.T) {
h.SeedScheduledQueue(t, r.client, tc.scheduled) h.SeedScheduledQueue(t, r.client, tc.scheduled)
h.SeedRetryQueue(t, r.client, tc.retry) h.SeedRetryQueue(t, r.client, tc.retry)
err := r.CheckAndEnqueue() err := r.CheckAndEnqueue(tc.qnames...)
if err != nil { if err != nil {
t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err) t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err)
continue continue

View File

@ -19,13 +19,21 @@ type scheduler struct {
// poll interval on average // poll interval on average
avgInterval time.Duration avgInterval time.Duration
// list of queues to move the tasks into.
qnames []string
} }
func newScheduler(r *rdb.RDB, avgInterval time.Duration) *scheduler { func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]uint) *scheduler {
var qnames []string
for q := range qcfg {
qnames = append(qnames, q)
}
return &scheduler{ return &scheduler{
rdb: r, rdb: r,
done: make(chan struct{}), done: make(chan struct{}),
avgInterval: avgInterval, avgInterval: avgInterval,
qnames: qnames,
} }
} }
@ -51,7 +59,7 @@ func (s *scheduler) start() {
} }
func (s *scheduler) exec() { func (s *scheduler) exec() {
if err := s.rdb.CheckAndEnqueue(); err != nil { if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil {
log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err) log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err)
} }
} }

View File

@ -18,7 +18,7 @@ func TestScheduler(t *testing.T) {
r := setup(t) r := setup(t)
rdbClient := rdb.NewRDB(r) rdbClient := rdb.NewRDB(r)
const pollInterval = time.Second const pollInterval = time.Second
s := newScheduler(rdbClient, pollInterval) s := newScheduler(rdbClient, pollInterval, defaultQueueConfig)
t1 := h.NewTaskMessage("gen_thumbnail", nil) t1 := h.NewTaskMessage("gen_thumbnail", nil)
t2 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("send_email", nil)
t3 := h.NewTaskMessage("reindex", nil) t3 := h.NewTaskMessage("reindex", nil)