From d9481bd5249ce9eab2710f67f0f2a21b6acaaff2 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 12 Mar 2021 16:23:08 -0800 Subject: [PATCH] Refactor redis keys and store messages in protobuf Changes: - Task messages are stored under "asynq:{}:t:" key in redis, value is a HASH type and message are stored under "msg" key in the hash. The hash also stores "deadline", "timeout". - Redis LIST and ZSET stores task message IDs - Task messages are serialized using protocol buffer --- internal/asynqtest/asynqtest.go | 1 - internal/base/base.go | 84 +++++++++++++++++++++++++++++++++ internal/base/base_test.go | 12 +++++ internal/rdb/inspect.go | 8 ++++ internal/rdb/rdb_test.go | 20 ++++++++ 5 files changed, 124 insertions(+), 1 deletion(-) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 66831ba..a27a34d 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -6,7 +6,6 @@ package asynqtest import ( - "encoding/json" "math" "sort" "testing" diff --git a/internal/base/base.go b/internal/base/base.go index 00ded7a..918dae3 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -6,6 +6,7 @@ package base import ( + "bytes" "context" "fmt" "sync" @@ -174,10 +175,21 @@ type TaskMessage struct { func EncodeMessage(msg *TaskMessage) ([]byte, error) { if msg == nil { return nil, fmt.Errorf("cannot encode nil message") +<<<<<<< HEAD } return proto.Marshal(&pb.TaskMessage{ Type: msg.Type, Payload: msg.Payload, +======= + } + payload, err := json.Marshal(msg.Payload) + if err != nil { + return nil, err + } + return proto.Marshal(&pb.TaskMessage{ + Type: msg.Type, + Payload: payload, +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Id: msg.ID.String(), Queue: msg.Queue, Retry: int32(msg.Retry), @@ -193,11 +205,22 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) { func DecodeMessage(data []byte) (*TaskMessage, error) { var pbmsg pb.TaskMessage if err := proto.Unmarshal(data, &pbmsg); err != nil { +<<<<<<< HEAD +======= + return nil, err + } + payload, err := decodePayload(pbmsg.GetPayload()) + if err != nil { +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf return nil, err } return &TaskMessage{ Type: pbmsg.GetType(), +<<<<<<< HEAD Payload: pbmsg.GetPayload(), +======= + Payload: payload, +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf ID: uuid.MustParse(pbmsg.GetId()), Queue: pbmsg.GetQueue(), Retry: int(pbmsg.GetRetry()), @@ -350,7 +373,11 @@ type WorkerInfo struct { ServerID string ID string Type string +<<<<<<< HEAD Payload []byte +======= + Payload map[string]interface{} +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Queue string Started time.Time Deadline time.Time @@ -361,6 +388,13 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) { if info == nil { return nil, fmt.Errorf("cannot encode nil worker info") } +<<<<<<< HEAD +======= + payload, err := json.Marshal(info.Payload) + if err != nil { + return nil, err + } +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf startTime, err := ptypes.TimestampProto(info.Started) if err != nil { return nil, err @@ -375,19 +409,43 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) { ServerId: info.ServerID, TaskId: info.ID, TaskType: info.Type, +<<<<<<< HEAD TaskPayload: info.Payload, +======= + TaskPayload: payload, +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Queue: info.Queue, StartTime: startTime, Deadline: deadline, }) } +<<<<<<< HEAD +======= +func decodePayload(b []byte) (map[string]interface{}, error) { + d := json.NewDecoder(bytes.NewReader(b)) + d.UseNumber() + payload := make(map[string]interface{}) + if err := d.Decode(&payload); err != nil { + return nil, err + } + return payload, nil +} + +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf // DecodeWorkerInfo decodes the given bytes into WorkerInfo. func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) { var pbmsg pb.WorkerInfo if err := proto.Unmarshal(b, &pbmsg); err != nil { return nil, err } +<<<<<<< HEAD +======= + payload, err := decodePayload(pbmsg.GetTaskPayload()) + if err != nil { + return nil, err + } +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) if err != nil { return nil, err @@ -402,7 +460,11 @@ func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) { ServerID: pbmsg.GetServerId(), ID: pbmsg.GetTaskId(), Type: pbmsg.GetTaskType(), +<<<<<<< HEAD Payload: pbmsg.GetTaskPayload(), +======= + Payload: payload, +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Queue: pbmsg.GetQueue(), Started: startTime, Deadline: deadline, @@ -439,6 +501,13 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) { if entry == nil { return nil, fmt.Errorf("cannot encode nil scheduler entry") } +<<<<<<< HEAD +======= + payload, err := json.Marshal(entry.Payload) + if err != nil { + return nil, err + } +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf next, err := ptypes.TimestampProto(entry.Next) if err != nil { return nil, err @@ -451,7 +520,11 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) { Id: entry.ID, Spec: entry.Spec, TaskType: entry.Type, +<<<<<<< HEAD TaskPayload: entry.Payload, +======= + TaskPayload: payload, +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf EnqueueOptions: entry.Opts, NextEnqueueTime: next, PrevEnqueueTime: prev, @@ -464,6 +537,13 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) { if err := proto.Unmarshal(b, &pbmsg); err != nil { return nil, err } +<<<<<<< HEAD +======= + payload, err := decodePayload(pbmsg.GetTaskPayload()) + if err != nil { + return nil, err + } +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime()) if err != nil { return nil, err @@ -476,7 +556,11 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) { ID: pbmsg.GetId(), Spec: pbmsg.GetSpec(), Type: pbmsg.GetTaskType(), +<<<<<<< HEAD Payload: pbmsg.GetTaskPayload(), +======= + Payload: payload, +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Opts: pbmsg.GetEnqueueOptions(), Next: next, Prev: prev, diff --git a/internal/base/base_test.go b/internal/base/base_test.go index ebd15ce..004f628 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -400,7 +400,11 @@ func TestServerInfoEncoding(t *testing.T) { Concurrency: 10, Queues: map[string]int{"default": 1, "critical": 2}, StrictPriority: false, +<<<<<<< HEAD Status: "active", +======= + Status: "running", +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Started: time.Now().Add(-3 * time.Hour), ActiveWorkerCount: 8, }, @@ -436,7 +440,11 @@ func TestWorkerInfoEncoding(t *testing.T) { ServerID: "abc123", ID: uuid.NewString(), Type: "taskA", +<<<<<<< HEAD Payload: toBytes(map[string]interface{}{"foo": "bar"}), +======= + Payload: map[string]interface{}{"foo": "bar"}, +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Queue: "default", Started: time.Now().Add(-3 * time.Hour), Deadline: time.Now().Add(30 * time.Second), @@ -471,7 +479,11 @@ func TestSchedulerEntryEncoding(t *testing.T) { ID: uuid.NewString(), Spec: "* * * * *", Type: "task_A", +<<<<<<< HEAD Payload: toBytes(map[string]interface{}{"foo": "bar"}), +======= + Payload: map[string]interface{}{"foo": "bar"}, +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Opts: []string{"Queue('email')"}, Next: time.Now().Add(30 * time.Second).UTC(), Prev: time.Now().Add(-2 * time.Minute).UTC(), diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 8f5bc27..53d800b 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -425,11 +425,19 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro s, err := cast.ToStringE(data[i]) if err != nil { return nil, err +<<<<<<< HEAD } score, err := cast.ToInt64E(data[i+1]) if err != nil { return nil, err } +======= + } + score, err := cast.ToInt64E(data[i+1]) + if err != nil { + return nil, err + } +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf msg, err := base.DecodeMessage([]byte(s)) if err != nil { continue // bad data, ignore and continue diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index f6a41e6..08ea50c 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -101,7 +101,11 @@ func TestEnqueueUnique(t *testing.T) { m1 := base.TaskMessage{ ID: uuid.New(), Type: "email", +<<<<<<< HEAD Payload: h.JSON(map[string]interface{}{"user_id": json.Number("123")}), +======= + Payload: map[string]interface{}{"user_id": json.Number("123")}, +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Queue: base.DefaultQueueName, UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.JSON(map[string]interface{}{"user_id": 123})), } @@ -157,7 +161,11 @@ func TestDequeue(t *testing.T) { t1 := &base.TaskMessage{ ID: uuid.New(), Type: "send_email", +<<<<<<< HEAD Payload: h.JSON(map[string]interface{}{"subject": "hello!"}), +======= + Payload: map[string]interface{}{"subject": "hello!"}, +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Queue: "default", Timeout: 1800, Deadline: 0, @@ -767,7 +775,11 @@ func TestRequeue(t *testing.T) { func TestSchedule(t *testing.T) { r := setup(t) defer r.Close() +<<<<<<< HEAD msg := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"})) +======= + msg := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf tests := []struct { msg *base.TaskMessage processAt time.Time @@ -1475,7 +1487,11 @@ func TestWriteServerState(t *testing.T) { Queues: map[string]int{"default": 2, "email": 5, "low": 1}, StrictPriority: false, Started: time.Now().UTC(), +<<<<<<< HEAD Status: "active", +======= + Status: "running", +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf ActiveWorkerCount: 0, } @@ -1565,7 +1581,11 @@ func TestWriteServerStateWithWorkers(t *testing.T) { Queues: map[string]int{"default": 2, "email": 5, "low": 1}, StrictPriority: false, Started: time.Now().Add(-10 * time.Minute).UTC(), +<<<<<<< HEAD Status: "active", +======= + Status: "running", +>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf ActiveWorkerCount: len(workers), }