Serialize structs in protobuf

This commit is contained in:
Ken Hibino
2021-03-11 19:07:58 -08:00
committed by GitHub
parent 4c2c98db78
commit c4de2cfbe9
11 changed files with 1415 additions and 68 deletions

View File

@@ -6,7 +6,6 @@
package asynqtest
import (
"encoding/json"
"math"
"sort"
"testing"
@@ -130,7 +129,7 @@ func TaskMessageWithError(t base.TaskMessage, errMsg string) *base.TaskMessage {
// Calling test will fail if marshaling errors out.
func MustMarshal(tb testing.TB, msg *base.TaskMessage) string {
tb.Helper()
data, err := json.Marshal(msg)
data, err := base.EncodeMessage(msg)
if err != nil {
tb.Fatal(err)
}
@@ -141,12 +140,11 @@ func MustMarshal(tb testing.TB, msg *base.TaskMessage) string {
// Calling test will fail if unmarshaling errors out.
func MustUnmarshal(tb testing.TB, data string) *base.TaskMessage {
tb.Helper()
var msg base.TaskMessage
err := json.Unmarshal([]byte(data), &msg)
msg, err := base.DecodeMessage([]byte(data))
if err != nil {
tb.Fatal(err)
}
return &msg
return msg
}
// MustMarshalSlice marshals a slice of task messages and return a slice of

View File

@@ -6,6 +6,7 @@
package base
import (
"bytes"
"context"
"encoding/json"
"fmt"
@@ -15,7 +16,10 @@ import (
"time"
"github.com/go-redis/redis/v7"
"github.com/golang/protobuf/ptypes"
"github.com/google/uuid"
pb "github.com/hibiken/asynq/internal/proto"
"google.golang.org/protobuf/proto"
)
// Version of asynq library and CLI.
@@ -194,24 +198,51 @@ type TaskMessage struct {
UniqueKey string
}
// EncodeMessage marshals the given task message in JSON and returns an encoded string.
func EncodeMessage(msg *TaskMessage) (string, error) {
b, err := json.Marshal(msg)
if err != nil {
return "", err
// EncodeMessage marshals the given task message and returns an encoded bytes.
func EncodeMessage(msg *TaskMessage) ([]byte, error) {
if msg == nil {
return nil, fmt.Errorf("cannot encode nil message")
}
return string(b), nil
}
// DecodeMessage unmarshals the given encoded string and returns a decoded task message.
func DecodeMessage(s string) (*TaskMessage, error) {
d := json.NewDecoder(strings.NewReader(s))
d.UseNumber()
var msg TaskMessage
if err := d.Decode(&msg); err != nil {
payload, err := json.Marshal(msg.Payload)
if err != nil {
return nil, err
}
return &msg, nil
return proto.Marshal(&pb.TaskMessage{
Type: msg.Type,
Payload: payload,
Id: msg.ID.String(),
Queue: msg.Queue,
Retry: int32(msg.Retry),
Retried: int32(msg.Retried),
ErrorMsg: msg.ErrorMsg,
Timeout: msg.Timeout,
Deadline: msg.Deadline,
UniqueKey: msg.UniqueKey,
})
}
// DecodeMessage unmarshals the given bytes and returns a decoded task message.
func DecodeMessage(data []byte) (*TaskMessage, error) {
var pbmsg pb.TaskMessage
if err := proto.Unmarshal(data, &pbmsg); err != nil {
return nil, err
}
payload, err := decodePayload(pbmsg.GetPayload())
if err != nil {
return nil, err
}
return &TaskMessage{
Type: pbmsg.GetType(),
Payload: payload,
ID: uuid.MustParse(pbmsg.GetId()),
Queue: pbmsg.GetQueue(),
Retry: int(pbmsg.GetRetry()),
Retried: int(pbmsg.GetRetried()),
ErrorMsg: pbmsg.GetErrorMsg(),
Timeout: pbmsg.GetTimeout(),
Deadline: pbmsg.GetDeadline(),
UniqueKey: pbmsg.GetUniqueKey(),
}, nil
}
// Z represents sorted set member.
@@ -292,6 +323,59 @@ type ServerInfo struct {
ActiveWorkerCount int
}
// EncodeServerInfo marshals the given ServerInfo and returns the encoded bytes.
func EncodeServerInfo(info *ServerInfo) ([]byte, error) {
if info == nil {
return nil, fmt.Errorf("cannot encode nil server info")
}
queues := make(map[string]int32)
for q, p := range info.Queues {
queues[q] = int32(p)
}
started, err := ptypes.TimestampProto(info.Started)
if err != nil {
return nil, err
}
return proto.Marshal(&pb.ServerInfo{
Host: info.Host,
Pid: int32(info.PID),
ServerId: info.ServerID,
Concurrency: int32(info.Concurrency),
Queues: queues,
StrictPriority: info.StrictPriority,
Status: info.Status,
StartTime: started,
ActiveWorkerCount: int32(info.ActiveWorkerCount),
})
}
// DecodeServerInfo decodes the given bytes into ServerInfo.
func DecodeServerInfo(b []byte) (*ServerInfo, error) {
var pbmsg pb.ServerInfo
if err := proto.Unmarshal(b, &pbmsg); err != nil {
return nil, err
}
queues := make(map[string]int)
for q, p := range pbmsg.GetQueues() {
queues[q] = int(p)
}
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime())
if err != nil {
return nil, err
}
return &ServerInfo{
Host: pbmsg.GetHost(),
PID: int(pbmsg.GetPid()),
ServerID: pbmsg.GetServerId(),
Concurrency: int(pbmsg.GetConcurrency()),
Queues: queues,
StrictPriority: pbmsg.GetStrictPriority(),
Status: pbmsg.GetStatus(),
Started: startTime,
ActiveWorkerCount: int(pbmsg.GetActiveWorkerCount()),
}, nil
}
// WorkerInfo holds information about a running worker.
type WorkerInfo struct {
Host string
@@ -299,12 +383,83 @@ type WorkerInfo struct {
ServerID string
ID string
Type string
Queue string
Payload map[string]interface{}
Queue string
Started time.Time
Deadline time.Time
}
// EncodeWorkerInfo marshals the given WorkerInfo and returns the encoded bytes.
func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) {
if info == nil {
return nil, fmt.Errorf("cannot encode nil worker info")
}
payload, err := json.Marshal(info.Payload)
if err != nil {
return nil, err
}
startTime, err := ptypes.TimestampProto(info.Started)
if err != nil {
return nil, err
}
deadline, err := ptypes.TimestampProto(info.Deadline)
if err != nil {
return nil, err
}
return proto.Marshal(&pb.WorkerInfo{
Host: info.Host,
Pid: int32(info.PID),
ServerId: info.ServerID,
TaskId: info.ID,
TaskType: info.Type,
TaskPayload: payload,
Queue: info.Queue,
StartTime: startTime,
Deadline: deadline,
})
}
func decodePayload(b []byte) (map[string]interface{}, error) {
d := json.NewDecoder(bytes.NewReader(b))
d.UseNumber()
payload := make(map[string]interface{})
if err := d.Decode(&payload); err != nil {
return nil, err
}
return payload, nil
}
// DecodeWorkerInfo decodes the given bytes into WorkerInfo.
func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) {
var pbmsg pb.WorkerInfo
if err := proto.Unmarshal(b, &pbmsg); err != nil {
return nil, err
}
payload, err := decodePayload(pbmsg.GetTaskPayload())
if err != nil {
return nil, err
}
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime())
if err != nil {
return nil, err
}
deadline, err := ptypes.Timestamp(pbmsg.GetDeadline())
if err != nil {
return nil, err
}
return &WorkerInfo{
Host: pbmsg.GetHost(),
PID: int(pbmsg.GetPid()),
ServerID: pbmsg.GetServerId(),
ID: pbmsg.GetTaskId(),
Type: pbmsg.GetTaskType(),
Payload: payload,
Queue: pbmsg.GetQueue(),
Started: startTime,
Deadline: deadline,
}, nil
}
// SchedulerEntry holds information about a periodic task registered with a scheduler.
type SchedulerEntry struct {
// Identifier of this entry.
@@ -330,6 +485,63 @@ type SchedulerEntry struct {
Prev time.Time
}
// EncodeSchedulerEntry marshals the given entry and returns an encoded bytes.
func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) {
if entry == nil {
return nil, fmt.Errorf("cannot encode nil scheduler entry")
}
payload, err := json.Marshal(entry.Payload)
if err != nil {
return nil, err
}
next, err := ptypes.TimestampProto(entry.Next)
if err != nil {
return nil, err
}
prev, err := ptypes.TimestampProto(entry.Prev)
if err != nil {
return nil, err
}
return proto.Marshal(&pb.SchedulerEntry{
Id: entry.ID,
Spec: entry.Spec,
TaskType: entry.Type,
TaskPayload: payload,
EnqueueOptions: entry.Opts,
NextEnqueueTime: next,
PrevEnqueueTime: prev,
})
}
// DecodeSchedulerEntry unmarshals the given bytes and returns a decoded SchedulerEntry.
func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) {
var pbmsg pb.SchedulerEntry
if err := proto.Unmarshal(b, &pbmsg); err != nil {
return nil, err
}
payload, err := decodePayload(pbmsg.GetTaskPayload())
if err != nil {
return nil, err
}
next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime())
if err != nil {
return nil, err
}
prev, err := ptypes.Timestamp(pbmsg.GetPrevEnqueueTime())
if err != nil {
return nil, err
}
return &SchedulerEntry{
ID: pbmsg.GetId(),
Spec: pbmsg.GetSpec(),
Type: pbmsg.GetTaskType(),
Payload: payload,
Opts: pbmsg.GetEnqueueOptions(),
Next: next,
Prev: prev,
}, nil
}
// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
type SchedulerEnqueueEvent struct {
// ID of the task that was enqueued.
@@ -339,6 +551,39 @@ type SchedulerEnqueueEvent struct {
EnqueuedAt time.Time
}
// EncodeSchedulerEnqueueEvent marshals the given event
// and returns an encoded bytes.
func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error) {
if event == nil {
return nil, fmt.Errorf("cannot encode nil enqueue event")
}
enqueuedAt, err := ptypes.TimestampProto(event.EnqueuedAt)
if err != nil {
return nil, err
}
return proto.Marshal(&pb.SchedulerEnqueueEvent{
TaskId: event.TaskID,
EnqueueTime: enqueuedAt,
})
}
// DecodeSchedulerEnqueueEvent unmarshals the given bytes
// and returns a decoded SchedulerEnqueueEvent.
func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error) {
var pbmsg pb.SchedulerEnqueueEvent
if err := proto.Unmarshal(b, &pbmsg); err != nil {
return nil, err
}
enqueuedAt, err := ptypes.Timestamp(pbmsg.GetEnqueueTime())
if err != nil {
return nil, err
}
return &SchedulerEnqueueEvent{
TaskID: pbmsg.GetTaskId(),
EnqueuedAt: enqueuedAt,
}, nil
}
// Cancelations is a collection that holds cancel functions for all active tasks.
//
// Cancelations are safe for concurrent use by multipel goroutines.

View File

@@ -372,6 +372,145 @@ func TestMessageEncoding(t *testing.T) {
}
}
func TestServerInfoEncoding(t *testing.T) {
tests := []struct {
info ServerInfo
}{
{
info: ServerInfo{
Host: "127.0.0.1",
PID: 9876,
ServerID: "abc123",
Concurrency: 10,
Queues: map[string]int{"default": 1, "critical": 2},
StrictPriority: false,
Status: "running",
Started: time.Now().Add(-3 * time.Hour),
ActiveWorkerCount: 8,
},
},
}
for _, tc := range tests {
encoded, err := EncodeServerInfo(&tc.info)
if err != nil {
t.Errorf("EncodeServerInfo(info) returned error: %v", err)
continue
}
decoded, err := DecodeServerInfo(encoded)
if err != nil {
t.Errorf("DecodeServerInfo(encoded) returned error: %v", err)
continue
}
if diff := cmp.Diff(&tc.info, decoded); diff != "" {
t.Errorf("Decoded ServerInfo == %+v, want %+v;(-want,+got)\n%s",
decoded, tc.info, diff)
}
}
}
func TestWorkerInfoEncoding(t *testing.T) {
tests := []struct {
info WorkerInfo
}{
{
info: WorkerInfo{
Host: "127.0.0.1",
PID: 9876,
ServerID: "abc123",
ID: uuid.NewString(),
Type: "taskA",
Payload: map[string]interface{}{"foo": "bar"},
Queue: "default",
Started: time.Now().Add(-3 * time.Hour),
Deadline: time.Now().Add(30 * time.Second),
},
},
}
for _, tc := range tests {
encoded, err := EncodeWorkerInfo(&tc.info)
if err != nil {
t.Errorf("EncodeWorkerInfo(info) returned error: %v", err)
continue
}
decoded, err := DecodeWorkerInfo(encoded)
if err != nil {
t.Errorf("DecodeWorkerInfo(encoded) returned error: %v", err)
continue
}
if diff := cmp.Diff(&tc.info, decoded); diff != "" {
t.Errorf("Decoded WorkerInfo == %+v, want %+v;(-want,+got)\n%s",
decoded, tc.info, diff)
}
}
}
func TestSchedulerEntryEncoding(t *testing.T) {
tests := []struct {
entry SchedulerEntry
}{
{
entry: SchedulerEntry{
ID: uuid.NewString(),
Spec: "* * * * *",
Type: "task_A",
Payload: map[string]interface{}{"foo": "bar"},
Opts: []string{"Queue('email')"},
Next: time.Now().Add(30 * time.Second).UTC(),
Prev: time.Now().Add(-2 * time.Minute).UTC(),
},
},
}
for _, tc := range tests {
encoded, err := EncodeSchedulerEntry(&tc.entry)
if err != nil {
t.Errorf("EncodeSchedulerEntry(entry) returned error: %v", err)
continue
}
decoded, err := DecodeSchedulerEntry(encoded)
if err != nil {
t.Errorf("DecodeSchedulerEntry(encoded) returned error: %v", err)
continue
}
if diff := cmp.Diff(&tc.entry, decoded); diff != "" {
t.Errorf("Decoded SchedulerEntry == %+v, want %+v;(-want,+got)\n%s",
decoded, tc.entry, diff)
}
}
}
func TestSchedulerEnqueueEventEncoding(t *testing.T) {
tests := []struct {
event SchedulerEnqueueEvent
}{
{
event: SchedulerEnqueueEvent{
TaskID: uuid.NewString(),
EnqueuedAt: time.Now().Add(-30 * time.Second).UTC(),
},
},
}
for _, tc := range tests {
encoded, err := EncodeSchedulerEnqueueEvent(&tc.event)
if err != nil {
t.Errorf("EncodeSchedulerEnqueueEvent(event) returned error: %v", err)
continue
}
decoded, err := DecodeSchedulerEnqueueEvent(encoded)
if err != nil {
t.Errorf("DecodeSchedulerEnqueueEvent(encoded) returned error: %v", err)
continue
}
if diff := cmp.Diff(&tc.event, decoded); diff != "" {
t.Errorf("Decoded SchedulerEnqueueEvent == %+v, want %+v;(-want,+got)\n%s",
decoded, tc.event, diff)
}
}
}
// Test for status being accessed by multiple goroutines.
// Run with -race flag to check for data race.
func TestStatusConcurrentAccess(t *testing.T) {

755
internal/proto/asynq.pb.go Normal file
View File

@@ -0,0 +1,755 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.25.0
// protoc v3.14.0
// source: asynq.proto
package proto
import (
proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// This is a compile-time assertion that a sufficiently up-to-date version
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
type TaskMessage struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"`
Queue string `protobuf:"bytes,4,opt,name=queue,proto3" json:"queue,omitempty"`
Retry int32 `protobuf:"varint,5,opt,name=retry,proto3" json:"retry,omitempty"`
Retried int32 `protobuf:"varint,6,opt,name=retried,proto3" json:"retried,omitempty"`
ErrorMsg string `protobuf:"bytes,7,opt,name=error_msg,json=errorMsg,proto3" json:"error_msg,omitempty"`
Timeout int64 `protobuf:"varint,8,opt,name=timeout,proto3" json:"timeout,omitempty"`
Deadline int64 `protobuf:"varint,9,opt,name=deadline,proto3" json:"deadline,omitempty"`
UniqueKey string `protobuf:"bytes,10,opt,name=unique_key,json=uniqueKey,proto3" json:"unique_key,omitempty"`
}
func (x *TaskMessage) Reset() {
*x = TaskMessage{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *TaskMessage) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*TaskMessage) ProtoMessage() {}
func (x *TaskMessage) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use TaskMessage.ProtoReflect.Descriptor instead.
func (*TaskMessage) Descriptor() ([]byte, []int) {
return file_asynq_proto_rawDescGZIP(), []int{0}
}
func (x *TaskMessage) GetType() string {
if x != nil {
return x.Type
}
return ""
}
func (x *TaskMessage) GetPayload() []byte {
if x != nil {
return x.Payload
}
return nil
}
func (x *TaskMessage) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *TaskMessage) GetQueue() string {
if x != nil {
return x.Queue
}
return ""
}
func (x *TaskMessage) GetRetry() int32 {
if x != nil {
return x.Retry
}
return 0
}
func (x *TaskMessage) GetRetried() int32 {
if x != nil {
return x.Retried
}
return 0
}
func (x *TaskMessage) GetErrorMsg() string {
if x != nil {
return x.ErrorMsg
}
return ""
}
func (x *TaskMessage) GetTimeout() int64 {
if x != nil {
return x.Timeout
}
return 0
}
func (x *TaskMessage) GetDeadline() int64 {
if x != nil {
return x.Deadline
}
return 0
}
func (x *TaskMessage) GetUniqueKey() string {
if x != nil {
return x.UniqueKey
}
return ""
}
// ServerInfo holds information about a running server.
type ServerInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
Pid int32 `protobuf:"varint,2,opt,name=pid,proto3" json:"pid,omitempty"`
ServerId string `protobuf:"bytes,3,opt,name=server_id,json=serverId,proto3" json:"server_id,omitempty"`
Concurrency int32 `protobuf:"varint,4,opt,name=concurrency,proto3" json:"concurrency,omitempty"`
Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"`
StrictPriority bool `protobuf:"varint,6,opt,name=strict_priority,json=strictPriority,proto3" json:"strict_priority,omitempty"`
Status string `protobuf:"bytes,7,opt,name=status,proto3" json:"status,omitempty"`
StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
ActiveWorkerCount int32 `protobuf:"varint,9,opt,name=active_worker_count,json=activeWorkerCount,proto3" json:"active_worker_count,omitempty"`
}
func (x *ServerInfo) Reset() {
*x = ServerInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *ServerInfo) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*ServerInfo) ProtoMessage() {}
func (x *ServerInfo) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use ServerInfo.ProtoReflect.Descriptor instead.
func (*ServerInfo) Descriptor() ([]byte, []int) {
return file_asynq_proto_rawDescGZIP(), []int{1}
}
func (x *ServerInfo) GetHost() string {
if x != nil {
return x.Host
}
return ""
}
func (x *ServerInfo) GetPid() int32 {
if x != nil {
return x.Pid
}
return 0
}
func (x *ServerInfo) GetServerId() string {
if x != nil {
return x.ServerId
}
return ""
}
func (x *ServerInfo) GetConcurrency() int32 {
if x != nil {
return x.Concurrency
}
return 0
}
func (x *ServerInfo) GetQueues() map[string]int32 {
if x != nil {
return x.Queues
}
return nil
}
func (x *ServerInfo) GetStrictPriority() bool {
if x != nil {
return x.StrictPriority
}
return false
}
func (x *ServerInfo) GetStatus() string {
if x != nil {
return x.Status
}
return ""
}
func (x *ServerInfo) GetStartTime() *timestamppb.Timestamp {
if x != nil {
return x.StartTime
}
return nil
}
func (x *ServerInfo) GetActiveWorkerCount() int32 {
if x != nil {
return x.ActiveWorkerCount
}
return 0
}
// WorkerInfo holds information about a running worker.
type WorkerInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"`
Pid int32 `protobuf:"varint,2,opt,name=pid,proto3" json:"pid,omitempty"`
ServerId string `protobuf:"bytes,3,opt,name=server_id,json=serverId,proto3" json:"server_id,omitempty"`
TaskId string `protobuf:"bytes,4,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"`
TaskType string `protobuf:"bytes,5,opt,name=task_type,json=taskType,proto3" json:"task_type,omitempty"`
TaskPayload []byte `protobuf:"bytes,6,opt,name=task_payload,json=taskPayload,proto3" json:"task_payload,omitempty"`
Queue string `protobuf:"bytes,7,opt,name=queue,proto3" json:"queue,omitempty"`
StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"`
}
func (x *WorkerInfo) Reset() {
*x = WorkerInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *WorkerInfo) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*WorkerInfo) ProtoMessage() {}
func (x *WorkerInfo) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use WorkerInfo.ProtoReflect.Descriptor instead.
func (*WorkerInfo) Descriptor() ([]byte, []int) {
return file_asynq_proto_rawDescGZIP(), []int{2}
}
func (x *WorkerInfo) GetHost() string {
if x != nil {
return x.Host
}
return ""
}
func (x *WorkerInfo) GetPid() int32 {
if x != nil {
return x.Pid
}
return 0
}
func (x *WorkerInfo) GetServerId() string {
if x != nil {
return x.ServerId
}
return ""
}
func (x *WorkerInfo) GetTaskId() string {
if x != nil {
return x.TaskId
}
return ""
}
func (x *WorkerInfo) GetTaskType() string {
if x != nil {
return x.TaskType
}
return ""
}
func (x *WorkerInfo) GetTaskPayload() []byte {
if x != nil {
return x.TaskPayload
}
return nil
}
func (x *WorkerInfo) GetQueue() string {
if x != nil {
return x.Queue
}
return ""
}
func (x *WorkerInfo) GetStartTime() *timestamppb.Timestamp {
if x != nil {
return x.StartTime
}
return nil
}
func (x *WorkerInfo) GetDeadline() *timestamppb.Timestamp {
if x != nil {
return x.Deadline
}
return nil
}
// SchedulerEntry holds information about a periodic task registered with a scheduler.
type SchedulerEntry struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Identifier of the scheduler entry.
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// Periodic schedule spec of the entry.
Spec string `protobuf:"bytes,2,opt,name=spec,proto3" json:"spec,omitempty"`
// Task type of the periodic task.
TaskType string `protobuf:"bytes,3,opt,name=task_type,json=taskType,proto3" json:"task_type,omitempty"`
// Task payload of the periodic task.
TaskPayload []byte `protobuf:"bytes,4,opt,name=task_payload,json=taskPayload,proto3" json:"task_payload,omitempty"`
// Options used to enqueue the periodic task.
EnqueueOptions []string `protobuf:"bytes,5,rep,name=enqueue_options,json=enqueueOptions,proto3" json:"enqueue_options,omitempty"`
// Next time the task will be enqueued.
NextEnqueueTime *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=next_enqueue_time,json=nextEnqueueTime,proto3" json:"next_enqueue_time,omitempty"`
// Last time the task was enqueued.
// Zero time if task was never enqueued.
PrevEnqueueTime *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=prev_enqueue_time,json=prevEnqueueTime,proto3" json:"prev_enqueue_time,omitempty"`
}
func (x *SchedulerEntry) Reset() {
*x = SchedulerEntry{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SchedulerEntry) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SchedulerEntry) ProtoMessage() {}
func (x *SchedulerEntry) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SchedulerEntry.ProtoReflect.Descriptor instead.
func (*SchedulerEntry) Descriptor() ([]byte, []int) {
return file_asynq_proto_rawDescGZIP(), []int{3}
}
func (x *SchedulerEntry) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *SchedulerEntry) GetSpec() string {
if x != nil {
return x.Spec
}
return ""
}
func (x *SchedulerEntry) GetTaskType() string {
if x != nil {
return x.TaskType
}
return ""
}
func (x *SchedulerEntry) GetTaskPayload() []byte {
if x != nil {
return x.TaskPayload
}
return nil
}
func (x *SchedulerEntry) GetEnqueueOptions() []string {
if x != nil {
return x.EnqueueOptions
}
return nil
}
func (x *SchedulerEntry) GetNextEnqueueTime() *timestamppb.Timestamp {
if x != nil {
return x.NextEnqueueTime
}
return nil
}
func (x *SchedulerEntry) GetPrevEnqueueTime() *timestamppb.Timestamp {
if x != nil {
return x.PrevEnqueueTime
}
return nil
}
// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
type SchedulerEnqueueEvent struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// ID of the task that was enqueued.
TaskId string `protobuf:"bytes,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"`
// Time the task was enqueued.
EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"`
}
func (x *SchedulerEnqueueEvent) Reset() {
*x = SchedulerEnqueueEvent{}
if protoimpl.UnsafeEnabled {
mi := &file_asynq_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SchedulerEnqueueEvent) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SchedulerEnqueueEvent) ProtoMessage() {}
func (x *SchedulerEnqueueEvent) ProtoReflect() protoreflect.Message {
mi := &file_asynq_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SchedulerEnqueueEvent.ProtoReflect.Descriptor instead.
func (*SchedulerEnqueueEvent) Descriptor() ([]byte, []int) {
return file_asynq_proto_rawDescGZIP(), []int{4}
}
func (x *SchedulerEnqueueEvent) GetTaskId() string {
if x != nil {
return x.TaskId
}
return ""
}
func (x *SchedulerEnqueueEvent) GetEnqueueTime() *timestamppb.Timestamp {
if x != nil {
return x.EnqueueTime
}
return nil
}
var File_asynq_proto protoreflect.FileDescriptor
var file_asynq_proto_rawDesc = []byte{
0x0a, 0x0b, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x74,
0x75, 0x74, 0x6f, 0x72, 0x69, 0x61, 0x6c, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61,
0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x83, 0x02, 0x0a, 0x0b, 0x54, 0x61, 0x73,
0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07,
0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70,
0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01,
0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18,
0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05,
0x72, 0x65, 0x74, 0x72, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74,
0x72, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20,
0x01, 0x28, 0x05, 0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09,
0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52,
0x08, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d,
0x65, 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65,
0x6f, 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18,
0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x12,
0x1d, 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0a, 0x20,
0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65, 0x79, 0x22, 0x92,
0x03, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a,
0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73,
0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03,
0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x64,
0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64,
0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x18,
0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e,
0x63, 0x79, 0x12, 0x38, 0x0a, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x05, 0x20, 0x03,
0x28, 0x0b, 0x32, 0x20, 0x2e, 0x74, 0x75, 0x74, 0x6f, 0x72, 0x69, 0x61, 0x6c, 0x2e, 0x53, 0x65,
0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45,
0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f,
0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x5f, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18,
0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x50, 0x72, 0x69,
0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18,
0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a,
0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73,
0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x61, 0x63, 0x74, 0x69,
0x76, 0x65, 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18,
0x09, 0x20, 0x01, 0x28, 0x05, 0x52, 0x11, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72,
0x6b, 0x65, 0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x39, 0x0a, 0x0b, 0x51, 0x75, 0x65, 0x75,
0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c,
0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a,
0x02, 0x38, 0x01, 0x22, 0xb1, 0x02, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e,
0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20,
0x01, 0x28, 0x05, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76,
0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72,
0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64,
0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b,
0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28,
0x09, 0x52, 0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74,
0x61, 0x73, 0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28,
0x0c, 0x52, 0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x14,
0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71,
0x75, 0x65, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69,
0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12,
0x36, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64,
0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x22, 0xad, 0x02, 0x0a, 0x0e, 0x53, 0x63, 0x68, 0x65,
0x64, 0x75, 0x6c, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x70,
0x65, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x12, 0x1b,
0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74,
0x61, 0x73, 0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28,
0x0c, 0x52, 0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x27,
0x0a, 0x0f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e,
0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65,
0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x11, 0x6e, 0x65, 0x78, 0x74, 0x5f,
0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01,
0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f,
0x6e, 0x65, 0x78, 0x74, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12,
0x46, 0x0a, 0x11, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f,
0x74, 0x69, 0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f,
0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d,
0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x70, 0x72, 0x65, 0x76, 0x45, 0x6e, 0x71, 0x75,
0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x6f, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x64,
0x75, 0x6c, 0x65, 0x72, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74,
0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x65, 0x6e, 0x71,
0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x65, 0x6e, 0x71,
0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68,
0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61,
0x73, 0x79, 0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_asynq_proto_rawDescOnce sync.Once
file_asynq_proto_rawDescData = file_asynq_proto_rawDesc
)
func file_asynq_proto_rawDescGZIP() []byte {
file_asynq_proto_rawDescOnce.Do(func() {
file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(file_asynq_proto_rawDescData)
})
return file_asynq_proto_rawDescData
}
var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_asynq_proto_goTypes = []interface{}{
(*TaskMessage)(nil), // 0: tutorial.TaskMessage
(*ServerInfo)(nil), // 1: tutorial.ServerInfo
(*WorkerInfo)(nil), // 2: tutorial.WorkerInfo
(*SchedulerEntry)(nil), // 3: tutorial.SchedulerEntry
(*SchedulerEnqueueEvent)(nil), // 4: tutorial.SchedulerEnqueueEvent
nil, // 5: tutorial.ServerInfo.QueuesEntry
(*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp
}
var file_asynq_proto_depIdxs = []int32{
5, // 0: tutorial.ServerInfo.queues:type_name -> tutorial.ServerInfo.QueuesEntry
6, // 1: tutorial.ServerInfo.start_time:type_name -> google.protobuf.Timestamp
6, // 2: tutorial.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp
6, // 3: tutorial.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp
6, // 4: tutorial.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp
6, // 5: tutorial.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp
6, // 6: tutorial.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp
7, // [7:7] is the sub-list for method output_type
7, // [7:7] is the sub-list for method input_type
7, // [7:7] is the sub-list for extension type_name
7, // [7:7] is the sub-list for extension extendee
0, // [0:7] is the sub-list for field type_name
}
func init() { file_asynq_proto_init() }
func file_asynq_proto_init() {
if File_asynq_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_asynq_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*TaskMessage); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ServerInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*WorkerInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SchedulerEntry); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_asynq_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SchedulerEnqueueEvent); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_asynq_proto_rawDesc,
NumEnums: 0,
NumMessages: 6,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_asynq_proto_goTypes,
DependencyIndexes: file_asynq_proto_depIdxs,
MessageInfos: file_asynq_proto_msgTypes,
}.Build()
File_asynq_proto = out.File
file_asynq_proto_rawDesc = nil
file_asynq_proto_goTypes = nil
file_asynq_proto_depIdxs = nil
}

