2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Add nil check for message encoding helpers

This commit is contained in:
Ken Hibino 2021-03-09 07:08:27 -08:00
parent e136e26b7e
commit 3c7327e182

View File

@ -200,6 +200,9 @@ type TaskMessage struct {
// EncodeMessage marshals the given task message and returns an encoded bytes.
func EncodeMessage(msg *TaskMessage) ([]byte, error) {
if msg == nil {
return nil, fmt.Errorf("cannot encode nil message")
}
payload, err := json.Marshal(msg.Payload)
if err != nil {
return nil, err
@ -229,16 +232,16 @@ func DecodeMessage(data []byte) (*TaskMessage, error) {
return nil, err
}
return &TaskMessage{
Type: pbmsg.Type,
Type: pbmsg.GetType(),
Payload: payload,
ID: uuid.MustParse(pbmsg.Id),
Queue: pbmsg.Queue,
Retry: int(pbmsg.Retry),
Retried: int(pbmsg.Retried),
ErrorMsg: pbmsg.ErrorMsg,
Timeout: pbmsg.Timeout,
Deadline: pbmsg.Deadline,
UniqueKey: pbmsg.UniqueKey,
ID: uuid.MustParse(pbmsg.GetId()),
Queue: pbmsg.GetQueue(),
Retry: int(pbmsg.GetRetry()),
Retried: int(pbmsg.GetRetried()),
ErrorMsg: pbmsg.GetErrorMsg(),
Timeout: pbmsg.GetTimeout(),
Deadline: pbmsg.GetDeadline(),
UniqueKey: pbmsg.GetUniqueKey(),
}, nil
}
@ -322,11 +325,14 @@ type ServerInfo struct {
// EncodeServerInfo marshals the given ServerInfo and returns the encoded bytes.
func EncodeServerInfo(info *ServerInfo) ([]byte, error) {
if info == nil {
return nil, fmt.Errorf("cannot encode nil server info")
}
queues := make(map[string]int32)
for q, p := range info.Queues {
queues[q] = int32(p)
}
ts, err := ptypes.TimestampProto(info.Started)
started, err := ptypes.TimestampProto(info.Started)
if err != nil {
return nil, err
}
@ -338,7 +344,7 @@ func EncodeServerInfo(info *ServerInfo) ([]byte, error) {
Queues: queues,
StrictPriority: info.StrictPriority,
Status: info.Status,
StartTime: ts,
StartTime: started,
ActiveWorkerCount: int32(info.ActiveWorkerCount),
})
}
@ -358,15 +364,15 @@ func DecodeServerInfo(b []byte) (*ServerInfo, error) {
return nil, err
}
return &ServerInfo{
Host: pbmsg.Host,
PID: int(pbmsg.Pid),
ServerID: pbmsg.ServerId,
Concurrency: int(pbmsg.Concurrency),
Host: pbmsg.GetHost(),
PID: int(pbmsg.GetPid()),
ServerID: pbmsg.GetServerId(),
Concurrency: int(pbmsg.GetConcurrency()),
Queues: queues,
StrictPriority: pbmsg.StrictPriority,
Status: pbmsg.Status,
StrictPriority: pbmsg.GetStrictPriority(),
Status: pbmsg.GetStatus(),
Started: startTime,
ActiveWorkerCount: int(pbmsg.ActiveWorkerCount),
ActiveWorkerCount: int(pbmsg.GetActiveWorkerCount()),
}, nil
}
@ -386,6 +392,9 @@ type WorkerInfo struct {
// EncodeWorkerInfo marshals the given WorkerInfo and returns the encoded bytes.
func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) {
if info == nil {
return nil, fmt.Errorf("cannot encode nil worker info")
}
payload, err := json.Marshal(info.Payload)
if err != nil {
return nil, err
@ -440,13 +449,13 @@ func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) {
return nil, err
}
return &WorkerInfo{
Host: pbmsg.Host,
PID: int(pbmsg.Pid),
ServerID: pbmsg.ServerId,
ID: pbmsg.TaskId,
Type: pbmsg.TaskType,
Host: pbmsg.GetHost(),
PID: int(pbmsg.GetPid()),
ServerID: pbmsg.GetServerId(),
ID: pbmsg.GetTaskId(),
Type: pbmsg.GetTaskType(),
Payload: payload,
Queue: pbmsg.Queue,
Queue: pbmsg.GetQueue(),
Started: startTime,
Deadline: deadline,
}, nil
@ -479,6 +488,9 @@ type SchedulerEntry struct {
// EncodeSchedulerEntry marshals the given entry and returns an encoded bytes.
func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) {
if entry == nil {
return nil, fmt.Errorf("cannot encode nil scheduler entry")
}
payload, err := json.Marshal(entry.Payload)
if err != nil {
return nil, err
@ -521,11 +533,11 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) {
return nil, err
}
return &SchedulerEntry{
ID: pbmsg.Id,
Spec: pbmsg.Spec,
Type: pbmsg.TaskType,
ID: pbmsg.GetId(),
Spec: pbmsg.GetSpec(),
Type: pbmsg.GetTaskType(),
Payload: payload,
Opts: pbmsg.EnqueueOptions,
Opts: pbmsg.GetEnqueueOptions(),
Next: next,
Prev: prev,
}, nil
@ -543,6 +555,9 @@ type SchedulerEnqueueEvent struct {
// EncodeSchedulerEnqueueEvent marshals the given event
// and returns an encoded bytes.
func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error) {
if event == nil {
return nil, fmt.Errorf("cannot encode nil enqueue event")
}
enqueuedAt, err := ptypes.TimestampProto(event.EnqueuedAt)
if err != nil {
return nil, err
@ -565,7 +580,7 @@ func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error) {
return nil, err
}
return &SchedulerEnqueueEvent{
TaskID: pbmsg.TaskId,
TaskID: pbmsg.GetTaskId(),
EnqueuedAt: enqueuedAt,
}, nil
}