mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-25 10:56:12 +08:00 
			
		
		
		
	Define RDB.GetTaskInfo
This commit is contained in:
		| @@ -292,6 +292,103 @@ func reverse(x []string) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // checkQueueExists verifies whether the queue exists. | ||||
| // It returns QueueNotFoundError if queue doesn't exist. | ||||
| func (r *RDB) checkQueueExists(qname string) error { | ||||
| 	exists, err := r.client.SIsMember(base.AllQueues, qname).Result() | ||||
| 	if err != nil { | ||||
| 		return errors.E(errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err}) | ||||
| 	} | ||||
| 	if !exists { | ||||
| 		return errors.E(errors.Internal, &errors.QueueNotFoundError{Queue: qname}) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Input: | ||||
| // KEYS[1] -> task key (asynq:{<qname>}:t:<taskid>) | ||||
| // ARGV[1] -> task id | ||||
| // ARGV[2] -> current time in Unix time (seconds) | ||||
| // ARGV[3] -> queue key prefix (asynq:{<qname>}:) | ||||
| // | ||||
| // Output: | ||||
| // Tuple of {msg, state, nextProcessAt} | ||||
| // msg: encoded task message | ||||
| // state: string describing the state of the task | ||||
| // nextProcessAt: unix time in seconds, zero if not applicable. | ||||
| // | ||||
| // If the task key doesn't exist, it returns error with a message "NOT FOUND" | ||||
| var getTaskInfoCmd = redis.NewScript(` | ||||
| 	if redis.call("EXISTS", KEYS[1]) == 0 then | ||||
| 		return redis.error_reply("NOT FOUND") | ||||
| 	end | ||||
| 	local msg, state = unpack(redis.call("HMGET", KEYS[1], "msg", "state")) | ||||
| 	if state == "scheduled" or state == "retry" then | ||||
| 		return {msg, state, redis.call("ZSCORE", ARGV[3] .. state, ARGV[1])} | ||||
| 	end | ||||
| 	if state == "pending" then | ||||
| 		return {msg, state, ARGV[2]} | ||||
| 	end | ||||
| 	return {msg, state, 0} | ||||
| `) | ||||
|  | ||||
| // GetTaskInfo returns a TaskInfo describing the task from the given queue. | ||||
| func (r *RDB) GetTaskInfo(qname string, id uuid.UUID) (*base.TaskInfo, error) { | ||||
| 	var op errors.Op = "rdb.GetTaskInfo" | ||||
| 	if err := r.checkQueueExists(qname); err != nil { | ||||
| 		return nil, errors.E(op, errors.CanonicalCode(err), err) | ||||
| 	} | ||||
| 	keys := []string{base.TaskKey(qname, id.String())} | ||||
| 	argv := []interface{}{ | ||||
| 		id.String(), | ||||
| 		time.Now().Unix(), | ||||
| 		base.QueueKeyPrefix(qname), | ||||
| 	} | ||||
| 	res, err := getTaskInfoCmd.Run(r.client, keys, argv...).Result() | ||||
| 	if err != nil { | ||||
| 		if err.Error() == "NOT FOUND" { | ||||
| 			return nil, errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id.String()}) | ||||
| 		} | ||||
| 		return nil, errors.E(op, errors.Unknown, err) | ||||
| 	} | ||||
| 	vals, err := cast.ToSliceE(res) | ||||
| 	if err != nil { | ||||
| 		return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script") | ||||
| 	} | ||||
| 	if len(vals) != 3 { | ||||
| 		return nil, errors.E(op, errors.Internal, "unepxected number of values returned from Lua script") | ||||
| 	} | ||||
| 	encoded, err := cast.ToStringE(vals[0]) | ||||
| 	if err != nil { | ||||
| 		return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script") | ||||
| 	} | ||||
| 	stateStr, err := cast.ToStringE(vals[1]) | ||||
| 	if err != nil { | ||||
| 		return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script") | ||||
| 	} | ||||
| 	processAtUnix, err := cast.ToInt64E(vals[2]) | ||||
| 	if err != nil { | ||||
| 		return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script") | ||||
| 	} | ||||
| 	msg, err := base.DecodeMessage([]byte(encoded)) | ||||
| 	if err != nil { | ||||
| 		return nil, errors.E(op, errors.Internal, "could not decode task message") | ||||
| 	} | ||||
| 	state, err := base.TaskStateFromString(stateStr) | ||||
| 	if err != nil { | ||||
| 		return nil, errors.E(op, errors.CanonicalCode(err), err) | ||||
| 	} | ||||
| 	var nextProcessAt time.Time | ||||
| 	if processAtUnix != 0 { | ||||
| 		nextProcessAt = time.Unix(processAtUnix, 0) | ||||
| 	} | ||||
| 	return &base.TaskInfo{ | ||||
| 		Message:       msg, | ||||
| 		State:         state, | ||||
| 		NextProcessAt: nextProcessAt, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| // Pagination specifies the page size and page number | ||||
| // for the list operation. | ||||
| type Pagination struct { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user