diff --git a/asynq.go b/asynq.go index 854270a..3f92c81 100644 --- a/asynq.go +++ b/asynq.go @@ -88,6 +88,10 @@ type TaskInfo struct { // CompletedAt is the time the task is processed successfully. // Zero value (i.e. time.Time{}) indicates no value. CompletedAt time.Time + + // Result holds the result data associated with the task. + // Use ResultWriter to write result data from the Handler. + Result []byte } // If t is non-zero, returns time converted from t as unix time in seconds. @@ -99,7 +103,7 @@ func fromUnixTimeOrZero(t int64) time.Time { return time.Unix(t, 0) } -func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time.Time) *TaskInfo { +func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time.Time, result []byte) *TaskInfo { info := TaskInfo{ ID: msg.ID, Queue: msg.Queue, @@ -114,6 +118,7 @@ func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time NextProcessAt: nextProcessAt, LastFailedAt: fromUnixTimeOrZero(msg.LastFailedAt), CompletedAt: fromUnixTimeOrZero(msg.CompletedAt), + Result: result, } switch state { diff --git a/client.go b/client.go index e40785c..f1e489a 100644 --- a/client.go +++ b/client.go @@ -352,7 +352,7 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) { case err != nil: return nil, err } - return newTaskInfo(msg, state, opt.processAt), nil + return newTaskInfo(msg, state, opt.processAt, nil), nil } func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error { diff --git a/inspector.go b/inspector.go index 5e81940..9837573 100644 --- a/inspector.go +++ b/inspector.go @@ -186,7 +186,7 @@ func (i *Inspector) GetTaskInfo(qname, id string) (*TaskInfo, error) { case err != nil: return nil, fmt.Errorf("asynq: %v", err) } - return newTaskInfo(info.Message, info.State, info.NextProcessAt), nil + return newTaskInfo(info.Message, info.State, info.NextProcessAt, info.Result), nil } // ListOption specifies behavior of list operation. diff --git a/internal/base/base.go b/internal/base/base.go index 4707737..bd6a827 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -299,6 +299,7 @@ type TaskInfo struct { Message *TaskMessage State TaskState NextProcessAt time.Time + Result []byte } // Z represents sorted set member. diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 3beffc0..453b6e6 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -364,24 +364,25 @@ func (r *RDB) checkQueueExists(qname string) error { // ARGV[3] -> queue key prefix (asynq:{}:) // // Output: -// Tuple of {msg, state, nextProcessAt} +// Tuple of {msg, state, nextProcessAt, result} // msg: encoded task message // state: string describing the state of the task // nextProcessAt: unix time in seconds, zero if not applicable. +// result: result data associated with the task // // If the task key doesn't exist, it returns error with a message "NOT FOUND" var getTaskInfoCmd = redis.NewScript(` if redis.call("EXISTS", KEYS[1]) == 0 then return redis.error_reply("NOT FOUND") end - local msg, state = unpack(redis.call("HMGET", KEYS[1], "msg", "state")) + local msg, state, result = unpack(redis.call("HMGET", KEYS[1], "msg", "state", "result")) if state == "scheduled" or state == "retry" then - return {msg, state, redis.call("ZSCORE", ARGV[3] .. state, ARGV[1])} + return {msg, state, redis.call("ZSCORE", ARGV[3] .. state, ARGV[1]), result} end if state == "pending" then - return {msg, state, ARGV[2]} + return {msg, state, ARGV[2], result} end - return {msg, state, 0} + return {msg, state, 0, result} `) // GetTaskInfo returns a TaskInfo describing the task from the given queue. @@ -407,7 +408,7 @@ func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) { if err != nil { return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script") } - if len(vals) != 3 { + if len(vals) != 4 { return nil, errors.E(op, errors.Internal, "unepxected number of values returned from Lua script") } encoded, err := cast.ToStringE(vals[0]) @@ -422,6 +423,10 @@ func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) { if err != nil { return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script") } + resultStr, err := cast.ToStringE(vals[3]) + if err != nil { + return nil, errors.E(op, errors.Internal, "unexpected value returned from Lua script") + } msg, err := base.DecodeMessage([]byte(encoded)) if err != nil { return nil, errors.E(op, errors.Internal, "could not decode task message") @@ -434,10 +439,15 @@ func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) { if processAtUnix != 0 { nextProcessAt = time.Unix(processAtUnix, 0) } + var result []byte + if len(resultStr) > 0 { + result = []byte(resultStr) + } return &base.TaskInfo{ Message: msg, State: state, NextProcessAt: nextProcessAt, + Result: result, }, nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 4281a89..385b74f 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -315,16 +315,19 @@ func TestGetTaskInfo(t *testing.T) { r := setup(t) defer r.Close() + now := time.Now() + fiveMinsFromNow := now.Add(5 * time.Minute) + oneHourFromNow := now.Add(1 * time.Hour) + twoHoursAgo := now.Add(-2 * time.Hour) + m1 := h.NewTaskMessageWithQueue("task1", nil, "default") m2 := h.NewTaskMessageWithQueue("task2", nil, "default") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") m5 := h.NewTaskMessageWithQueue("task5", nil, "custom") - - now := time.Now() - fiveMinsFromNow := now.Add(5 * time.Minute) - oneHourFromNow := now.Add(1 * time.Hour) - twoHoursAgo := now.Add(-2 * time.Hour) + m6 := h.NewTaskMessageWithQueue("task5", nil, "custom") + m6.CompletedAt = twoHoursAgo.Unix() + m6.ResultTTL = int64((24 * time.Hour).Seconds()) fixtures := struct { active map[string][]*base.TaskMessage @@ -332,6 +335,7 @@ func TestGetTaskInfo(t *testing.T) { scheduled map[string][]base.Z retry map[string][]base.Z archived map[string][]base.Z + completed map[string][]base.Z }{ active: map[string][]*base.TaskMessage{ "default": {m1}, @@ -353,6 +357,10 @@ func TestGetTaskInfo(t *testing.T) { "default": {}, "custom": {{Message: m4, Score: twoHoursAgo.Unix()}}, }, + completed: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m6, Score: m6.CompletedAt + m6.ResultTTL}}, + }, } h.SeedAllActiveQueues(t, r.client, fixtures.active) @@ -360,6 +368,11 @@ func TestGetTaskInfo(t *testing.T) { h.SeedAllScheduledQueues(t, r.client, fixtures.scheduled) h.SeedAllRetryQueues(t, r.client, fixtures.retry) h.SeedAllArchivedQueues(t, r.client, fixtures.archived) + h.SeedAllCompletedQueues(t, r.client, fixtures.completed) + // Write result data for the completed task. + if err := r.client.HSet(context.Background(), base.TaskKey(m6.Queue, m6.ID), "result", "foobar").Err(); err != nil { + t.Fatalf("Failed to write result data under task key: %v", err) + } tests := []struct { qname string @@ -373,6 +386,7 @@ func TestGetTaskInfo(t *testing.T) { Message: m1, State: base.TaskStateActive, NextProcessAt: time.Time{}, // zero value for N/A + Result: nil, }, }, { @@ -382,6 +396,7 @@ func TestGetTaskInfo(t *testing.T) { Message: m2, State: base.TaskStateScheduled, NextProcessAt: fiveMinsFromNow, + Result: nil, }, }, { @@ -391,6 +406,7 @@ func TestGetTaskInfo(t *testing.T) { Message: m3, State: base.TaskStateRetry, NextProcessAt: oneHourFromNow, + Result: nil, }, }, { @@ -400,6 +416,7 @@ func TestGetTaskInfo(t *testing.T) { Message: m4, State: base.TaskStateArchived, NextProcessAt: time.Time{}, // zero value for N/A + Result: nil, }, }, { @@ -409,6 +426,17 @@ func TestGetTaskInfo(t *testing.T) { Message: m5, State: base.TaskStatePending, NextProcessAt: now, + Result: nil, + }, + }, + { + qname: "custom", + id: m6.ID, + want: &base.TaskInfo{ + Message: m6, + State: base.TaskStateCompleted, + NextProcessAt: time.Time{}, // zero value for N/A + Result: []byte("foobar"), }, }, }