2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Update GetTaskInfo to read the task's result field

This commit is contained in:
Ken Hibino 2021-09-24 13:27:31 -07:00
parent 7c06994959
commit e8f140539d
6 changed files with 58 additions and 14 deletions

View File

@ -88,6 +88,10 @@ type TaskInfo struct {
// CompletedAt is the time the task is processed successfully.
// Zero value (i.e. time.Time{}) indicates no value.
CompletedAt time.Time
// Result holds the result data associated with the task.
// Use ResultWriter to write result data from the Handler.
Result []byte
}
// If t is non-zero, returns time converted from t as unix time in seconds.
@ -99,7 +103,7 @@ func fromUnixTimeOrZero(t int64) time.Time {
return time.Unix(t, 0)
}
func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time.Time) *TaskInfo {
func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time.Time, result []byte) *TaskInfo {
info := TaskInfo{
ID: msg.ID,
Queue: msg.Queue,
@ -114,6 +118,7 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time
NextProcessAt: nextProcessAt,
LastFailedAt: fromUnixTimeOrZero(msg.LastFailedAt),
CompletedAt: fromUnixTimeOrZero(msg.CompletedAt),
Result: result,
}
switch state {

View File

@ -352,7 +352,7 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) {
case err != nil:
return nil, err
}
return newTaskInfo(msg, state, opt.processAt), nil
return newTaskInfo(msg, state, opt.processAt, nil), nil
}
func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {

View File

@ -186,7 +186,7 @@ func (i *Inspector) GetTaskInfo(qname, id string) (*TaskInfo, error) {
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
}
return newTaskInfo(info.Message, info.State, info.NextProcessAt), nil
return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil
}
// ListOption specifies behavior of list operation.

View File

@ -299,6 +299,7 @@ type TaskInfo struct {
Message *TaskMessage
State TaskState
NextProcessAt time.Time
Result []byte
}
// Z represents sorted set member.

View File

