diff --git a/internal/base/base.go b/internal/base/base.go index 2ea0450..0c9f34d 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -14,6 +14,7 @@ import ( "github.com/go-redis/redis/v7" "github.com/golang/protobuf/ptypes" "github.com/google/uuid" + "github.com/hibiken/asynq/internal/errors" pb "github.com/hibiken/asynq/internal/proto" "google.golang.org/protobuf/proto" ) @@ -63,6 +64,22 @@ func (s TaskState) String() string { panic(fmt.Sprintf("internal error: unknown task state %d", s)) } +func TaskStateFromString(s string) (TaskState, error) { + switch s { + case "active": + return TaskStateActive, nil + case "pending": + return TaskStatePending, nil + case "scheduled": + return TaskStateScheduled, nil + case "retry": + return TaskStateRetry, nil + case "archived": + return TaskStateArchived, nil + } + return 0, errors.E(errors.FailedPrecondition, fmt.Sprintf("%q is not supported task state", s)) +} + // ValidateQueueName validates a given qname to be used as a queue name. // Returns nil if valid, otherwise returns non-nil error. func ValidateQueueName(qname string) error { @@ -249,6 +266,13 @@ func DecodeMessage(data []byte) (*TaskMessage, error) { }, nil } +// TaskInfo describes a task message and its metadata. +type TaskInfo struct { + Message *TaskMessage + State TaskState + NextProcessAt time.Time +} + // Z represents sorted set member. type Z struct { Message *TaskMessage diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 7b1a25b..2100b39 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -292,6 +292,103 @@ func reverse(x []string) { } } +// checkQueueExists verifies whether the queue exists. +// It returns QueueNotFoundError if queue doesn't exist. +func (r *RDB) checkQueueExists(qname string) error { + exists, err := r.client.SIsMember(base.AllQueues, qname).Result() + if err != nil { + return errors.E(errors.Unknown, &errors.RedisCommandError{Command: "sismember", Err: err}) + } + if !exists { + return errors.E(errors.Internal, &errors.QueueNotFoundError{Queue: qname}) + } + return nil +} + +// Input: +// KEYS[1] -> task key (asynq:{}:t:) +// ARGV[1] -> task id +// ARGV[2] -> current time in Unix time (seconds) +// ARGV[3] -> queue key prefix (asynq:{}:) +// +// Output: +// Tuple of {msg, state, nextProcessAt} +// msg: encoded task message +// state: string describing the state of the task +// nextProcessAt: unix time in seconds, zero if not applicable. +// +// If the task key doesn't exist, it returns error with a message "NOT FOUND" +var getTaskInfoCmd = redis.NewScript(` + if redis.call("EXISTS", KEYS[1]) == 0 then + return redis.error_reply("NOT FOUND") + end + local msg, state = unpack(redis.call("HMGET", KEYS[1], "msg", "state")) + if state == "scheduled" or state == "retry" then + return {msg, state, redis.call("ZSCORE", ARGV[3] .. state, ARGV[1])} + end + if state == "pending" then + return {msg, state, ARGV[2]} + end + return {msg, state, 0} +`) + +// GetTaskInfo returns a TaskInfo describing the task from the given queue. +func (r *RDB) GetTaskInfo(qname string, id uuid.UUID) (*base.TaskInfo, error) { + var op errors.Op = "rdb.GetTaskInfo" + if err := r.checkQueueExists(qname); err != nil { + return nil, errors.E(op, errors.CanonicalCode(err), err) + } + keys := []string{base.TaskKey(qname, id.String())} + argv := []interface{}{ + id.String(), + time.Now().Unix(), + base.QueueKeyPrefix(qname), + } + res, err := getTaskInfoCmd.Run(r.client, keys, argv...).Result() + if err != nil { + if err.Error() == "NOT FOUND" { + return nil, errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id.String()}) + } + return nil, errors.E(op, errors.Unknown, err) + } + vals, err := cast.ToSliceE(res) + if err != nil { + return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script") + } + if len(vals) != 3 { + return nil, errors.E(op, errors.Internal, "unepxected number of values returned from Lua script") + } + encoded, err := cast.ToStringE(vals[0]) + if err != nil { + return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script") + } + stateStr, err := cast.ToStringE(vals[1]) + if err != nil { + return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script") + } + processAtUnix, err := cast.ToInt64E(vals[2]) + if err != nil { + return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script") + } + msg, err := base.DecodeMessage([]byte(encoded)) + if err != nil { + return nil, errors.E(op, errors.Internal, "could not decode task message") + } + state, err := base.TaskStateFromString(stateStr) + if err != nil { + return nil, errors.E(op, errors.CanonicalCode(err), err) + } + var nextProcessAt time.Time + if processAtUnix != 0 { + nextProcessAt = time.Unix(processAtUnix, 0) + } + return &base.TaskInfo{ + Message: msg, + State: state, + NextProcessAt: nextProcessAt, + }, nil +} + // Pagination specifies the page size and page number // for the list operation. type Pagination struct { diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 68380ba..00500ad 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -309,6 +309,199 @@ func TestRedisInfo(t *testing.T) { } } +func TestGetTaskInfo(t *testing.T) { + r := setup(t) + defer r.Close() + + m1 := h.NewTaskMessageWithQueue("task1", nil, "default") + m2 := h.NewTaskMessageWithQueue("task2", nil, "default") + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + m5 := h.NewTaskMessageWithQueue("task5", nil, "custom") + + now := time.Now() + fiveMinsFromNow := now.Add(5 * time.Minute) + oneHourFromNow := now.Add(1 * time.Hour) + twoHoursAgo := now.Add(-2 * time.Hour) + + fixtures := struct { + active map[string][]*base.TaskMessage + pending map[string][]*base.TaskMessage + scheduled map[string][]base.Z + retry map[string][]base.Z + archived map[string][]base.Z + }{ + active: map[string][]*base.TaskMessage{ + "default": {m1}, + "custom": {}, + }, + pending: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {m5}, + }, + scheduled: map[string][]base.Z{ + "default": {{Message: m2, Score: fiveMinsFromNow.Unix()}}, + "custom": {}, + }, + retry: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m3, Score: oneHourFromNow.Unix()}}, + }, + archived: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m4, Score: twoHoursAgo.Unix()}}, + }, + } + + h.SeedAllActiveQueues(t, r.client, fixtures.active) + h.SeedAllPendingQueues(t, r.client, fixtures.pending) + h.SeedAllScheduledQueues(t, r.client, fixtures.scheduled) + h.SeedAllRetryQueues(t, r.client, fixtures.retry) + h.SeedAllArchivedQueues(t, r.client, fixtures.archived) + + tests := []struct { + qname string + id uuid.UUID + want *base.TaskInfo + }{ + { + qname: "default", + id: m1.ID, + want: &base.TaskInfo{ + Message: m1, + State: base.TaskStateActive, + NextProcessAt: time.Time{}, // zero value for N/A + }, + }, + { + qname: "default", + id: m2.ID, + want: &base.TaskInfo{ + Message: m2, + State: base.TaskStateScheduled, + NextProcessAt: fiveMinsFromNow, + }, + }, + { + qname: "custom", + id: m3.ID, + want: &base.TaskInfo{ + Message: m3, + State: base.TaskStateRetry, + NextProcessAt: oneHourFromNow, + }, + }, + { + qname: "custom", + id: m4.ID, + want: &base.TaskInfo{ + Message: m4, + State: base.TaskStateArchived, + NextProcessAt: time.Time{}, // zero value for N/A + }, + }, + { + qname: "custom", + id: m5.ID, + want: &base.TaskInfo{ + Message: m5, + State: base.TaskStatePending, + NextProcessAt: now, + }, + }, + } + + for _, tc := range tests { + got, err := r.GetTaskInfo(tc.qname, tc.id) + if err != nil { + t.Errorf("GetTaskInfo(%q, %v) returned error: %v", tc.qname, tc.id, err) + continue + } + if diff := cmp.Diff(tc.want, got, cmpopts.EquateApproxTime(2*time.Second)); diff != "" { + t.Errorf("GetTaskInfo(%q, %v) = %v, want %v; (-want,+got)\n%s", + tc.qname, tc.id, got, tc.want, diff) + } + } +} + +func TestGetTaskInfoError(t *testing.T) { + r := setup(t) + defer r.Close() + + m1 := h.NewTaskMessageWithQueue("task1", nil, "default") + m2 := h.NewTaskMessageWithQueue("task2", nil, "default") + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + m5 := h.NewTaskMessageWithQueue("task5", nil, "custom") + + now := time.Now() + fiveMinsFromNow := now.Add(5 * time.Minute) + oneHourFromNow := now.Add(1 * time.Hour) + twoHoursAgo := now.Add(-2 * time.Hour) + + fixtures := struct { + active map[string][]*base.TaskMessage + pending map[string][]*base.TaskMessage + scheduled map[string][]base.Z + retry map[string][]base.Z + archived map[string][]base.Z + }{ + active: map[string][]*base.TaskMessage{ + "default": {m1}, + "custom": {}, + }, + pending: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {m5}, + }, + scheduled: map[string][]base.Z{ + "default": {{Message: m2, Score: fiveMinsFromNow.Unix()}}, + "custom": {}, + }, + retry: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m3, Score: oneHourFromNow.Unix()}}, + }, + archived: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m4, Score: twoHoursAgo.Unix()}}, + }, + } + + h.SeedAllActiveQueues(t, r.client, fixtures.active) + h.SeedAllPendingQueues(t, r.client, fixtures.pending) + h.SeedAllScheduledQueues(t, r.client, fixtures.scheduled) + h.SeedAllRetryQueues(t, r.client, fixtures.retry) + h.SeedAllArchivedQueues(t, r.client, fixtures.archived) + + tests := []struct { + qname string + id uuid.UUID + match func(err error) bool + }{ + { + qname: "nonexistent", + id: m1.ID, + match: errors.IsQueueNotFound, + }, + { + qname: "default", + id: uuid.New(), + match: errors.IsTaskNotFound, + }, + } + + for _, tc := range tests { + info, err := r.GetTaskInfo(tc.qname, tc.id) + if info != nil { + t.Errorf("GetTaskInfo(%q, %v) returned info: %v", tc.qname, tc.id, info) + } + if !tc.match(err) { + t.Errorf("GetTaskInfo(%q, %v) returned unexpected error: %v", tc.qname, tc.id, err) + } + } +} + func TestListPending(t *testing.T) { r := setup(t) defer r.Close()