2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 08:40:22 +08:00

Add GetTaskInfo method to RDB

This commit is contained in:
Ken Hibino 2021-03-31 06:44:56 -07:00
parent 56af9f6686
commit 3f0bc6d738
5 changed files with 278 additions and 9 deletions

View File

@ -11,6 +11,7 @@ import (
"sort"
"strings"
"testing"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp"
@ -305,16 +306,27 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, qname string, msgs []
tb.Fatalf("cannot seed redis LIST with task state %s", state)
}
for _, msg := range msgs {
if msg.Queue != qname {
tb.Fatalf("msg.Queue and queue name do not match! You are trying to seed queue %q with message %+v", qname, msg)
}
encoded := MustMarshal(tb, msg)
if err := c.LPush(key, msg.ID.String()).Err(); err != nil {
tb.Fatal(err)
}
key := base.TaskKey(msg.Queue, msg.ID.String())
var processAt int64
if state == statePending {
processAt = time.Now().Unix()
}
if state == stateActive {
processAt = 0
}
data := map[string]interface{}{
"msg": encoded,
"timeout": msg.Timeout,
"deadline": msg.Deadline,
"state": strings.ToUpper(state.String()),
"msg": encoded,
"timeout": msg.Timeout,
"deadline": msg.Deadline,
"state": strings.ToUpper(state.String()),
"process_at": processAt,
}
if err := c.HSet(key, data).Err(); err != nil {
tb.Fatal(err)
@ -337,17 +349,32 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, qname string, items [
}
for _, item := range items {
msg := item.Message
if msg.Queue != qname {
tb.Fatalf("msg.Queue and queue name do not match! You are trying to seed queue %q with message %+v", qname, msg)
}
encoded := MustMarshal(tb, msg)
z := &redis.Z{Member: msg.ID.String(), Score: float64(item.Score)}
if err := c.ZAdd(key, z).Err(); err != nil {
tb.Fatal(err)
}
key := base.TaskKey(msg.Queue, msg.ID.String())
var (
processAt int64
lastFailedAt int64
)
if state == stateScheduled || state == stateRetry {
processAt = item.Score
}
if state == stateArchived {
lastFailedAt = item.Score
}
data := map[string]interface{}{
"msg": encoded,
"timeout": msg.Timeout,
"deadline": msg.Deadline,
"state": strings.ToUpper(state.String()),
"msg": encoded,
"timeout": msg.Timeout,
"deadline": msg.Deadline,
"state": strings.ToUpper(state.String()),
"process_at": processAt,
"last_failed_at": lastFailedAt,
}
if err := c.HSet(key, data).Err(); err != nil {
tb.Fatal(err)

View File

@ -237,6 +237,31 @@ func DecodeMessage(data []byte) (*TaskMessage, error) {
}, nil
}
type TaskInfo struct {
*TaskMessage
// State of the task.
// Possible values are the following:
// - active
// - pending
// - scheduled
// - retry
// - archived
State string
// NextProcessAt specifies the next processing time for the task in Unix time,
// the number of seconds elapsed since January 1, 1970 UTC.
//
// Value zero is used when task is in active or archived state.
NextProcessAt int64
// LastFailedAt specifieds the last time task failed in Unix time,
// the number of seconds elapsed since January 1, 1970 UTC.
//
// Value zero is used if the task has not failed.
LastFailedAt int64
}
// Z represents sorted set member.
type Z struct {
Message *TaskMessage

View File

@ -6,6 +6,7 @@ package rdb
import (
"fmt"
"strconv"
"strings"
"time"
@ -287,6 +288,65 @@ func reverse(x []string) {
}
}
// Parses val as base10 64-bit integer if val contains a value.
// Uses default value if val is nil.
//
// Assumes val contains either string value or nil.
func parseIntOrDefault(val interface{}, defaultVal int64) (int64, error) {
if val == nil {
return defaultVal, nil
}
return strconv.ParseInt(val.(string), 10, 64)
}
// GetTaskInfo finds a task with the given id from the given queue.
// Returns TaskInfo of the task if a task is found, otherwise returns ErrTaskNotFound.
func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) {
key := base.TaskKey(qname, id)
exists, err := r.client.Exists(key).Result()
if err != nil {
return nil, err
}
if exists == 0 {
return nil, ErrTaskNotFound
}
// The "msg", "state" fields are non-nil;
// whereas the "process_at", "last_failed_at" fields can be nil.
res, err := r.client.HMGet(key, "msg", "state", "process_at", "last_failed_at").Result()
if err != nil {
return nil, err
}
if len(res) != 4 {
return nil, fmt.Errorf("asynq internal error: HMGET command returned %d elements", len(res))
}
encoded := res[0]
if encoded == nil {
return nil, fmt.Errorf("asynq internal error: HMGET field 'msg' was nil")
}
msg, err := base.DecodeMessage([]byte(encoded.(string)))
if err != nil {
return nil, err
}
state := res[1]
if state == nil {
return nil, fmt.Errorf("asynq internal error: HMGET field 'state' was nil")
}
processAt, err := parseIntOrDefault(res[2], 0)
if err != nil {
return nil, err
}
lastFailedAt, err := parseIntOrDefault(res[3], 0)
if err != nil {
return nil, err
}
return &base.TaskInfo{
TaskMessage: msg,
State: strings.ToLower(state.(string)),
NextProcessAt: processAt,
LastFailedAt: lastFailedAt,
}, nil
}
// Pagination specifies the page size and page number
// for the list operation.
type Pagination struct {

View File

@ -5,6 +5,7 @@
package rdb
import (
"errors"
"fmt"
"testing"
"time"
@ -308,6 +309,162 @@ func TestRedisInfo(t *testing.T) {
}
}
func TestGetTaskInfo(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
now := time.Now()
oneHourFromNow := now.Add(1 * time.Hour)
oneHourAgo := now.Add(-1 * time.Hour)
// state of the queues
queueState := struct {
active map[string][]*base.TaskMessage
pending map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
archived map[string][]base.Z
}{
active: map[string][]*base.TaskMessage{
"default": {m1},
"custom": {},
},
pending: map[string][]*base.TaskMessage{
"default": {},
"custom": {m3},
},
scheduled: map[string][]base.Z{
"default": {{Message: m2, Score: oneHourFromNow.Unix()}},
"custom": {},
},
retry: map[string][]base.Z{
"default": {},
"custom": {},
},
archived: map[string][]base.Z{
"default": {},
"custom": {{Message: m4, Score: oneHourAgo.Unix()}},
},
}
// seed redis with fixtures.
h.FlushDB(t, r.client)
h.SeedAllActiveQueues(t, r.client, queueState.active)
h.SeedAllPendingQueues(t, r.client, queueState.pending)
h.SeedAllScheduledQueues(t, r.client, queueState.scheduled)
h.SeedAllRetryQueues(t, r.client, queueState.retry)
h.SeedAllArchivedQueues(t, r.client, queueState.archived)
tests := []struct {
qname string
id uuid.UUID
want *base.TaskInfo
}{
{
qname: "default",
id: m1.ID,
want: &base.TaskInfo{
TaskMessage: m1,
State: "active",
NextProcessAt: 0,
LastFailedAt: 0,
},
},
{
qname: "default",
id: m2.ID,
want: &base.TaskInfo{
TaskMessage: m2,
State: "scheduled",
NextProcessAt: oneHourFromNow.Unix(),
LastFailedAt: 0,
},
},
{
qname: "custom",
id: m3.ID,
want: &base.TaskInfo{
TaskMessage: m3,
State: "pending",
NextProcessAt: now.Unix(),
LastFailedAt: 0,
},
},
{
qname: "custom",
id: m4.ID,
want: &base.TaskInfo{
TaskMessage: m4,
State: "archived",
NextProcessAt: 0,
LastFailedAt: oneHourAgo.Unix(),
},
},
}
for _, tc := range tests {
got, err := r.GetTaskInfo(tc.qname, tc.id.String())
if err != nil {
t.Errorf("GetTaskInfo(%q, %q) failed: %v",
tc.qname, tc.id.String(), err)
continue
}
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("GetTaskInfo(%q, %q) = %v, want %v; (-want,+got)\n%s",
tc.qname, tc.id.String(), got, tc.want, diff)
}
}
}
func TestGetTaskInfoError(t *testing.T) {
r := setup(t)
defer r.Close()
m := h.NewTaskMessageWithQueue("test1", nil, "custom")
h.SeedPendingQueue(t, r.client, []*base.TaskMessage{m}, "custom")
tests := []struct {
desc string
qname string
id uuid.UUID
wantErr error
}{
{
desc: "searching for task in a wrong queue",
qname: "default",
id: m.ID,
wantErr: ErrTaskNotFound,
},
{
desc: "searching with non-existent task ID",
qname: "custom",
id: uuid.New(),
wantErr: ErrTaskNotFound,
},
}
for _, tc := range tests {
_, err := r.GetTaskInfo(tc.qname, tc.id.String())
if err == nil {
t.Errorf("%s; GetTaskInfo(%q, %q) returned nil error, want %v",
tc.desc, tc.qname, tc.id.String(), tc.wantErr)
continue
}
if !errors.Is(err, tc.wantErr) {
t.Errorf("%s; GetTaskInfo(%q, %q) returned %v error, want %v",
tc.desc, tc.qname, tc.id.String(), err, tc.wantErr)
}
}
}
func TestListPending(t *testing.T) {
r := setup(t)
defer r.Close()

View File

@ -195,7 +195,7 @@ if redis.call("EXISTS", KEYS[2]) == 0 then
else
return redis.error_reply("asynq internal error: both timeout and deadline are not set")
end
redis.call("HSET", key, "state", "ACTIVE")
redis.call("HSET", key, "state", "ACTIVE", "process_at", 0)
redis.call("ZADD", KEYS[4], score, id)
return {msg, score}
end