From 3f0bc6d738de33a6bd1036e2182660f2b9b291b4 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 31 Mar 2021 06:44:56 -0700 Subject: [PATCH] Add GetTaskInfo method to RDB --- internal/asynqtest/asynqtest.go | 43 +++++++-- internal/base/base.go | 25 +++++ internal/rdb/inspect.go | 60 ++++++++++++ internal/rdb/inspect_test.go | 157 ++++++++++++++++++++++++++++++++ internal/rdb/rdb.go | 2 +- 5 files changed, 278 insertions(+), 9 deletions(-) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 238e8cc..135e5ea 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -11,6 +11,7 @@ import ( "sort" "strings" "testing" + "time" "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" @@ -305,16 +306,27 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, qname string, msgs [] tb.Fatalf("cannot seed redis LIST with task state %s", state) } for _, msg := range msgs { + if msg.Queue != qname { + tb.Fatalf("msg.Queue and queue name do not match! You are trying to seed queue %q with message %+v", qname, msg) + } encoded := MustMarshal(tb, msg) if err := c.LPush(key, msg.ID.String()).Err(); err != nil { tb.Fatal(err) } key := base.TaskKey(msg.Queue, msg.ID.String()) + var processAt int64 + if state == statePending { + processAt = time.Now().Unix() + } + if state == stateActive { + processAt = 0 + } data := map[string]interface{}{ - "msg": encoded, - "timeout": msg.Timeout, - "deadline": msg.Deadline, - "state": strings.ToUpper(state.String()), + "msg": encoded, + "timeout": msg.Timeout, + "deadline": msg.Deadline, + "state": strings.ToUpper(state.String()), + "process_at": processAt, } if err := c.HSet(key, data).Err(); err != nil { tb.Fatal(err) @@ -337,17 +349,32 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, qname string, items [ } for _, item := range items { msg := item.Message + if msg.Queue != qname { + tb.Fatalf("msg.Queue and queue name do not match! You are trying to seed queue %q with message %+v", qname, msg) + } encoded := MustMarshal(tb, msg) z := &redis.Z{Member: msg.ID.String(), Score: float64(item.Score)} if err := c.ZAdd(key, z).Err(); err != nil { tb.Fatal(err) } key := base.TaskKey(msg.Queue, msg.ID.String()) + var ( + processAt int64 + lastFailedAt int64 + ) + if state == stateScheduled || state == stateRetry { + processAt = item.Score + } + if state == stateArchived { + lastFailedAt = item.Score + } data := map[string]interface{}{ - "msg": encoded, - "timeout": msg.Timeout, - "deadline": msg.Deadline, - "state": strings.ToUpper(state.String()), + "msg": encoded, + "timeout": msg.Timeout, + "deadline": msg.Deadline, + "state": strings.ToUpper(state.String()), + "process_at": processAt, + "last_failed_at": lastFailedAt, } if err := c.HSet(key, data).Err(); err != nil { tb.Fatal(err) diff --git a/internal/base/base.go b/internal/base/base.go index e062ad3..ed9dbca 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -237,6 +237,31 @@ func DecodeMessage(data []byte) (*TaskMessage, error) { }, nil } +type TaskInfo struct { + *TaskMessage + + // State of the task. + // Possible values are the following: + // - active + // - pending + // - scheduled + // - retry + // - archived + State string + + // NextProcessAt specifies the next processing time for the task in Unix time, + // the number of seconds elapsed since January 1, 1970 UTC. + // + // Value zero is used when task is in active or archived state. + NextProcessAt int64 + + // LastFailedAt specifieds the last time task failed in Unix time, + // the number of seconds elapsed since January 1, 1970 UTC. + // + // Value zero is used if the task has not failed. + LastFailedAt int64 +} + // Z represents sorted set member. type Z struct { Message *TaskMessage diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index e406005..a82770b 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -6,6 +6,7 @@ package rdb import ( "fmt" + "strconv" "strings" "time" @@ -287,6 +288,65 @@ func reverse(x []string) { } } +// Parses val as base10 64-bit integer if val contains a value. +// Uses default value if val is nil. +// +// Assumes val contains either string value or nil. +func parseIntOrDefault(val interface{}, defaultVal int64) (int64, error) { + if val == nil { + return defaultVal, nil + } + return strconv.ParseInt(val.(string), 10, 64) +} + +// GetTaskInfo finds a task with the given id from the given queue. +// Returns TaskInfo of the task if a task is found, otherwise returns ErrTaskNotFound. +func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) { + key := base.TaskKey(qname, id) + exists, err := r.client.Exists(key).Result() + if err != nil { + return nil, err + } + if exists == 0 { + return nil, ErrTaskNotFound + } + // The "msg", "state" fields are non-nil; + // whereas the "process_at", "last_failed_at" fields can be nil. + res, err := r.client.HMGet(key, "msg", "state", "process_at", "last_failed_at").Result() + if err != nil { + return nil, err + } + if len(res) != 4 { + return nil, fmt.Errorf("asynq internal error: HMGET command returned %d elements", len(res)) + } + encoded := res[0] + if encoded == nil { + return nil, fmt.Errorf("asynq internal error: HMGET field 'msg' was nil") + } + msg, err := base.DecodeMessage([]byte(encoded.(string))) + if err != nil { + return nil, err + } + state := res[1] + if state == nil { + return nil, fmt.Errorf("asynq internal error: HMGET field 'state' was nil") + } + processAt, err := parseIntOrDefault(res[2], 0) + if err != nil { + return nil, err + } + lastFailedAt, err := parseIntOrDefault(res[3], 0) + if err != nil { + return nil, err + } + return &base.TaskInfo{ + TaskMessage: msg, + State: strings.ToLower(state.(string)), + NextProcessAt: processAt, + LastFailedAt: lastFailedAt, + }, nil +} + // Pagination specifies the page size and page number // for the list operation. type Pagination struct { diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 59ade88..ec22b1d 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -5,6 +5,7 @@ package rdb import ( + "errors" "fmt" "testing" "time" @@ -308,6 +309,162 @@ func TestRedisInfo(t *testing.T) { } } +func TestGetTaskInfo(t *testing.T) { + r := setup(t) + defer r.Close() + + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") + + now := time.Now() + oneHourFromNow := now.Add(1 * time.Hour) + oneHourAgo := now.Add(-1 * time.Hour) + + // state of the queues + queueState := struct { + active map[string][]*base.TaskMessage + pending map[string][]*base.TaskMessage + scheduled map[string][]base.Z + retry map[string][]base.Z + archived map[string][]base.Z + }{ + + active: map[string][]*base.TaskMessage{ + "default": {m1}, + "custom": {}, + }, + pending: map[string][]*base.TaskMessage{ + "default": {}, + "custom": {m3}, + }, + scheduled: map[string][]base.Z{ + "default": {{Message: m2, Score: oneHourFromNow.Unix()}}, + "custom": {}, + }, + retry: map[string][]base.Z{ + "default": {}, + "custom": {}, + }, + archived: map[string][]base.Z{ + "default": {}, + "custom": {{Message: m4, Score: oneHourAgo.Unix()}}, + }, + } + // seed redis with fixtures. + h.FlushDB(t, r.client) + h.SeedAllActiveQueues(t, r.client, queueState.active) + h.SeedAllPendingQueues(t, r.client, queueState.pending) + h.SeedAllScheduledQueues(t, r.client, queueState.scheduled) + h.SeedAllRetryQueues(t, r.client, queueState.retry) + h.SeedAllArchivedQueues(t, r.client, queueState.archived) + + tests := []struct { + qname string + id uuid.UUID + want *base.TaskInfo + }{ + { + qname: "default", + id: m1.ID, + want: &base.TaskInfo{ + TaskMessage: m1, + State: "active", + NextProcessAt: 0, + LastFailedAt: 0, + }, + }, + { + qname: "default", + id: m2.ID, + want: &base.TaskInfo{ + TaskMessage: m2, + State: "scheduled", + NextProcessAt: oneHourFromNow.Unix(), + LastFailedAt: 0, + }, + }, + { + qname: "custom", + id: m3.ID, + want: &base.TaskInfo{ + TaskMessage: m3, + State: "pending", + NextProcessAt: now.Unix(), + LastFailedAt: 0, + }, + }, + { + qname: "custom", + id: m4.ID, + want: &base.TaskInfo{ + TaskMessage: m4, + State: "archived", + NextProcessAt: 0, + LastFailedAt: oneHourAgo.Unix(), + }, + }, + } + + for _, tc := range tests { + + got, err := r.GetTaskInfo(tc.qname, tc.id.String()) + if err != nil { + t.Errorf("GetTaskInfo(%q, %q) failed: %v", + tc.qname, tc.id.String(), err) + continue + } + + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Errorf("GetTaskInfo(%q, %q) = %v, want %v; (-want,+got)\n%s", + tc.qname, tc.id.String(), got, tc.want, diff) + } + } +} + +func TestGetTaskInfoError(t *testing.T) { + r := setup(t) + defer r.Close() + + m := h.NewTaskMessageWithQueue("test1", nil, "custom") + h.SeedPendingQueue(t, r.client, []*base.TaskMessage{m}, "custom") + + tests := []struct { + desc string + qname string + id uuid.UUID + wantErr error + }{ + { + desc: "searching for task in a wrong queue", + qname: "default", + id: m.ID, + wantErr: ErrTaskNotFound, + }, + { + desc: "searching with non-existent task ID", + qname: "custom", + id: uuid.New(), + wantErr: ErrTaskNotFound, + }, + } + + for _, tc := range tests { + _, err := r.GetTaskInfo(tc.qname, tc.id.String()) + if err == nil { + t.Errorf("%s; GetTaskInfo(%q, %q) returned nil error, want %v", + tc.desc, tc.qname, tc.id.String(), tc.wantErr) + continue + } + + if !errors.Is(err, tc.wantErr) { + t.Errorf("%s; GetTaskInfo(%q, %q) returned %v error, want %v", + tc.desc, tc.qname, tc.id.String(), err, tc.wantErr) + } + } +} + func TestListPending(t *testing.T) { r := setup(t) defer r.Close() diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index f054145..5cd92d9 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -195,7 +195,7 @@ if redis.call("EXISTS", KEYS[2]) == 0 then else return redis.error_reply("asynq internal error: both timeout and deadline are not set") end - redis.call("HSET", key, "state", "ACTIVE") + redis.call("HSET", key, "state", "ACTIVE", "process_at", 0) redis.call("ZADD", KEYS[4], score, id) return {msg, score} end