2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Refactor redis keys and store messages in protobuf

Changes:
- Task messages are stored under "asynq:{<qname>}:t:<task_id>" key in redis, value is a HASH type and message are stored under "msg" key in the hash. The hash also stores "deadline", "timeout".
- Redis LIST and ZSET stores task message IDs
- Task messages are serialized using protocol buffer
This commit is contained in:
Ken Hibino 2021-03-12 16:23:08 -08:00
parent fdae82762c
commit d9481bd524
5 changed files with 124 additions and 1 deletions

View File

@ -6,7 +6,6 @@
package asynqtest package asynqtest
import ( import (
"encoding/json"
"math" "math"
"sort" "sort"
"testing" "testing"

View File

@ -6,6 +6,7 @@
package base package base
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"sync" "sync"
@ -174,10 +175,21 @@ type TaskMessage struct {
func EncodeMessage(msg *TaskMessage) ([]byte, error) { func EncodeMessage(msg *TaskMessage) ([]byte, error) {
if msg == nil { if msg == nil {
return nil, fmt.Errorf("cannot encode nil message") return nil, fmt.Errorf("cannot encode nil message")
<<<<<<< HEAD
} }
return proto.Marshal(&pb.TaskMessage{ return proto.Marshal(&pb.TaskMessage{
Type: msg.Type, Type: msg.Type,
Payload: msg.Payload, Payload: msg.Payload,
=======
}
payload, err := json.Marshal(msg.Payload)
if err != nil {
return nil, err
}
return proto.Marshal(&pb.TaskMessage{
Type: msg.Type,
Payload: payload,
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
Id: msg.ID.String(), Id: msg.ID.String(),
Queue: msg.Queue, Queue: msg.Queue,
Retry: int32(msg.Retry), Retry: int32(msg.Retry),
@ -193,11 +205,22 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) {
func DecodeMessage(data []byte) (*TaskMessage, error) { func DecodeMessage(data []byte) (*TaskMessage, error) {
var pbmsg pb.TaskMessage var pbmsg pb.TaskMessage
if err := proto.Unmarshal(data, &pbmsg); err != nil { if err := proto.Unmarshal(data, &pbmsg); err != nil {
<<<<<<< HEAD
=======
return nil, err
}
payload, err := decodePayload(pbmsg.GetPayload())
if err != nil {
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
return nil, err return nil, err
} }
return &TaskMessage{ return &TaskMessage{
Type: pbmsg.GetType(), Type: pbmsg.GetType(),
<<<<<<< HEAD
Payload: pbmsg.GetPayload(), Payload: pbmsg.GetPayload(),
=======
Payload: payload,
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
ID: uuid.MustParse(pbmsg.GetId()), ID: uuid.MustParse(pbmsg.GetId()),
Queue: pbmsg.GetQueue(), Queue: pbmsg.GetQueue(),
Retry: int(pbmsg.GetRetry()), Retry: int(pbmsg.GetRetry()),
@ -350,7 +373,11 @@ type WorkerInfo struct {
ServerID string ServerID string
ID string ID string
Type string Type string
<<<<<<< HEAD
Payload []byte Payload []byte
=======
Payload map[string]interface{}
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
Queue string Queue string
Started time.Time Started time.Time
Deadline time.Time Deadline time.Time
@ -361,6 +388,13 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) {
if info == nil { if info == nil {
return nil, fmt.Errorf("cannot encode nil worker info") return nil, fmt.Errorf("cannot encode nil worker info")
} }
<<<<<<< HEAD
=======
payload, err := json.Marshal(info.Payload)
if err != nil {
return nil, err
}
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
startTime, err := ptypes.TimestampProto(info.Started) startTime, err := ptypes.TimestampProto(info.Started)
if err != nil { if err != nil {
return nil, err return nil, err
@ -375,19 +409,43 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) {
ServerId: info.ServerID, ServerId: info.ServerID,
TaskId: info.ID, TaskId: info.ID,
TaskType: info.Type, TaskType: info.Type,
<<<<<<< HEAD
TaskPayload: info.Payload, TaskPayload: info.Payload,
=======
TaskPayload: payload,
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
Queue: info.Queue, Queue: info.Queue,
StartTime: startTime, StartTime: startTime,
Deadline: deadline, Deadline: deadline,
}) })
} }
<<<<<<< HEAD
=======
func decodePayload(b []byte) (map[string]interface{}, error) {
d := json.NewDecoder(bytes.NewReader(b))
d.UseNumber()
payload := make(map[string]interface{})
if err := d.Decode(&payload); err != nil {
return nil, err
}
return payload, nil
}
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
// DecodeWorkerInfo decodes the given bytes into WorkerInfo. // DecodeWorkerInfo decodes the given bytes into WorkerInfo.
func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) { func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) {
var pbmsg pb.WorkerInfo var pbmsg pb.WorkerInfo
if err := proto.Unmarshal(b, &pbmsg); err != nil { if err := proto.Unmarshal(b, &pbmsg); err != nil {
return nil, err return nil, err
} }
<<<<<<< HEAD
=======
payload, err := decodePayload(pbmsg.GetTaskPayload())
if err != nil {
return nil, err
}
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) startTime, err := ptypes.Timestamp(pbmsg.GetStartTime())
if err != nil { if err != nil {
return nil, err return nil, err
@ -402,7 +460,11 @@ func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) {
ServerID: pbmsg.GetServerId(), ServerID: pbmsg.GetServerId(),
ID: pbmsg.GetTaskId(), ID: pbmsg.GetTaskId(),
Type: pbmsg.GetTaskType(), Type: pbmsg.GetTaskType(),
<<<<<<< HEAD
Payload: pbmsg.GetTaskPayload(), Payload: pbmsg.GetTaskPayload(),
=======
Payload: payload,
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
Queue: pbmsg.GetQueue(), Queue: pbmsg.GetQueue(),
Started: startTime, Started: startTime,
Deadline: deadline, Deadline: deadline,
@ -439,6 +501,13 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) {
if entry == nil { if entry == nil {
return nil, fmt.Errorf("cannot encode nil scheduler entry") return nil, fmt.Errorf("cannot encode nil scheduler entry")
} }
<<<<<<< HEAD
=======
payload, err := json.Marshal(entry.Payload)
if err != nil {
return nil, err
}
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
next, err := ptypes.TimestampProto(entry.Next) next, err := ptypes.TimestampProto(entry.Next)
if err != nil { if err != nil {
return nil, err return nil, err
@ -451,7 +520,11 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) {
Id: entry.ID, Id: entry.ID,
Spec: entry.Spec, Spec: entry.Spec,
TaskType: entry.Type, TaskType: entry.Type,
<<<<<<< HEAD
TaskPayload: entry.Payload, TaskPayload: entry.Payload,
=======
TaskPayload: payload,
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
EnqueueOptions: entry.Opts, EnqueueOptions: entry.Opts,
NextEnqueueTime: next, NextEnqueueTime: next,
PrevEnqueueTime: prev, PrevEnqueueTime: prev,
@ -464,6 +537,13 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) {
if err := proto.Unmarshal(b, &pbmsg); err != nil { if err := proto.Unmarshal(b, &pbmsg); err != nil {
return nil, err return nil, err
} }
<<<<<<< HEAD
=======
payload, err := decodePayload(pbmsg.GetTaskPayload())
if err != nil {
return nil, err
}
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime()) next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime())
if err != nil { if err != nil {
return nil, err return nil, err
@ -476,7 +556,11 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) {
ID: pbmsg.GetId(), ID: pbmsg.GetId(),
Spec: pbmsg.GetSpec(), Spec: pbmsg.GetSpec(),
Type: pbmsg.GetTaskType(), Type: pbmsg.GetTaskType(),
<<<<<<< HEAD
Payload: pbmsg.GetTaskPayload(), Payload: pbmsg.GetTaskPayload(),
=======
Payload: payload,
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
Opts: pbmsg.GetEnqueueOptions(), Opts: pbmsg.GetEnqueueOptions(),
Next: next, Next: next,
Prev: prev, Prev: prev,

View File

@ -400,7 +400,11 @@ func TestServerInfoEncoding(t *testing.T) {
Concurrency: 10, Concurrency: 10,
Queues: map[string]int{"default": 1, "critical": 2}, Queues: map[string]int{"default": 1, "critical": 2},
StrictPriority: false, StrictPriority: false,
<<<<<<< HEAD
Status: "active", Status: "active",
=======
Status: "running",
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
Started: time.Now().Add(-3 * time.Hour), Started: time.Now().Add(-3 * time.Hour),
ActiveWorkerCount: 8, ActiveWorkerCount: 8,
}, },
@ -436,7 +440,11 @@ func TestWorkerInfoEncoding(t *testing.T) {
ServerID: "abc123", ServerID: "abc123",
ID: uuid.NewString(), ID: uuid.NewString(),
Type: "taskA", Type: "taskA",
<<<<<<< HEAD
Payload: toBytes(map[string]interface{}{"foo": "bar"}), Payload: toBytes(map[string]interface{}{"foo": "bar"}),
=======
Payload: map[string]interface{}{"foo": "bar"},
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
Queue: "default", Queue: "default",
Started: time.Now().Add(-3 * time.Hour), Started: time.Now().Add(-3 * time.Hour),
Deadline: time.Now().Add(30 * time.Second), Deadline: time.Now().Add(30 * time.Second),
@ -471,7 +479,11 @@ func TestSchedulerEntryEncoding(t *testing.T) {
ID: uuid.NewString(), ID: uuid.NewString(),
Spec: "* * * * *", Spec: "* * * * *",
Type: "task_A", Type: "task_A",
<<<<<<< HEAD
Payload: toBytes(map[string]interface{}{"foo": "bar"}), Payload: toBytes(map[string]interface{}{"foo": "bar"}),
=======
Payload: map[string]interface{}{"foo": "bar"},
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
Opts: []string{"Queue('email')"}, Opts: []string{"Queue('email')"},
Next: time.Now().Add(30 * time.Second).UTC(), Next: time.Now().Add(30 * time.Second).UTC(),
Prev: time.Now().Add(-2 * time.Minute).UTC(), Prev: time.Now().Add(-2 * time.Minute).UTC(),

View File

@ -425,11 +425,19 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro
s, err := cast.ToStringE(data[i]) s, err := cast.ToStringE(data[i])
if err != nil { if err != nil {
return nil, err return nil, err
<<<<<<< HEAD
} }
score, err := cast.ToInt64E(data[i+1]) score, err := cast.ToInt64E(data[i+1])
if err != nil { if err != nil {
return nil, err return nil, err
} }
=======
}
score, err := cast.ToInt64E(data[i+1])
if err != nil {
return nil, err
}
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
msg, err := base.DecodeMessage([]byte(s)) msg, err := base.DecodeMessage([]byte(s))
if err != nil { if err != nil {
continue // bad data, ignore and continue continue // bad data, ignore and continue

View File

@ -101,7 +101,11 @@ func TestEnqueueUnique(t *testing.T) {
m1 := base.TaskMessage{ m1 := base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "email", Type: "email",
<<<<<<< HEAD
Payload: h.JSON(map[string]interface{}{"user_id": json.Number("123")}), Payload: h.JSON(map[string]interface{}{"user_id": json.Number("123")}),
=======
Payload: map[string]interface{}{"user_id": json.Number("123")},
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
Queue: base.DefaultQueueName, Queue: base.DefaultQueueName,
UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.JSON(map[string]interface{}{"user_id": 123})), UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.JSON(map[string]interface{}{"user_id": 123})),
} }
@ -157,7 +161,11 @@ func TestDequeue(t *testing.T) {
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "send_email", Type: "send_email",
<<<<<<< HEAD
Payload: h.JSON(map[string]interface{}{"subject": "hello!"}), Payload: h.JSON(map[string]interface{}{"subject": "hello!"}),
=======
Payload: map[string]interface{}{"subject": "hello!"},
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
Queue: "default", Queue: "default",
Timeout: 1800, Timeout: 1800,
Deadline: 0, Deadline: 0,
@ -767,7 +775,11 @@ func TestRequeue(t *testing.T) {
func TestSchedule(t *testing.T) { func TestSchedule(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
<<<<<<< HEAD
msg := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"})) msg := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"}))
=======
msg := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
tests := []struct { tests := []struct {
msg *base.TaskMessage msg *base.TaskMessage
processAt time.Time processAt time.Time
@ -1475,7 +1487,11 @@ func TestWriteServerState(t *testing.T) {
Queues: map[string]int{"default": 2, "email": 5, "low": 1}, Queues: map[string]int{"default": 2, "email": 5, "low": 1},
StrictPriority: false, StrictPriority: false,
Started: time.Now().UTC(), Started: time.Now().UTC(),
<<<<<<< HEAD
Status: "active", Status: "active",
=======
Status: "running",
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
ActiveWorkerCount: 0, ActiveWorkerCount: 0,
} }
@ -1565,7 +1581,11 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
Queues: map[string]int{"default": 2, "email": 5, "low": 1}, Queues: map[string]int{"default": 2, "email": 5, "low": 1},
StrictPriority: false, StrictPriority: false,
Started: time.Now().Add(-10 * time.Minute).UTC(), Started: time.Now().Add(-10 * time.Minute).UTC(),
<<<<<<< HEAD
Status: "active", Status: "active",
=======
Status: "running",
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
ActiveWorkerCount: len(workers), ActiveWorkerCount: len(workers),
} }