From 3c7327e182a75eeb81c88c56e688b22cb17c782a Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 9 Mar 2021 07:08:27 -0800 Subject: [PATCH] Add nil check for message encoding helpers --- internal/base/base.go | 73 ++++++++++++++++++++++++++----------------- 1 file changed, 44 insertions(+), 29 deletions(-) diff --git a/internal/base/base.go b/internal/base/base.go index e18ac38..a5fcead 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -200,6 +200,9 @@ type TaskMessage struct { // EncodeMessage marshals the given task message and returns an encoded bytes. func EncodeMessage(msg *TaskMessage) ([]byte, error) { + if msg == nil { + return nil, fmt.Errorf("cannot encode nil message") + } payload, err := json.Marshal(msg.Payload) if err != nil { return nil, err @@ -229,16 +232,16 @@ func DecodeMessage(data []byte) (*TaskMessage, error) { return nil, err } return &TaskMessage{ - Type: pbmsg.Type, + Type: pbmsg.GetType(), Payload: payload, - ID: uuid.MustParse(pbmsg.Id), - Queue: pbmsg.Queue, - Retry: int(pbmsg.Retry), - Retried: int(pbmsg.Retried), - ErrorMsg: pbmsg.ErrorMsg, - Timeout: pbmsg.Timeout, - Deadline: pbmsg.Deadline, - UniqueKey: pbmsg.UniqueKey, + ID: uuid.MustParse(pbmsg.GetId()), + Queue: pbmsg.GetQueue(), + Retry: int(pbmsg.GetRetry()), + Retried: int(pbmsg.GetRetried()), + ErrorMsg: pbmsg.GetErrorMsg(), + Timeout: pbmsg.GetTimeout(), + Deadline: pbmsg.GetDeadline(), + UniqueKey: pbmsg.GetUniqueKey(), }, nil } @@ -322,11 +325,14 @@ type ServerInfo struct { // EncodeServerInfo marshals the given ServerInfo and returns the encoded bytes. func EncodeServerInfo(info *ServerInfo) ([]byte, error) { + if info == nil { + return nil, fmt.Errorf("cannot encode nil server info") + } queues := make(map[string]int32) for q, p := range info.Queues { queues[q] = int32(p) } - ts, err := ptypes.TimestampProto(info.Started) + started, err := ptypes.TimestampProto(info.Started) if err != nil { return nil, err } @@ -338,7 +344,7 @@ func EncodeServerInfo(info *ServerInfo) ([]byte, error) { Queues: queues, StrictPriority: info.StrictPriority, Status: info.Status, - StartTime: ts, + StartTime: started, ActiveWorkerCount: int32(info.ActiveWorkerCount), }) } @@ -358,15 +364,15 @@ func DecodeServerInfo(b []byte) (*ServerInfo, error) { return nil, err } return &ServerInfo{ - Host: pbmsg.Host, - PID: int(pbmsg.Pid), - ServerID: pbmsg.ServerId, - Concurrency: int(pbmsg.Concurrency), + Host: pbmsg.GetHost(), + PID: int(pbmsg.GetPid()), + ServerID: pbmsg.GetServerId(), + Concurrency: int(pbmsg.GetConcurrency()), Queues: queues, - StrictPriority: pbmsg.StrictPriority, - Status: pbmsg.Status, + StrictPriority: pbmsg.GetStrictPriority(), + Status: pbmsg.GetStatus(), Started: startTime, - ActiveWorkerCount: int(pbmsg.ActiveWorkerCount), + ActiveWorkerCount: int(pbmsg.GetActiveWorkerCount()), }, nil } @@ -386,6 +392,9 @@ type WorkerInfo struct { // EncodeWorkerInfo marshals the given WorkerInfo and returns the encoded bytes. func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) { + if info == nil { + return nil, fmt.Errorf("cannot encode nil worker info") + } payload, err := json.Marshal(info.Payload) if err != nil { return nil, err @@ -440,13 +449,13 @@ func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) { return nil, err } return &WorkerInfo{ - Host: pbmsg.Host, - PID: int(pbmsg.Pid), - ServerID: pbmsg.ServerId, - ID: pbmsg.TaskId, - Type: pbmsg.TaskType, + Host: pbmsg.GetHost(), + PID: int(pbmsg.GetPid()), + ServerID: pbmsg.GetServerId(), + ID: pbmsg.GetTaskId(), + Type: pbmsg.GetTaskType(), Payload: payload, - Queue: pbmsg.Queue, + Queue: pbmsg.GetQueue(), Started: startTime, Deadline: deadline, }, nil @@ -479,6 +488,9 @@ type SchedulerEntry struct { // EncodeSchedulerEntry marshals the given entry and returns an encoded bytes. func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) { + if entry == nil { + return nil, fmt.Errorf("cannot encode nil scheduler entry") + } payload, err := json.Marshal(entry.Payload) if err != nil { return nil, err @@ -521,11 +533,11 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) { return nil, err } return &SchedulerEntry{ - ID: pbmsg.Id, - Spec: pbmsg.Spec, - Type: pbmsg.TaskType, + ID: pbmsg.GetId(), + Spec: pbmsg.GetSpec(), + Type: pbmsg.GetTaskType(), Payload: payload, - Opts: pbmsg.EnqueueOptions, + Opts: pbmsg.GetEnqueueOptions(), Next: next, Prev: prev, }, nil @@ -543,6 +555,9 @@ type SchedulerEnqueueEvent struct { // EncodeSchedulerEnqueueEvent marshals the given event // and returns an encoded bytes. func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error) { + if event == nil { + return nil, fmt.Errorf("cannot encode nil enqueue event") + } enqueuedAt, err := ptypes.TimestampProto(event.EnqueuedAt) if err != nil { return nil, err @@ -565,7 +580,7 @@ func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error) { return nil, err } return &SchedulerEnqueueEvent{ - TaskID: pbmsg.TaskId, + TaskID: pbmsg.GetTaskId(), EnqueuedAt: enqueuedAt, }, nil }