mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Define RDB.GetTaskInfo
This commit is contained in:
parent
b358de907e
commit
8922d2423a
@ -14,6 +14,7 @@ import (
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/google/uuid"
|
||||
"github.com/hibiken/asynq/internal/errors"
|
||||
pb "github.com/hibiken/asynq/internal/proto"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
@ -63,6 +64,22 @@ func (s TaskState) String() string {
|
||||
panic(fmt.Sprintf("internal error: unknown task state %d", s))
|
||||
}
|
||||
|
||||
func TaskStateFromString(s string) (TaskState, error) {
|
||||
switch s {
|
||||
case "active":
|
||||
return TaskStateActive, nil
|
||||
case "pending":
|
||||
return TaskStatePending, nil
|
||||
case "scheduled":
|
||||
return TaskStateScheduled, nil
|
||||
case "retry":
|
||||
return TaskStateRetry, nil
|
||||
case "archived":
|
||||
return TaskStateArchived, nil
|
||||
}
|
||||
return 0, errors.E(errors.FailedPrecondition, fmt.Sprintf("%q is not supported task state", s))
|
||||
}
|
||||
|
||||
// ValidateQueueName validates a given qname to be used as a queue name.
|
||||
// Returns nil if valid, otherwise returns non-nil error.
|
||||
func ValidateQueueName(qname string) error {
|
||||
@ -249,6 +266,13 @@ func DecodeMessage(data []byte) (*TaskMessage, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// TaskInfo describes a task message and its metadata.
|
||||
type TaskInfo struct {
|
||||
Message *TaskMessage
|
||||
State TaskState
|
||||
NextProcessAt time.Time
|
||||
}
|
||||
|
||||
// Z represents sorted set member.
|
||||
type Z struct {
|
||||
Message *TaskMessage
|
||||
|
@ -292,6 +292,103 @@ func reverse(x []string) {
|
||||
}
|
||||
}
|
||||
|
||||
// checkQueueExists verifies whether the queue exists.
|
||||
// It returns QueueNotFoundError if queue doesn't exist.
|
||||
func (r *RDB) checkQueueExists(qname string) error {
|
||||
exists, err := r.client.SIsMember(base.AllQueues, qname).Result()
|
||||
if err != nil {
|
||||
return errors.E(errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err})
|
||||
}
|
||||
if !exists {
|
||||
return errors.E(errors.Internal, &errors.QueueNotFoundError{Queue: qname})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Input:
|
||||
// KEYS[1] -> task key (asynq:{<qname>}:t:<taskid>)
|
||||
// ARGV[1] -> task id
|
||||
// ARGV[2] -> current time in Unix time (seconds)
|
||||
// ARGV[3] -> queue key prefix (asynq:{<qname>}:)
|
||||
//
|
||||
// Output:
|
||||
// Tuple of {msg, state, nextProcessAt}
|
||||
// msg: encoded task message
|
||||
// state: string describing the state of the task
|
||||
// nextProcessAt: unix time in seconds, zero if not applicable.
|
||||
//
|
||||
// If the task key doesn't exist, it returns error with a message "NOT FOUND"
|
||||
var getTaskInfoCmd = redis.NewScript(`
|
||||
if redis.call("EXISTS", KEYS[1]) == 0 then
|
||||
return redis.error_reply("NOT FOUND")
|
||||
end
|
||||
local msg, state = unpack(redis.call("HMGET", KEYS[1], "msg", "state"))
|
||||
if state == "scheduled" or state == "retry" then
|
||||
return {msg, state, redis.call("ZSCORE", ARGV[3] .. state, ARGV[1])}
|
||||
end
|
||||
if state == "pending" then
|
||||
return {msg, state, ARGV[2]}
|
||||
end
|
||||
return {msg, state, 0}
|
||||
`)
|
||||
|
||||
// GetTaskInfo returns a TaskInfo describing the task from the given queue.
|
||||
func (r *RDB) GetTaskInfo(qname string, id uuid.UUID) (*base.TaskInfo, error) {
|
||||
var op errors.Op = "rdb.GetTaskInfo"
|
||||
if err := r.checkQueueExists(qname); err != nil {
|
||||
return nil, errors.E(op, errors.CanonicalCode(err), err)
|
||||
}
|
||||
keys := []string{base.TaskKey(qname, id.String())}
|
||||
argv := []interface{}{
|
||||
id.String(),
|
||||
time.Now().Unix(),
|
||||
base.QueueKeyPrefix(qname),
|
||||
}
|
||||
res, err := getTaskInfoCmd.Run(r.client, keys, argv...).Result()
|
||||
if err != nil {
|
||||
if err.Error() == "NOT FOUND" {
|
||||
return nil, errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id.String()})
|
||||
}
|
||||
return nil, errors.E(op, errors.Unknown, err)
|
||||
}
|
||||
vals, err := cast.ToSliceE(res)
|
||||
if err != nil {
|
||||
return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script")
|
||||
}
|
||||
if len(vals) != 3 {
|
||||
return nil, errors.E(op, errors.Internal, "unepxected number of values returned from Lua script")
|
||||
}
|
||||
encoded, err := cast.ToStringE(vals[0])
|
||||
if err != nil {
|
||||
return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script")
|
||||
}
|
||||
stateStr, err := cast.ToStringE(vals[1])
|
||||
if err != nil {
|
||||
return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script")
|
||||
}
|
||||
processAtUnix, err := cast.ToInt64E(vals[2])
|
||||
if err != nil {
|
||||
return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script")
|
||||
}
|
||||
msg, err := base.DecodeMessage([]byte(encoded))
|
||||
if err != nil {
|
||||
return nil, errors.E(op, errors.Internal, "could not decode task message")
|
||||
}
|
||||
state, err := base.TaskStateFromString(stateStr)
|
||||
if err != nil {
|
||||
return nil, errors.E(op, errors.CanonicalCode(err), err)
|
||||
}
|
||||
var nextProcessAt time.Time
|
||||
if processAtUnix != 0 {
|
||||
nextProcessAt = time.Unix(processAtUnix, 0)
|
||||
}
|
||||
return &base.TaskInfo{
|
||||
Message: msg,
|
||||
State: state,
|
||||
NextProcessAt: nextProcessAt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Pagination specifies the page size and page number
|
||||
// for the list operation.
|
||||
type Pagination struct {
|
||||
|
@ -309,6 +309,199 @@ func TestRedisInfo(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetTaskInfo(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
|
||||
m1 := h.NewTaskMessageWithQueue("task1", nil, "default")
|
||||
m2 := h.NewTaskMessageWithQueue("task2", nil, "default")
|
||||
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
|
||||
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
|
||||
m5 := h.NewTaskMessageWithQueue("task5", nil, "custom")
|
||||
|
||||
now := time.Now()
|
||||
fiveMinsFromNow := now.Add(5 * time.Minute)
|
||||
oneHourFromNow := now.Add(1 * time.Hour)
|
||||
twoHoursAgo := now.Add(-2 * time.Hour)
|
||||
|
||||
fixtures := struct {
|
||||
active map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
scheduled map[string][]base.Z
|
||||
retry map[string][]base.Z
|
||||
archived map[string][]base.Z
|
||||
}{
|
||||
active: map[string][]*base.TaskMessage{
|
||||
"default": {m1},
|
||||
"custom": {},
|
||||
},
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"custom": {m5},
|
||||
},
|
||||
scheduled: map[string][]base.Z{
|
||||
"default": {{Message: m2, Score: fiveMinsFromNow.Unix()}},
|
||||
"custom": {},
|
||||
},
|
||||
retry: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {{Message: m3, Score: oneHourFromNow.Unix()}},
|
||||
},
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {{Message: m4, Score: twoHoursAgo.Unix()}},
|
||||
},
|
||||
}
|
||||
|
||||
h.SeedAllActiveQueues(t, r.client, fixtures.active)
|
||||
h.SeedAllPendingQueues(t, r.client, fixtures.pending)
|
||||
h.SeedAllScheduledQueues(t, r.client, fixtures.scheduled)
|
||||
h.SeedAllRetryQueues(t, r.client, fixtures.retry)
|
||||
h.SeedAllArchivedQueues(t, r.client, fixtures.archived)
|
||||
|
||||
tests := []struct {
|
||||
qname string
|
||||
id uuid.UUID
|
||||
want *base.TaskInfo
|
||||
}{
|
||||
{
|
||||
qname: "default",
|
||||
id: m1.ID,
|
||||
want: &base.TaskInfo{
|
||||
Message: m1,
|
||||
State: base.TaskStateActive,
|
||||
NextProcessAt: time.Time{}, // zero value for N/A
|
||||
},
|
||||
},
|
||||
{
|
||||
qname: "default",
|
||||
id: m2.ID,
|
||||
want: &base.TaskInfo{
|
||||
Message: m2,
|
||||
State: base.TaskStateScheduled,
|
||||
NextProcessAt: fiveMinsFromNow,
|
||||
},
|
||||
},
|
||||
{
|
||||
qname: "custom",
|
||||
id: m3.ID,
|
||||
want: &base.TaskInfo{
|
||||
Message: m3,
|
||||
State: base.TaskStateRetry,
|
||||
NextProcessAt: oneHourFromNow,
|
||||
},
|
||||
},
|
||||
{
|
||||
qname: "custom",
|
||||
id: m4.ID,
|
||||
want: &base.TaskInfo{
|
||||
Message: m4,
|
||||
State: base.TaskStateArchived,
|
||||
NextProcessAt: time.Time{}, // zero value for N/A
|
||||
},
|
||||
},
|
||||
{
|
||||
qname: "custom",
|
||||
id: m5.ID,
|
||||
want: &base.TaskInfo{
|
||||
Message: m5,
|
||||
State: base.TaskStatePending,
|
||||
NextProcessAt: now,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
got, err := r.GetTaskInfo(tc.qname, tc.id)
|
||||
if err != nil {
|
||||
t.Errorf("GetTaskInfo(%q, %v) returned error: %v", tc.qname, tc.id, err)
|
||||
continue
|
||||
}
|
||||
if diff := cmp.Diff(tc.want, got, cmpopts.EquateApproxTime(2*time.Second)); diff != "" {
|
||||
t.Errorf("GetTaskInfo(%q, %v) = %v, want %v; (-want,+got)\n%s",
|
||||
tc.qname, tc.id, got, tc.want, diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetTaskInfoError(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
|
||||
m1 := h.NewTaskMessageWithQueue("task1", nil, "default")
|
||||
m2 := h.NewTaskMessageWithQueue("task2", nil, "default")
|
||||
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
|
||||
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
|
||||
m5 := h.NewTaskMessageWithQueue("task5", nil, "custom")
|
||||
|
||||
now := time.Now()
|
||||
fiveMinsFromNow := now.Add(5 * time.Minute)
|
||||
oneHourFromNow := now.Add(1 * time.Hour)
|
||||
twoHoursAgo := now.Add(-2 * time.Hour)
|
||||
|
||||
fixtures := struct {
|
||||
active map[string][]*base.TaskMessage
|
||||
pending map[string][]*base.TaskMessage
|
||||
scheduled map[string][]base.Z
|
||||
retry map[string][]base.Z
|
||||
archived map[string][]base.Z
|
||||
}{
|
||||
active: map[string][]*base.TaskMessage{
|
||||
"default": {m1},
|
||||
"custom": {},
|
||||
},
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"custom": {m5},
|
||||
},
|
||||
scheduled: map[string][]base.Z{
|
||||
"default": {{Message: m2, Score: fiveMinsFromNow.Unix()}},
|
||||
"custom": {},
|
||||
},
|
||||
retry: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {{Message: m3, Score: oneHourFromNow.Unix()}},
|
||||
},
|
||||
archived: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {{Message: m4, Score: twoHoursAgo.Unix()}},
|
||||
},
|
||||
}
|
||||
|
||||
h.SeedAllActiveQueues(t, r.client, fixtures.active)
|
||||
h.SeedAllPendingQueues(t, r.client, fixtures.pending)
|
||||
h.SeedAllScheduledQueues(t, r.client, fixtures.scheduled)
|
||||
h.SeedAllRetryQueues(t, r.client, fixtures.retry)
|
||||
h.SeedAllArchivedQueues(t, r.client, fixtures.archived)
|
||||
|
||||
tests := []struct {
|
||||
qname string
|
||||
id uuid.UUID
|
||||
match func(err error) bool
|
||||
}{
|
||||
{
|
||||
qname: "nonexistent",
|
||||
id: m1.ID,
|
||||
match: errors.IsQueueNotFound,
|
||||
},
|
||||
{
|
||||
qname: "default",
|
||||
id: uuid.New(),
|
||||
match: errors.IsTaskNotFound,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
info, err := r.GetTaskInfo(tc.qname, tc.id)
|
||||
if info != nil {
|
||||
t.Errorf("GetTaskInfo(%q, %v) returned info: %v", tc.qname, tc.id, info)
|
||||
}
|
||||
if !tc.match(err) {
|
||||
t.Errorf("GetTaskInfo(%q, %v) returned unexpected error: %v", tc.qname, tc.id, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestListPending(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
|
Loading…
Reference in New Issue
Block a user