From 978c6081248ac2e78b15e58ab6df73cd901fd8a5 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sun, 4 Apr 2021 14:09:14 -0700 Subject: [PATCH] Fix merge conflicts --- internal/base/base.go | 84 -------------------------------------- internal/base/base_test.go | 12 ------ internal/rdb/inspect.go | 22 ---------- internal/rdb/rdb_test.go | 20 --------- 4 files changed, 138 deletions(-) diff --git a/internal/base/base.go b/internal/base/base.go index ed9dbca..78d4795 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -6,7 +6,6 @@ package base import ( - "bytes" "context" "fmt" "sync" @@ -180,21 +179,10 @@ type TaskMessage struct { func EncodeMessage(msg *TaskMessage) ([]byte, error) { if msg == nil { return nil, fmt.Errorf("cannot encode nil message") -<<<<<<< HEAD } return proto.Marshal(&pb.TaskMessage{ Type: msg.Type, Payload: msg.Payload, -======= - } - payload, err := json.Marshal(msg.Payload) - if err != nil { - return nil, err - } - return proto.Marshal(&pb.TaskMessage{ - Type: msg.Type, - Payload: payload, ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Id: msg.ID.String(), Queue: msg.Queue, Retry: int32(msg.Retry), @@ -210,22 +198,11 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) { func DecodeMessage(data []byte) (*TaskMessage, error) { var pbmsg pb.TaskMessage if err := proto.Unmarshal(data, &pbmsg); err != nil { -<<<<<<< HEAD -======= - return nil, err - } - payload, err := decodePayload(pbmsg.GetPayload()) - if err != nil { ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf return nil, err } return &TaskMessage{ Type: pbmsg.GetType(), -<<<<<<< HEAD Payload: pbmsg.GetPayload(), -======= - Payload: payload, ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf ID: uuid.MustParse(pbmsg.GetId()), Queue: pbmsg.GetQueue(), Retry: int(pbmsg.GetRetry()), @@ -403,11 +380,7 @@ type WorkerInfo struct { ServerID string ID string Type string -<<<<<<< HEAD Payload []byte -======= - Payload map[string]interface{} ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Queue string Started time.Time Deadline time.Time @@ -418,13 +391,6 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) { if info == nil { return nil, fmt.Errorf("cannot encode nil worker info") } -<<<<<<< HEAD -======= - payload, err := json.Marshal(info.Payload) - if err != nil { - return nil, err - } ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf startTime, err := ptypes.TimestampProto(info.Started) if err != nil { return nil, err @@ -439,43 +405,19 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) { ServerId: info.ServerID, TaskId: info.ID, TaskType: info.Type, -<<<<<<< HEAD TaskPayload: info.Payload, -======= - TaskPayload: payload, ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Queue: info.Queue, StartTime: startTime, Deadline: deadline, }) } -<<<<<<< HEAD -======= -func decodePayload(b []byte) (map[string]interface{}, error) { - d := json.NewDecoder(bytes.NewReader(b)) - d.UseNumber() - payload := make(map[string]interface{}) - if err := d.Decode(&payload); err != nil { - return nil, err - } - return payload, nil -} - ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf // DecodeWorkerInfo decodes the given bytes into WorkerInfo. func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) { var pbmsg pb.WorkerInfo if err := proto.Unmarshal(b, &pbmsg); err != nil { return nil, err } -<<<<<<< HEAD -======= - payload, err := decodePayload(pbmsg.GetTaskPayload()) - if err != nil { - return nil, err - } ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) if err != nil { return nil, err @@ -490,11 +432,7 @@ func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) { ServerID: pbmsg.GetServerId(), ID: pbmsg.GetTaskId(), Type: pbmsg.GetTaskType(), -<<<<<<< HEAD Payload: pbmsg.GetTaskPayload(), -======= - Payload: payload, ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Queue: pbmsg.GetQueue(), Started: startTime, Deadline: deadline, @@ -531,13 +469,6 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) { if entry == nil { return nil, fmt.Errorf("cannot encode nil scheduler entry") } -<<<<<<< HEAD -======= - payload, err := json.Marshal(entry.Payload) - if err != nil { - return nil, err - } ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf next, err := ptypes.TimestampProto(entry.Next) if err != nil { return nil, err @@ -550,11 +481,7 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) { Id: entry.ID, Spec: entry.Spec, TaskType: entry.Type, -<<<<<<< HEAD TaskPayload: entry.Payload, -======= - TaskPayload: payload, ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf EnqueueOptions: entry.Opts, NextEnqueueTime: next, PrevEnqueueTime: prev, @@ -567,13 +494,6 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) { if err := proto.Unmarshal(b, &pbmsg); err != nil { return nil, err } -<<<<<<< HEAD -======= - payload, err := decodePayload(pbmsg.GetTaskPayload()) - if err != nil { - return nil, err - } ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime()) if err != nil { return nil, err @@ -586,11 +506,7 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) { ID: pbmsg.GetId(), Spec: pbmsg.GetSpec(), Type: pbmsg.GetTaskType(), -<<<<<<< HEAD Payload: pbmsg.GetTaskPayload(), -======= - Payload: payload, ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Opts: pbmsg.GetEnqueueOptions(), Next: next, Prev: prev, diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 004f628..ebd15ce 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -400,11 +400,7 @@ func TestServerInfoEncoding(t *testing.T) { Concurrency: 10, Queues: map[string]int{"default": 1, "critical": 2}, StrictPriority: false, -<<<<<<< HEAD Status: "active", -======= - Status: "running", ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Started: time.Now().Add(-3 * time.Hour), ActiveWorkerCount: 8, }, @@ -440,11 +436,7 @@ func TestWorkerInfoEncoding(t *testing.T) { ServerID: "abc123", ID: uuid.NewString(), Type: "taskA", -<<<<<<< HEAD Payload: toBytes(map[string]interface{}{"foo": "bar"}), -======= - Payload: map[string]interface{}{"foo": "bar"}, ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Queue: "default", Started: time.Now().Add(-3 * time.Hour), Deadline: time.Now().Add(30 * time.Second), @@ -479,11 +471,7 @@ func TestSchedulerEntryEncoding(t *testing.T) { ID: uuid.NewString(), Spec: "* * * * *", Type: "task_A", -<<<<<<< HEAD Payload: toBytes(map[string]interface{}{"foo": "bar"}), -======= - Payload: map[string]interface{}{"foo": "bar"}, ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Opts: []string{"Queue('email')"}, Next: time.Now().Add(30 * time.Second).UTC(), Prev: time.Now().Add(-2 * time.Minute).UTC(), diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 63c39b1..cff9f7e 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -488,27 +488,6 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]*base.TaskIn if err != nil { return nil, err } -<<<<<<< HEAD - var zs []base.Z - for i := 0; i < len(data); i += 2 { - s, err := cast.ToStringE(data[i]) - if err != nil { - return nil, err -<<<<<<< HEAD - } - score, err := cast.ToInt64E(data[i+1]) - if err != nil { - return nil, err - } -======= - } - score, err := cast.ToInt64E(data[i+1]) - if err != nil { - return nil, err - } ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf - msg, err := base.DecodeMessage([]byte(s)) -======= var tasks []*base.TaskInfo for _, s := range data { vals, err := cast.ToSliceE(s) @@ -516,7 +495,6 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]*base.TaskIn return nil, err } info, err := makeTaskInfo(vals) ->>>>>>> 4c699a2... Update RDB.ListScheduled, ListRetry, and ListArchived to return list of if err != nil { continue // bad data, ignore and continue } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index a935de5..520ef6a 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -101,11 +101,7 @@ func TestEnqueueUnique(t *testing.T) { m1 := base.TaskMessage{ ID: uuid.New(), Type: "email", -<<<<<<< HEAD Payload: h.JSON(map[string]interface{}{"user_id": json.Number("123")}), -======= - Payload: map[string]interface{}{"user_id": json.Number("123")}, ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Queue: base.DefaultQueueName, UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.JSON(map[string]interface{}{"user_id": 123})), } @@ -161,11 +157,7 @@ func TestDequeue(t *testing.T) { t1 := &base.TaskMessage{ ID: uuid.New(), Type: "send_email", -<<<<<<< HEAD Payload: h.JSON(map[string]interface{}{"subject": "hello!"}), -======= - Payload: map[string]interface{}{"subject": "hello!"}, ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf Queue: "default", Timeout: 1800, Deadline: 0, @@ -775,11 +767,7 @@ func TestRequeue(t *testing.T) { func TestSchedule(t *testing.T) { r := setup(t) defer r.Close() -<<<<<<< HEAD msg := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"})) -======= - msg := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf tests := []struct { msg *base.TaskMessage processAt time.Time @@ -1487,11 +1475,7 @@ func TestWriteServerState(t *testing.T) { Queues: map[string]int{"default": 2, "email": 5, "low": 1}, StrictPriority: false, Started: time.Now().UTC(), -<<<<<<< HEAD Status: "active", -======= - Status: "running", ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf ActiveWorkerCount: 0, } @@ -1581,11 +1565,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) { Queues: map[string]int{"default": 2, "email": 5, "low": 1}, StrictPriority: false, Started: time.Now().Add(-10 * time.Minute).UTC(), -<<<<<<< HEAD Status: "active", -======= - Status: "running", ->>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf ActiveWorkerCount: len(workers), }