mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-24 22:46:11 +08:00 
			
		
		
		
	Fix dequeue Lua script to use a single hash tag
This commit is contained in:
		| @@ -32,11 +32,11 @@ const statsTTL = 90 * 24 * time.Hour // 90 days | ||||
|  | ||||
| // RDB is a client interface to query and mutate task queues. | ||||
| type RDB struct { | ||||
| 	client *redis.Client | ||||
| 	client redis.UniversalClient | ||||
| } | ||||
|  | ||||
| // NewRDB returns a new instance of RDB. | ||||
| func NewRDB(client *redis.Client) *RDB { | ||||
| func NewRDB(client redis.UniversalClient) *RDB { | ||||
| 	return &RDB{client} | ||||
| } | ||||
|  | ||||
| @@ -108,14 +108,7 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { | ||||
| // Dequeue skips a queue if the queue is paused. | ||||
| // If all queues are empty, ErrNoProcessableTask error is returned. | ||||
| func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error) { | ||||
| 	var qkeys []interface{} | ||||
| 	for _, q := range qnames { | ||||
| 		qkeys = append(qkeys, base.QueueKey(q)) | ||||
| 	} | ||||
| 	data, d, err := r.dequeue(qkeys...) | ||||
| 	if err == redis.Nil { | ||||
| 		return nil, time.Time{}, ErrNoProcessableTask | ||||
| 	} | ||||
| 	data, d, err := r.dequeue(qnames...) | ||||
| 	if err != nil { | ||||
| 		return nil, time.Time{}, err | ||||
| 	} | ||||
| @@ -125,64 +118,69 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti | ||||
| 	return msg, time.Unix(d, 0), nil | ||||
| } | ||||
|  | ||||
| // KEYS[1] -> asynq:{<qname>} | ||||
| // KEYS[2] -> asynq:{<qname>}:paused | ||||
| // KEYS[3] -> asynq:{<qname>}:in_progress | ||||
| // KEYS[4] -> asynq:{<qname>}:deadlines | ||||
| // ARGV[1]  -> current time in Unix time | ||||
| // ARGV[2:] -> List of queues to query in order | ||||
| // | ||||
| // dequeueCmd checks whether a queue is paused first, before | ||||
| // calling RPOPLPUSH to pop a task from the queue. | ||||
| // It computes the task deadline by inspecting Timout and Deadline fields, | ||||
| // and inserts the task with deadlines set. | ||||
| var dequeueCmd = redis.NewScript(` | ||||
| for i = 2, table.getn(ARGV) do | ||||
| 	local qkey = ARGV[i] | ||||
| 	local key_paused = qkey .. ":paused" | ||||
| 	local key_inprogress = qkey .. ":in_progress" | ||||
| 	local key_deadlines = qkey .. ":deadlines" | ||||
| 	if redis.call("EXISTS", key_paused) == 0 then | ||||
| 		local msg = redis.call("RPOPLPUSH", qkey, key_inprogress) | ||||
| 		if msg then | ||||
| 			local decoded = cjson.decode(msg) | ||||
| 			local timeout = decoded["Timeout"] | ||||
| 			local deadline = decoded["Deadline"] | ||||
| 			local score | ||||
| 			if timeout ~= 0 and deadline ~= 0 then | ||||
| 				score = math.min(ARGV[1]+timeout, deadline) | ||||
| 			elseif timeout ~= 0 then | ||||
| 				score = ARGV[1] + timeout | ||||
| 			elseif deadline ~= 0 then | ||||
| 				score = deadline | ||||
| 			else | ||||
| 				return redis.error_reply("asynq internal error: both timeout and deadline are not set") | ||||
| 			end | ||||
| 			redis.call("ZADD", key_deadlines, score, msg) | ||||
| 			return {msg, score} | ||||
| if redis.call("EXISTS", KEYS[2]) == 0 then | ||||
| 	local msg = redis.call("RPOPLPUSH", KEYS[1], KEYS[3]) | ||||
| 	if msg then | ||||
| 		local decoded = cjson.decode(msg) | ||||
| 		local timeout = decoded["Timeout"] | ||||
| 		local deadline = decoded["Deadline"] | ||||
| 		local score | ||||
| 		if timeout ~= 0 and deadline ~= 0 then | ||||
| 			score = math.min(ARGV[1]+timeout, deadline) | ||||
| 		elseif timeout ~= 0 then | ||||
| 			score = ARGV[1] + timeout | ||||
| 		elseif deadline ~= 0 then | ||||
| 			score = deadline | ||||
| 		else | ||||
| 			return redis.error_reply("asynq internal error: both timeout and deadline are not set") | ||||
| 		end | ||||
| 		redis.call("ZADD", KEYS[4], score, msg) | ||||
| 		return {msg, score} | ||||
| 	end | ||||
| end | ||||
| return nil`) | ||||
|  | ||||
| func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int64, err error) { | ||||
| 	var args []interface{} | ||||
| 	args = append(args, time.Now().Unix()) | ||||
| 	args = append(args, qkeys...) | ||||
| 	res, err := dequeueCmd.Run(r.client, nil, args...).Result() | ||||
| 	if err != nil { | ||||
| 		return "", 0, err | ||||
| func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err error) { | ||||
| 	for _, qname := range qnames { | ||||
| 		keys := []string{ | ||||
| 			base.QueueKey(qname), | ||||
| 			base.PausedKey(qname), | ||||
| 			base.InProgressKey(qname), | ||||
| 			base.DeadlinesKey(qname), | ||||
| 		} | ||||
| 		res, err := dequeueCmd.Run(r.client, keys, time.Now().Unix()).Result() | ||||
| 		if err == redis.Nil { | ||||
| 			continue | ||||
| 		} else if err != nil { | ||||
| 			return "", 0, err | ||||
| 		} | ||||
| 		data, err := cast.ToSliceE(res) | ||||
| 		if err != nil { | ||||
| 			return "", 0, err | ||||
| 		} | ||||
| 		if len(data) != 2 { | ||||
| 			return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %d values", len(data)) | ||||
| 		} | ||||
| 		if msgjson, err = cast.ToStringE(data[0]); err != nil { | ||||
| 			return "", 0, err | ||||
| 		} | ||||
| 		if deadline, err = cast.ToInt64E(data[1]); err != nil { | ||||
| 			return "", 0, err | ||||
| 		} | ||||
| 		return msgjson, deadline, nil | ||||
| 	} | ||||
| 	data, err := cast.ToSliceE(res) | ||||
| 	if err != nil { | ||||
| 		return "", 0, err | ||||
| 	} | ||||
| 	if len(data) != 2 { | ||||
| 		return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %d values", len(data)) | ||||
| 	} | ||||
| 	if msgjson, err = cast.ToStringE(data[0]); err != nil { | ||||
| 		return "", 0, err | ||||
| 	} | ||||
| 	if deadline, err = cast.ToInt64E(data[1]); err != nil { | ||||
| 		return "", 0, err | ||||
| 	} | ||||
| 	return msgjson, deadline, nil | ||||
| 	return "", 0, ErrNoProcessableTask | ||||
| } | ||||
|  | ||||
| // KEYS[1] -> asynq:{<qname>}:in_progress | ||||
|   | ||||
		Reference in New Issue
	
	Block a user