From 6de80f9f0482bf3efc286b07a350a00fe7c30976 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 1 Jan 2020 14:57:31 -0800 Subject: [PATCH] Change dequeue method to use lua script --- internal/rdb/rdb.go | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 3de4f0a..76db27a 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -9,6 +9,7 @@ import ( "github.com/go-redis/redis/v7" "github.com/hibiken/asynq/internal/base" + "github.com/spf13/cast" ) var ( @@ -46,20 +47,16 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { return r.client.LPush(qname, string(bytes)).Err() } -// Dequeue blocks until there is a task available to be processed, -// once a task is available, it adds the task to "in progress" queue -// and returns the task. If there are no tasks for the entire timeout -// duration, it returns ErrDequeueTimeout. +// Dequeue queries the queues in the given order and returns a task message if +// one is found. If all queues are empty it returns ErrNoProcessableTask error. func (r *RDB) Dequeue(queues ...string) (*base.TaskMessage, error) { data, err := r.dequeue(queues...) if err == redis.Nil { - // all queues are empty // TODO(hibiken): Rename this sentinel error return nil, ErrNoProcessableTask } if err != nil { return nil, err } - var msg base.TaskMessage err = json.Unmarshal([]byte(data), &msg) if err != nil { @@ -69,16 +66,25 @@ func (r *RDB) Dequeue(queues ...string) (*base.TaskMessage, error) { } func (r *RDB) dequeue(queues ...string) (data string, err error) { + var args []interface{} for _, qname := range queues { - data, err = r.client.RPopLPush(qname, base.InProgressQueue).Result() - if err == nil { - return data, nil - } - if err != redis.Nil { - return "", err - } + args = append(args, qname) } - return data, err + script := redis.NewScript(` + local res + for _, qname in ipairs(ARGV) do + res = redis.call("RPOPLPUSH", qname, KEYS[1]) + if res then + return res + end + end + return res + `) + res, err := script.Run(r.client, []string{base.InProgressQueue}, args...).Result() + if err != nil { + return "", err + } + return cast.ToStringE(res) } // Done removes the task from in-progress queue to mark the task as done.