mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-22 16:50:18 +08:00
Update base.DecodeMessage to take byte slice
This commit is contained in:
parent
e408550d76
commit
9ac0475d4b
@ -140,7 +140,7 @@ func MustMarshal(tb testing.TB, msg *base.TaskMessage) string {
|
|||||||
// Calling test will fail if unmarshaling errors out.
|
// Calling test will fail if unmarshaling errors out.
|
||||||
func MustUnmarshal(tb testing.TB, data string) *base.TaskMessage {
|
func MustUnmarshal(tb testing.TB, data string) *base.TaskMessage {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
msg, err := base.DecodeMessage(data)
|
msg, err := base.DecodeMessage([]byte(data))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tb.Fatal(err)
|
tb.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -218,11 +218,10 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) {
|
|||||||
return proto.Marshal(&pbmsg)
|
return proto.Marshal(&pbmsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DecodeMessage unmarshals the given encoded string and returns a decoded task message.
|
// DecodeMessage unmarshals the given bytes and returns a decoded task message.
|
||||||
// TODO: should take []byte instead of string
|
func DecodeMessage(data []byte) (*TaskMessage, error) {
|
||||||
func DecodeMessage(s string) (*TaskMessage, error) {
|
|
||||||
var pbmsg pb.TaskMessage
|
var pbmsg pb.TaskMessage
|
||||||
if err := proto.Unmarshal([]byte(s), &pbmsg); err != nil {
|
if err := proto.Unmarshal(data, &pbmsg); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
d := json.NewDecoder(bytes.NewReader(pbmsg.Payload))
|
d := json.NewDecoder(bytes.NewReader(pbmsg.Payload))
|
||||||
|
@ -343,7 +343,7 @@ func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessa
|
|||||||
reverse(data)
|
reverse(data)
|
||||||
var msgs []*base.TaskMessage
|
var msgs []*base.TaskMessage
|
||||||
for _, s := range data {
|
for _, s := range data {
|
||||||
m, err := base.DecodeMessage(s)
|
m, err := base.DecodeMessage([]byte(s))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue // bad data, ignore and continue
|
continue // bad data, ignore and continue
|
||||||
}
|
}
|
||||||
@ -419,7 +419,7 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
msg, err := base.DecodeMessage(s)
|
msg, err := base.DecodeMessage([]byte(s))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue // bad data, ignore and continue
|
continue // bad data, ignore and continue
|
||||||
}
|
}
|
||||||
|
@ -146,7 +146,7 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, time.Time{}, err
|
return nil, time.Time{}, err
|
||||||
}
|
}
|
||||||
if msg, err = base.DecodeMessage(encoded); err != nil {
|
if msg, err = base.DecodeMessage([]byte(encoded)); err != nil {
|
||||||
return nil, time.Time{}, err
|
return nil, time.Time{}, err
|
||||||
}
|
}
|
||||||
return msg, time.Unix(d, 0), nil
|
return msg, time.Unix(d, 0), nil
|
||||||
@ -619,7 +619,7 @@ func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*bas
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, s := range data {
|
for _, s := range data {
|
||||||
msg, err := base.DecodeMessage(s)
|
msg, err := base.DecodeMessage([]byte(s))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user