148
internal/proto/asynq.proto Normal file
View File

@@ -0,0 +1,148 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
syntax = "proto3";
package asynq;
import "google/protobuf/timestamp.proto";
option go_package = "github.com/hibiken/asynq/internal/proto";
// TaskMessage is the internal representation of a task with additional
// metadata fields.
message TaskMessage {
// Type indicates the kind of the task to be performed.
string type = 1;
// Payload holds data needed to process the task.
bytes payload = 2;
// Unique identifier for the task.
string id = 3;
// Name of the queue to which this task belongs.
string queue = 4;
// Max number of retries for this task.
int32 retry = 5;
// Number of times this task has been retried so far.
int32 retried = 6;
// Error message from the last failure.
string error_msg = 7;
// Timeout specifies timeout in seconds.
// Use zero to indicate no timeout.
int64 timeout = 8;
// Deadline specifies the deadline for the task in Unix time,
// the number of seconds elapsed since January 1, 1970 UTC.
// Use zero to indicate no deadline.
int64 deadline = 9;
// UniqueKey holds the redis key used for uniqueness lock for this task.
// Empty string indicates that no uniqueness lock was used.
string unique_key = 10;
};
// ServerInfo holds information about a running server.
message ServerInfo {
// Host machine the server is running on.
string host = 1;
// PID of the server process.
int32 pid = 2;
// Unique identifier for this server.
string server_id = 3;
// Maximum number of concurrency this server will use.
int32 concurrency = 4;
// List of queue names with their priorities.
// The server will consume tasks from the queues and prioritize
// queues with higher priority numbers.
map<string, int32> queues = 5;
// If set, the server will always consume tasks from a queue with higher
// priority.
bool strict_priority = 6;
// Status indicates the status of the server.
string status = 7;
// Time this server was started.
google.protobuf.Timestamp start_time = 8;
// Number of workers currently processing tasks.
int32 active_worker_count = 9;
};
// WorkerInfo holds information about a running worker.
message WorkerInfo {
// Host matchine this worker is running on.
string host = 1;
// PID of the process in which this worker is running.
int32 pid = 2;
// ID of the server in which this worker is running.
string server_id = 3;
// ID of the task this worker is processing.
string task_id = 4;
// Type of the task this worker is processing.
string task_type = 5;
// Payload of the task this worker is processing.
bytes task_payload = 6;
// Name of the queue the task the worker is processing belongs.
string queue = 7;
// Time this worker started processing the task.
google.protobuf.Timestamp start_time = 8;
// Deadline by which the worker needs to complete processing
// the task. If worker exceeds the deadline, the task will fail.
google.protobuf.Timestamp deadline = 9;
};
// SchedulerEntry holds information about a periodic task registered
// with a scheduler.
message SchedulerEntry {
// Identifier of the scheduler entry.
string id = 1;
// Periodic schedule spec of the entry.
string spec = 2;
// Task type of the periodic task.
string task_type = 3;
// Task payload of the periodic task.
bytes task_payload = 4;
// Options used to enqueue the periodic task.
repeated string enqueue_options = 5;
// Next time the task will be enqueued.
google.protobuf.Timestamp next_enqueue_time = 6;
// Last time the task was enqueued.
// Zero time if task was never enqueued.
google.protobuf.Timestamp prev_enqueue_time = 7;
};
// SchedulerEnqueueEvent holds information about an enqueue event
// by a scheduler.
message SchedulerEnqueueEvent {
// ID of the task that was enqueued.
string task_id = 1;
// Time the task was enqueued.
google.protobuf.Timestamp enqueue_time = 2;
};

