2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-22 22:06:12 +08:00

Use protobuf to encode task message

This commit is contained in:
Ken Hibino
2021-03-01 21:33:36 -08:00
parent 47543b36fd
commit 4046932fde
7 changed files with 60 additions and 23 deletions

View File

@@ -6,6 +6,7 @@
package base
import (
"bytes"
"context"
"encoding/json"
"fmt"
@@ -16,6 +17,8 @@ import (
"github.com/go-redis/redis/v7"
"github.com/google/uuid"
pb "github.com/hibiken/asynq/internal/proto"
"google.golang.org/protobuf/proto"
)
// Version of asynq library and CLI.
@@ -195,8 +198,25 @@ type TaskMessage struct {
}
// EncodeMessage marshals the given task message in JSON and returns an encoded string.
// TODO: Should return []byte instead of string
func EncodeMessage(msg *TaskMessage) (string, error) {
b, err := json.Marshal(msg)
payload, err := json.Marshal(msg.Payload)
if err != nil {
return "", err
}
pbmsg := pb.TaskMessage{
Type: msg.Type,
Payload: payload,
Id: msg.ID.String(),
Queue: msg.Queue,
Retry: int32(msg.Retry),
Retried: int32(msg.Retried),
ErrorMsg: msg.ErrorMsg,
Timeout: msg.Timeout,
Deadline: msg.Deadline,
UniqueKey: msg.UniqueKey,
}
b, err := proto.Marshal(&pbmsg)
if err != nil {
return "", err
}
@@ -204,14 +224,30 @@ func EncodeMessage(msg *TaskMessage) (string, error) {
}
// DecodeMessage unmarshals the given encoded string and returns a decoded task message.
// TODO: should take []byte instead of string
func DecodeMessage(s string) (*TaskMessage, error) {
d := json.NewDecoder(strings.NewReader(s))
d.UseNumber()
var msg TaskMessage
if err := d.Decode(&msg); err != nil {
var pbmsg pb.TaskMessage
if err := proto.Unmarshal([]byte(s), &pbmsg); err != nil {
return nil, err
}
return &msg, nil
d := json.NewDecoder(bytes.NewReader(pbmsg.Payload))
d.UseNumber()
payload := make(map[string]interface{})
if err := d.Decode(&payload); err != nil {
return nil, err
}
return &TaskMessage{
Type: pbmsg.Type,
Payload: payload,
ID: uuid.MustParse(pbmsg.Id),
Queue: pbmsg.Queue,
Retry: int(pbmsg.Retry),
Retried: int(pbmsg.Retried),
ErrorMsg: pbmsg.ErrorMsg,
Timeout: pbmsg.Timeout,
Deadline: pbmsg.Deadline,
UniqueKey: pbmsg.UniqueKey,
}, nil
}
// Z represents sorted set member.