@ -364,24 +364,25 @@ func (r *RDB) checkQueueExists(qname string) error {
// ARGV[3] -> queue key prefix (asynq:{<qname>}:)
//
// Output:
// Tuple of {msg, state, nextProcessAt}
// Tuple of {msg, state, nextProcessAt, result}
// msg: encoded task message
// state: string describing the state of the task
// nextProcessAt: unix time in seconds, zero if not applicable.
// result: result data associated with the task
//
// If the task key doesn't exist, it returns error with a message "NOT FOUND"
var getTaskInfoCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
local msg, state = unpack(redis.call("HMGET", KEYS[1], "msg", "state"))
local msg, state, result = unpack(redis.call("HMGET", KEYS[1], "msg", "state", "result"))
if state == "scheduled" or state == "retry" then
return {msg, state, redis.call("ZSCORE", ARGV[3] .. state, ARGV[1])}
return {msg, state, redis.call("ZSCORE", ARGV[3] .. state, ARGV[1]), result}
end
if state == "pending" then
return {msg, state, ARGV[2]}
return {msg, state, ARGV[2], result}
end
return {msg, state, 0}
return {msg, state, 0, result}
`)
// GetTaskInfo returns a TaskInfo describing the task from the given queue.
@ -407,7 +408,7 @@ func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) {
if err != nil {
return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script")
}
if len(vals) != 3 {
if len(vals) != 4 {
return nil, errors.E(op, errors.Internal, "unepxected number of values returned from Lua script")
}
encoded, err := cast.ToStringE(vals[0])
@ -422,6 +423,10 @@ func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) {
if err != nil {
return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script")
}
resultStr, err := cast.ToStringE(vals[3])
if err != nil {
return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script")
}
msg, err := base.DecodeMessage([]byte(encoded))
if err != nil {
return nil, errors.E(op, errors.Internal, "could not decode task message")
@ -434,10 +439,15 @@ func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) {
if processAtUnix != 0 {
nextProcessAt = time.Unix(processAtUnix, 0)
}
var result []byte
if len(resultStr) > 0 {
result = []byte(resultStr)
}
return &base.TaskInfo{
Message: msg,
State: state,
NextProcessAt: nextProcessAt,
Result: result,
}, nil
}

View File

@ -315,16 +315,19 @@ func TestGetTaskInfo(t *testing.T) {
r := setup(t)
defer r.Close()
now := time.Now()
fiveMinsFromNow := now.Add(5 * time.Minute)
oneHourFromNow := now.Add(1 * time.Hour)
twoHoursAgo := now.Add(-2 * time.Hour)
m1 := h.NewTaskMessageWithQueue("task1", nil, "default")
m2 := h.NewTaskMessageWithQueue("task2", nil, "default")
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
m5 := h.NewTaskMessageWithQueue("task5", nil, "custom")
now := time.Now()
fiveMinsFromNow := now.Add(5 * time.Minute)
oneHourFromNow := now.Add(1 * time.Hour)
twoHoursAgo := now.Add(-2 * time.Hour)
m6 := h.NewTaskMessageWithQueue("task5", nil, "custom")
m6.CompletedAt = twoHoursAgo.Unix()
m6.ResultTTL = int64((24 * time.Hour).Seconds())
fixtures := struct {
active map[string][]*base.TaskMessage
@ -332,6 +335,7 @@ func TestGetTaskInfo(t *testing.T) {
scheduled map[string][]base.Z
retry map[string][]base.Z
archived map[string][]base.Z
completed map[string][]base.Z
}{
active: map[string][]*base.TaskMessage{
"default": {m1},
@ -353,6 +357,10 @@ func TestGetTaskInfo(t *testing.T) {
"default": {},
"custom": {{Message: m4, Score: twoHoursAgo.Unix()}},
},
completed: map[string][]base.Z{
"default": {},
"custom": {{Message: m6, Score: m6.CompletedAt + m6.ResultTTL}},
},
}
h.SeedAllActiveQueues(t, r.client, fixtures.active)
@ -360,6 +368,11 @@ func TestGetTaskInfo(t *testing.T) {
h.SeedAllScheduledQueues(t, r.client, fixtures.scheduled)
h.SeedAllRetryQueues(t, r.client, fixtures.retry)
h.SeedAllArchivedQueues(t, r.client, fixtures.archived)
h.SeedAllCompletedQueues(t, r.client, fixtures.completed)
// Write result data for the completed task.
if err := r.client.HSet(context.Background(), base.TaskKey(m6.Queue, m6.ID), "result", "foobar").Err(); err != nil {
t.Fatalf("Failed to write result data under task key: %v", err)
}
tests := []struct {
qname string
@ -373,6 +386,7 @@ func TestGetTaskInfo(t *testing.T) {
Message: m1,
State: base.TaskStateActive,
NextProcessAt: time.Time{}, // zero value for N/A
Result: nil,
},
},
{
@ -382,6 +396,7 @@ func TestGetTaskInfo(t *testing.T) {
Message: m2,
State: base.TaskStateScheduled,
NextProcessAt: fiveMinsFromNow,
Result: nil,
},
},
{
@ -391,6 +406,7 @@ func TestGetTaskInfo(t *testing.T) {
Message: m3,
State: base.TaskStateRetry,
NextProcessAt: oneHourFromNow,
Result: nil,
},
},
{
@ -400,6 +416,7 @@ func TestGetTaskInfo(t *testing.T) {
Message: m4,
State: base.TaskStateArchived,
NextProcessAt: time.Time{}, // zero value for N/A
Result: nil,
},
},
{
@ -409,6 +426,17 @@ func TestGetTaskInfo(t *testing.T) {
Message: m5,
State: base.TaskStatePending,
NextProcessAt: now,
Result: nil,
},
},
{
qname: "custom",
id: m6.ID,
want: &base.TaskInfo{
Message: m6,
State: base.TaskStateCompleted,
NextProcessAt: time.Time{}, // zero value for N/A
Result: []byte("foobar"),
},
},
}