View File

@@ -5,7 +5,6 @@
package rdb
import (
"encoding/json"
"fmt"
"strings"
"time"
@@ -343,7 +342,7 @@ func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessa
reverse(data)
var msgs []*base.TaskMessage
for _, s := range data {
m, err := base.DecodeMessage(s)
m, err := base.DecodeMessage([]byte(s))
if err != nil {
continue // bad data, ignore and continue
}
@@ -419,7 +418,7 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro
if err != nil {
return nil, err
}
msg, err := base.DecodeMessage(s)
msg, err := base.DecodeMessage([]byte(s))
if err != nil {
continue // bad data, ignore and continue
}
@@ -1013,46 +1012,47 @@ func (r *RDB) ListServers() ([]*base.ServerInfo, error) {
if err != nil {
continue // skip bad data
}
var info base.ServerInfo
if err := json.Unmarshal([]byte(data), &info); err != nil {
info, err := base.DecodeServerInfo([]byte(data))
if err != nil {
continue // skip bad data
}
servers = append(servers, &info)
servers = append(servers, info)
}
return servers, nil
}
// Note: Script also removes stale keys.
var listWorkerKeysCmd = redis.NewScript(`
var listWorkersCmd = redis.NewScript(`
local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return keys`)
local res = {}
for _, key in ipairs(keys) do
local vals = redis.call("HVALS", key)
for _, v in ipairs(vals) do
table.insert(res, v)
end
end
return res`)
// ListWorkers returns the list of worker stats.
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
now := time.Now()
res, err := listWorkerKeysCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result()
res, err := listWorkersCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result()
if err != nil {
return nil, err
}
keys, err := cast.ToStringSliceE(res)
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
var workers []*base.WorkerInfo
for _, key := range keys {
data, err := r.client.HVals(key).Result()
for _, s := range data {
w, err := base.DecodeWorkerInfo([]byte(s))
if err != nil {
continue // skip bad data
}
for _, s := range data {
var w base.WorkerInfo
if err := json.Unmarshal([]byte(s), &w); err != nil {
continue // skip bad data
}
workers = append(workers, &w)
}
workers = append(workers, w)
}
return workers, nil
}
@@ -1082,11 +1082,11 @@ func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) {
continue // skip bad data
}
for _, s := range data {
var e base.SchedulerEntry
if err := json.Unmarshal([]byte(s), &e); err != nil {
e, err := base.DecodeSchedulerEntry([]byte(s))
if err != nil {
continue // skip bad data
}
entries = append(entries, &e)
entries = append(entries, e)
}
}
return entries, nil
@@ -1105,11 +1105,11 @@ func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*bas
if err != nil {
return nil, err
}
var e base.SchedulerEnqueueEvent
if err := json.Unmarshal([]byte(data), &e); err != nil {
e, err := base.DecodeSchedulerEnqueueEvent([]byte(data))
if err != nil {
return nil, err
}
events = append(events, &e)
events = append(events, e)
}
return events, nil
}

