diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index c78d58d..b792c49 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -32,11 +32,11 @@ const statsTTL = 90 * 24 * time.Hour // 90 days // RDB is a client interface to query and mutate task queues. type RDB struct { - client *redis.Client + client redis.UniversalClient } // NewRDB returns a new instance of RDB. -func NewRDB(client *redis.Client) *RDB { +func NewRDB(client redis.UniversalClient) *RDB { return &RDB{client} } @@ -108,14 +108,7 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { // Dequeue skips a queue if the queue is paused. // If all queues are empty, ErrNoProcessableTask error is returned. func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error) { - var qkeys []interface{} - for _, q := range qnames { - qkeys = append(qkeys, base.QueueKey(q)) - } - data, d, err := r.dequeue(qkeys...) - if err == redis.Nil { - return nil, time.Time{}, ErrNoProcessableTask - } + data, d, err := r.dequeue(qnames...) if err != nil { return nil, time.Time{}, err } @@ -125,64 +118,69 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti return msg, time.Unix(d, 0), nil } +// KEYS[1] -> asynq:{} +// KEYS[2] -> asynq:{}:paused +// KEYS[3] -> asynq:{}:in_progress +// KEYS[4] -> asynq:{}:deadlines // ARGV[1] -> current time in Unix time -// ARGV[2:] -> List of queues to query in order // // dequeueCmd checks whether a queue is paused first, before // calling RPOPLPUSH to pop a task from the queue. // It computes the task deadline by inspecting Timout and Deadline fields, // and inserts the task with deadlines set. var dequeueCmd = redis.NewScript(` -for i = 2, table.getn(ARGV) do - local qkey = ARGV[i] - local key_paused = qkey .. ":paused" - local key_inprogress = qkey .. ":in_progress" - local key_deadlines = qkey .. ":deadlines" - if redis.call("EXISTS", key_paused) == 0 then - local msg = redis.call("RPOPLPUSH", qkey, key_inprogress) - if msg then - local decoded = cjson.decode(msg) - local timeout = decoded["Timeout"] - local deadline = decoded["Deadline"] - local score - if timeout ~= 0 and deadline ~= 0 then - score = math.min(ARGV[1]+timeout, deadline) - elseif timeout ~= 0 then - score = ARGV[1] + timeout - elseif deadline ~= 0 then - score = deadline - else - return redis.error_reply("asynq internal error: both timeout and deadline are not set") - end - redis.call("ZADD", key_deadlines, score, msg) - return {msg, score} +if redis.call("EXISTS", KEYS[2]) == 0 then + local msg = redis.call("RPOPLPUSH", KEYS[1], KEYS[3]) + if msg then + local decoded = cjson.decode(msg) + local timeout = decoded["Timeout"] + local deadline = decoded["Deadline"] + local score + if timeout ~= 0 and deadline ~= 0 then + score = math.min(ARGV[1]+timeout, deadline) + elseif timeout ~= 0 then + score = ARGV[1] + timeout + elseif deadline ~= 0 then + score = deadline + else + return redis.error_reply("asynq internal error: both timeout and deadline are not set") end + redis.call("ZADD", KEYS[4], score, msg) + return {msg, score} end end return nil`) -func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int64, err error) { - var args []interface{} - args = append(args, time.Now().Unix()) - args = append(args, qkeys...) - res, err := dequeueCmd.Run(r.client, nil, args...).Result() - if err != nil { - return "", 0, err +func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err error) { + for _, qname := range qnames { + keys := []string{ + base.QueueKey(qname), + base.PausedKey(qname), + base.InProgressKey(qname), + base.DeadlinesKey(qname), + } + res, err := dequeueCmd.Run(r.client, keys, time.Now().Unix()).Result() + if err == redis.Nil { + continue + } else if err != nil { + return "", 0, err + } + data, err := cast.ToSliceE(res) + if err != nil { + return "", 0, err + } + if len(data) != 2 { + return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %d values", len(data)) + } + if msgjson, err = cast.ToStringE(data[0]); err != nil { + return "", 0, err + } + if deadline, err = cast.ToInt64E(data[1]); err != nil { + return "", 0, err + } + return msgjson, deadline, nil } - data, err := cast.ToSliceE(res) - if err != nil { - return "", 0, err - } - if len(data) != 2 { - return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %d values", len(data)) - } - if msgjson, err = cast.ToStringE(data[0]); err != nil { - return "", 0, err - } - if deadline, err = cast.ToInt64E(data[1]); err != nil { - return "", 0, err - } - return msgjson, deadline, nil + return "", 0, ErrNoProcessableTask } // KEYS[1] -> asynq:{}:in_progress