2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-08-19 15:08:55 +08:00

Change payload to byte slice

This commit is contained in:
Ken Hibino
2021-03-20 13:42:13 -07:00
parent 7af3981929
commit 476812475e
21 changed files with 254 additions and 1156 deletions

View File

@@ -6,12 +6,8 @@
package base
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"sync"
"time"
@@ -125,32 +121,8 @@ func SchedulerHistoryKey(entryID string) string {
}
// UniqueKey returns a redis key with the given type, payload, and queue name.
func UniqueKey(qname, tasktype string, payload map[string]interface{}) string {
return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, serializePayload(payload))
}
func serializePayload(payload map[string]interface{}) string {
if payload == nil {
return "nil"
}
type entry struct {
k string
v interface{}
}
var es []entry
for k, v := range payload {
es = append(es, entry{k, v})
}
// sort entries by key
sort.Slice(es, func(i, j int) bool { return es[i].k < es[j].k })
var b strings.Builder
for _, e := range es {
if b.Len() > 0 {
b.WriteString(",")
}
b.WriteString(fmt.Sprintf("%s=%v", e.k, e.v))
}
return b.String()
func UniqueKey(qname, tasktype string, payload []byte) string {
return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, string(payload))
}
// TaskMessage is the internal representation of a task with additional metadata fields.
@@ -160,7 +132,7 @@ type TaskMessage struct {
Type string
// Payload holds data needed to process the task.
Payload map[string]interface{}
Payload []byte
// ID is a unique identifier for each task.
ID uuid.UUID
@@ -203,13 +175,9 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) {
if msg == nil {
return nil, fmt.Errorf("cannot encode nil message")
}
payload, err := json.Marshal(msg.Payload)
if err != nil {
return nil, err
}
return proto.Marshal(&pb.TaskMessage{
Type: msg.Type,
Payload: payload,
Payload: msg.Payload,
Id: msg.ID.String(),
Queue: msg.Queue,
Retry: int32(msg.Retry),
@@ -227,13 +195,9 @@ func DecodeMessage(data []byte) (*TaskMessage, error) {
if err := proto.Unmarshal(data, &pbmsg); err != nil {
return nil, err
}
payload, err := decodePayload(pbmsg.GetPayload())
if err != nil {
return nil, err
}
return &TaskMessage{
Type: pbmsg.GetType(),
Payload: payload,
Payload: pbmsg.GetPayload(),
ID: uuid.MustParse(pbmsg.GetId()),
Queue: pbmsg.GetQueue(),
Retry: int(pbmsg.GetRetry()),
@@ -383,7 +347,7 @@ type WorkerInfo struct {
ServerID string
ID string
Type string
Payload map[string]interface{}
Payload []byte
Queue string
Started time.Time
Deadline time.Time
@@ -394,10 +358,6 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) {
if info == nil {
return nil, fmt.Errorf("cannot encode nil worker info")
}
payload, err := json.Marshal(info.Payload)
if err != nil {
return nil, err
}
startTime, err := ptypes.TimestampProto(info.Started)
if err != nil {
return nil, err
@@ -412,33 +372,19 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) {
ServerId: info.ServerID,
TaskId: info.ID,
TaskType: info.Type,
TaskPayload: payload,
TaskPayload: info.Payload,
Queue: info.Queue,
StartTime: startTime,
Deadline: deadline,
})
}
func decodePayload(b []byte) (map[string]interface{}, error) {
d := json.NewDecoder(bytes.NewReader(b))
d.UseNumber()
payload := make(map[string]interface{})
if err := d.Decode(&payload); err != nil {
return nil, err
}
return payload, nil
}
// DecodeWorkerInfo decodes the given bytes into WorkerInfo.
func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) {
var pbmsg pb.WorkerInfo
if err := proto.Unmarshal(b, &pbmsg); err != nil {
return nil, err
}
payload, err := decodePayload(pbmsg.GetTaskPayload())
if err != nil {
return nil, err
}
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime())
if err != nil {
return nil, err
@@ -453,7 +399,7 @@ func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) {
ServerID: pbmsg.GetServerId(),
ID: pbmsg.GetTaskId(),
Type: pbmsg.GetTaskType(),
Payload: payload,
Payload: pbmsg.GetTaskPayload(),
Queue: pbmsg.GetQueue(),
Started: startTime,
Deadline: deadline,
@@ -472,7 +418,7 @@ type SchedulerEntry struct {
Type string
// Payload is the payload of the periodic task.
Payload map[string]interface{}
Payload []byte
// Opts is the options for the periodic task.
Opts []string
@@ -490,10 +436,6 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) {
if entry == nil {
return nil, fmt.Errorf("cannot encode nil scheduler entry")
}
payload, err := json.Marshal(entry.Payload)
if err != nil {
return nil, err
}
next, err := ptypes.TimestampProto(entry.Next)
if err != nil {
return nil, err
@@ -506,7 +448,7 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) {
Id: entry.ID,
Spec: entry.Spec,
TaskType: entry.Type,
TaskPayload: payload,
TaskPayload: entry.Payload,
EnqueueOptions: entry.Opts,
NextEnqueueTime: next,
PrevEnqueueTime: prev,
@@ -519,10 +461,6 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) {
if err := proto.Unmarshal(b, &pbmsg); err != nil {
return nil, err
}
payload, err := decodePayload(pbmsg.GetTaskPayload())
if err != nil {
return nil, err
}
next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime())
if err != nil {
return nil, err
@@ -535,7 +473,7 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) {
ID: pbmsg.GetId(),
Spec: pbmsg.GetSpec(),
Type: pbmsg.GetTaskType(),
Payload: payload,
Payload: pbmsg.GetTaskPayload(),
Opts: pbmsg.GetEnqueueOptions(),
Next: next,
Prev: prev,

View File

@@ -267,52 +267,68 @@ func TestSchedulerHistoryKey(t *testing.T) {
}
}
func toBytes(m map[string]interface{}) []byte {
b, err := json.Marshal(m)
if err != nil {
panic(err)
}
return b
}
func TestUniqueKey(t *testing.T) {
tests := []struct {
desc string
qname string
tasktype string
payload map[string]interface{}
payload []byte
want string
}{
{
"with primitive types",
"default",
"email:send",
map[string]interface{}{"a": 123, "b": "hello", "c": true},
"asynq:{default}:unique:email:send:a=123,b=hello,c=true",
toBytes(map[string]interface{}{"a": 123, "b": "hello", "c": true}),
fmt.Sprintf("asynq:{default}:unique:email:send:%s",
string(toBytes(map[string]interface{}{"a": 123, "b": "hello", "c": true}))),
},
{
"with unsorted keys",
"default",
"email:send",
map[string]interface{}{"b": "hello", "c": true, "a": 123},
"asynq:{default}:unique:email:send:a=123,b=hello,c=true",
toBytes(map[string]interface{}{"b": "hello", "c": true, "a": 123}),
fmt.Sprintf("asynq:{default}:unique:email:send:%s",
string(toBytes(map[string]interface{}{"b": "hello", "c": true, "a": 123}))),
},
{
"with composite types",
"default",
"email:send",
map[string]interface{}{
toBytes(map[string]interface{}{
"address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"},
"names": []string{"bob", "mike", "rob"}},
"asynq:{default}:unique:email:send:address=map[city:Boston line:123 Main St state:MA],names=[bob mike rob]",
"names": []string{"bob", "mike", "rob"}}),
fmt.Sprintf("asynq:{default}:unique:email:send:%s",
string(toBytes(map[string]interface{}{
"address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"},
"names": []string{"bob", "mike", "rob"}}))),
},
{
"with complex types",
"default",
"email:send",
map[string]interface{}{
toBytes(map[string]interface{}{
"time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC),
"duration": time.Hour},
"asynq:{default}:unique:email:send:duration=1h0m0s,time=2020-07-28 00:00:00 +0000 UTC",
"duration": time.Hour}),
fmt.Sprintf("asynq:{default}:unique:email:send:%s",
string(toBytes(map[string]interface{}{
"time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC),
"duration": time.Hour}))),
},
{
"with nil payload",
"default",
"reindex",
nil,
"asynq:{default}:unique:reindex:nil",
"asynq:{default}:unique:reindex:",
},
}
@@ -333,7 +349,7 @@ func TestMessageEncoding(t *testing.T) {
{
in: &TaskMessage{
Type: "task1",
Payload: map[string]interface{}{"a": 1, "b": "hello!", "c": true},
Payload: toBytes(map[string]interface{}{"a": 1, "b": "hello!", "c": true}),
ID: id,
Queue: "default",
Retry: 10,
@@ -343,7 +359,7 @@ func TestMessageEncoding(t *testing.T) {
},
out: &TaskMessage{
Type: "task1",
Payload: map[string]interface{}{"a": json.Number("1"), "b": "hello!", "c": true},
Payload: toBytes(map[string]interface{}{"a": json.Number("1"), "b": "hello!", "c": true}),
ID: id,
Queue: "default",
Retry: 10,
@@ -420,7 +436,7 @@ func TestWorkerInfoEncoding(t *testing.T) {
ServerID: "abc123",
ID: uuid.NewString(),
Type: "taskA",
Payload: map[string]interface{}{"foo": "bar"},
Payload: toBytes(map[string]interface{}{"foo": "bar"}),
Queue: "default",
Started: time.Now().Add(-3 * time.Hour),
Deadline: time.Now().Add(30 * time.Second),
@@ -455,7 +471,7 @@ func TestSchedulerEntryEncoding(t *testing.T) {
ID: uuid.NewString(),
Spec: "* * * * *",
Type: "task_A",
Payload: map[string]interface{}{"foo": "bar"},
Payload: toBytes(map[string]interface{}{"foo": "bar"}),
Opts: []string{"Queue('email')"},
Next: time.Now().Add(30 * time.Second).UTC(),
Prev: time.Now().Add(-2 * time.Minute).UTC(),