View File

@@ -6,7 +6,6 @@
package rdb
import (
"encoding/json"
"errors"
"fmt"
"time"
@@ -146,7 +145,7 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti
if err != nil {
return nil, time.Time{}, err
}
if msg, err = base.DecodeMessage(encoded); err != nil {
if msg, err = base.DecodeMessage([]byte(encoded)); err != nil {
return nil, time.Time{}, err
}
return msg, time.Unix(d, 0), nil
@@ -615,7 +614,7 @@ func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*bas
return nil, err
}
for _, s := range data {
msg, err := base.DecodeMessage(s)
msg, err := base.DecodeMessage([]byte(s))
if err != nil {
return nil, err
}
@@ -643,14 +642,14 @@ return redis.status_reply("OK")`)
// WriteServerState writes server state data to redis with expiration set to the value ttl.
func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error {
bytes, err := json.Marshal(info)
bytes, err := base.EncodeServerInfo(info)
if err != nil {
return err
}
exp := time.Now().Add(ttl).UTC()
args := []interface{}{ttl.Seconds(), bytes} // args to the lua script
for _, w := range workers {
bytes, err := json.Marshal(w)
bytes, err := base.EncodeWorkerInfo(w)
if err != nil {
continue // skip bad data
}
@@ -702,7 +701,7 @@ return redis.status_reply("OK")`)
func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error {
args := []interface{}{ttl.Seconds()}
for _, e := range entries {
bytes, err := json.Marshal(e)
bytes, err := base.EncodeSchedulerEntry(e)
if err != nil {
continue // skip bad data
}
@@ -757,7 +756,7 @@ const maxEvents = 1000
// RecordSchedulerEnqueueEvent records the time when the given task was enqueued.
func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error {
key := base.SchedulerHistoryKey(entryID)
data, err := json.Marshal(event)
data, err := base.EncodeSchedulerEnqueueEvent(event)
if err != nil {
return err
}

View File

@@ -101,7 +101,7 @@ func TestEnqueueUnique(t *testing.T) {
m1 := base.TaskMessage{
ID: uuid.New(),
Type: "email",
Payload: map[string]interface{}{"user_id": float64(123)},
Payload: map[string]interface{}{"user_id": json.Number("123")},
Queue: base.DefaultQueueName,
UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", map[string]interface{}{"user_id": 123}),
}
@@ -1474,7 +1474,7 @@ func TestWriteServerState(t *testing.T) {
Concurrency: 10,
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
StrictPriority: false,
Started: time.Now(),
Started: time.Now().UTC(),
Status: "running",
ActiveWorkerCount: 0,
}
@@ -1487,12 +1487,11 @@ func TestWriteServerState(t *testing.T) {
// Check ServerInfo was written correctly.
skey := base.ServerInfoKey(host, pid, serverID)
data := r.client.Get(skey).Val()
var got base.ServerInfo
err = json.Unmarshal([]byte(data), &got)
got, err := base.DecodeServerInfo([]byte(data))
if err != nil {
t.Fatalf("could not decode json: %v", err)
t.Fatalf("could not decode server info: %v", err)
}
if diff := cmp.Diff(info, got); diff != "" {
if diff := cmp.Diff(info, *got); diff != "" {
t.Errorf("persisted ServerInfo was %v, want %v; (-want,+got)\n%s",
got, info, diff)
}
@@ -1565,7 +1564,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
Concurrency: 10,
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
StrictPriority: false,
Started: time.Now().Add(-10 * time.Minute),
Started: time.Now().Add(-10 * time.Minute).UTC(),
Status: "running",
ActiveWorkerCount: len(workers),
}
@@ -1578,12 +1577,11 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
// Check ServerInfo was written correctly.
skey := base.ServerInfoKey(host, pid, serverID)
data := r.client.Get(skey).Val()
var got base.ServerInfo
err = json.Unmarshal([]byte(data), &got)
got, err := base.DecodeServerInfo([]byte(data))
if err != nil {
t.Fatalf("could not decode json: %v", err)
t.Fatalf("could not decode server info: %v", err)
}
if diff := cmp.Diff(serverInfo, got); diff != "" {
if diff := cmp.Diff(serverInfo, *got); diff != "" {
t.Errorf("persisted ServerInfo was %v, want %v; (-want,+got)\n%s",
got, serverInfo, diff)
}
@@ -1607,11 +1605,11 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
}
var gotWorkers []*base.WorkerInfo
for _, val := range wdata {
var w base.WorkerInfo
if err := json.Unmarshal([]byte(val), &w); err != nil {
w, err := base.DecodeWorkerInfo([]byte(val))
if err != nil {
t.Fatalf("could not unmarshal worker's data: %v", err)
}
gotWorkers = append(gotWorkers, &w)
gotWorkers = append(gotWorkers, w)
}
if diff := cmp.Diff(workers, gotWorkers, h.SortWorkerInfoOpt); diff != "" {
t.Errorf("persisted workers info was %v, want %v; (-want,+got)\n%s",