diff --git a/internal/base/base.go b/internal/base/base.go index 4ba67a5..300e930 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -16,6 +16,7 @@ import ( "time" "github.com/go-redis/redis/v7" + "github.com/golang/protobuf/ptypes" "github.com/google/uuid" pb "github.com/hibiken/asynq/internal/proto" "google.golang.org/protobuf/proto" @@ -203,7 +204,7 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) { if err != nil { return nil, err } - pbmsg := pb.TaskMessage{ + return proto.Marshal(&pb.TaskMessage{ Type: msg.Type, Payload: payload, Id: msg.ID.String(), @@ -214,8 +215,7 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) { Timeout: msg.Timeout, Deadline: msg.Deadline, UniqueKey: msg.UniqueKey, - } - return proto.Marshal(&pbmsg) + }) } // DecodeMessage unmarshals the given bytes and returns a decoded task message. @@ -322,19 +322,130 @@ type ServerInfo struct { ActiveWorkerCount int } +// EncodeServerInfo marshals the given ServerInfo and returns the encoded bytes. +func EncodeServerInfo(info *ServerInfo) ([]byte, error) { + queues := make(map[string]int32) + for q, p := range info.Queues { + queues[q] = int32(p) + } + ts, err := ptypes.TimestampProto(info.Started) + if err != nil { + return nil, err + } + return proto.Marshal(&pb.ServerInfo{ + Host: info.Host, + Pid: int32(info.PID), + ServerId: info.ServerID, + Concurrency: int32(info.Concurrency), + Queues: queues, + StrictPriority: info.StrictPriority, + Status: info.Status, + StartTime: ts, + ActiveWorkerCount: int32(info.ActiveWorkerCount), + }) +} + +// DecodeServerInfo decodes the given bytes into ServerInfo. +func DecodeServerInfo(b []byte) (*ServerInfo, error) { + var pbmsg pb.ServerInfo + if err := proto.Unmarshal(b, &pbmsg); err != nil { + return nil, err + } + queues := make(map[string]int) + for q, p := range pbmsg.GetQueues() { + queues[q] = int(p) + } + startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) + if err != nil { + return nil, err + } + return &ServerInfo{ + Host: pbmsg.Host, + PID: int(pbmsg.Pid), + ServerID: pbmsg.ServerId, + Concurrency: int(pbmsg.Concurrency), + Queues: queues, + StrictPriority: pbmsg.StrictPriority, + Status: pbmsg.Status, + Started: startTime, + ActiveWorkerCount: int(pbmsg.ActiveWorkerCount), + }, nil +} + // WorkerInfo holds information about a running worker. type WorkerInfo struct { Host string PID int ServerID string + // TODO: Can we have Task field (*TaskMessage)? ID string Type string - Queue string Payload map[string]interface{} + Queue string Started time.Time Deadline time.Time } +// EncodeWorkerInfo marshals the given WorkerInfo and returns the encoded bytes. +func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) { + payload, err := json.Marshal(info.Payload) + if err != nil { + return nil, err + } + startTime, err := ptypes.TimestampProto(info.Started) + if err != nil { + return nil, err + } + deadline, err := ptypes.TimestampProto(info.Deadline) + if err != nil { + return nil, err + } + return proto.Marshal(&pb.WorkerInfo{ + Host: info.Host, + Pid: int32(info.PID), + ServerId: info.ServerID, + TaskId: info.ID, + TaskType: info.Type, + TaskPayload: payload, + Queue: info.Queue, + StartTime: startTime, + Deadline: deadline, + }) +} + +// DecodeWorkerInfo decodes the given bytes into WorkerInfo. +func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) { + var pbmsg pb.WorkerInfo + if err := proto.Unmarshal(b, &pbmsg); err != nil { + return nil, err + } + d := json.NewDecoder(bytes.NewReader(pbmsg.GetTaskPayload())) + d.UseNumber() + payload := make(map[string]interface{}) + if err := d.Decode(&payload); err != nil { + return nil, err + } + startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) + if err != nil { + return nil, err + } + deadline, err := ptypes.Timestamp(pbmsg.GetDeadline()) + if err != nil { + return nil, err + } + return &WorkerInfo{ + Host: pbmsg.Host, + PID: int(pbmsg.Pid), + ServerID: pbmsg.ServerId, + ID: pbmsg.TaskId, + Type: pbmsg.TaskType, + Payload: payload, + Queue: pbmsg.Queue, + Started: startTime, + Deadline: deadline, + }, nil +} + // SchedulerEntry holds information about a periodic task registered with a scheduler. type SchedulerEntry struct { // Identifier of this entry. diff --git a/internal/base/base_test.go b/internal/base/base_test.go index f99757f..c779e49 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -372,6 +372,80 @@ func TestMessageEncoding(t *testing.T) { } } +func TestServerInfoEncoding(t *testing.T) { + tests := []struct { + info ServerInfo + }{ + { + info: ServerInfo{ + Host: "127.0.0.1", + PID: 9876, + ServerID: "abc123", + Concurrency: 10, + Queues: map[string]int{"default": 1, "critical": 2}, + StrictPriority: false, + Status: "running", + Started: time.Now().Add(-3 * time.Hour), + ActiveWorkerCount: 8, + }, + }, + } + + for _, tc := range tests { + encoded, err := EncodeServerInfo(&tc.info) + if err != nil { + t.Errorf("EncodeServerInfo(info) returned error: %v", err) + continue + } + decoded, err := DecodeServerInfo(encoded) + if err != nil { + t.Errorf("DecodeServerInfo(encoded) returned error: %v", err) + continue + } + if diff := cmp.Diff(&tc.info, decoded); diff != "" { + t.Errorf("Decoded ServerInfo == %+v, want %+v;(-want,+got)\n%s", + decoded, tc.info, diff) + } + } +} + +func TestWorkerInfoEncoding(t *testing.T) { + tests := []struct { + info WorkerInfo + }{ + { + info: WorkerInfo{ + Host: "127.0.0.1", + PID: 9876, + ServerID: "abc123", + ID: uuid.NewString(), + Type: "taskA", + Payload: map[string]interface{}{"foo": "bar"}, + Queue: "default", + Started: time.Now().Add(-3 * time.Hour), + Deadline: time.Now().Add(30 * time.Second), + }, + }, + } + + for _, tc := range tests { + encoded, err := EncodeWorkerInfo(&tc.info) + if err != nil { + t.Errorf("EncodeWorkerInfo(info) returned error: %v", err) + continue + } + decoded, err := DecodeWorkerInfo(encoded) + if err != nil { + t.Errorf("DecodeWorkerInfo(encoded) returned error: %v", err) + continue + } + if diff := cmp.Diff(&tc.info, decoded); diff != "" { + t.Errorf("Decoded WorkerInfo == %+v, want %+v;(-want,+got)\n%s", + decoded, tc.info, diff) + } + } +} + // Test for status being accessed by multiple goroutines. // Run with -race flag to check for data race. func TestStatusConcurrentAccess(t *testing.T) { diff --git a/internal/proto/asynq.pb.go b/internal/proto/asynq.pb.go index 373b32b..24c9dc8 100644 --- a/internal/proto/asynq.pb.go +++ b/internal/proto/asynq.pb.go @@ -10,6 +10,7 @@ import ( proto "github.com/golang/protobuf/proto" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" ) @@ -144,30 +145,301 @@ func (x *TaskMessage) GetUniqueKey() string { return "" } +// ServerInfo holds information about a running server. +type ServerInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` + Pid int32 `protobuf:"varint,2,opt,name=pid,proto3" json:"pid,omitempty"` + ServerId string `protobuf:"bytes,3,opt,name=server_id,json=serverId,proto3" json:"server_id,omitempty"` + Concurrency int32 `protobuf:"varint,4,opt,name=concurrency,proto3" json:"concurrency,omitempty"` + Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` + StrictPriority bool `protobuf:"varint,6,opt,name=strict_priority,json=strictPriority,proto3" json:"strict_priority,omitempty"` + Status string `protobuf:"bytes,7,opt,name=status,proto3" json:"status,omitempty"` + StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"` + ActiveWorkerCount int32 `protobuf:"varint,9,opt,name=active_worker_count,json=activeWorkerCount,proto3" json:"active_worker_count,omitempty"` +} + +func (x *ServerInfo) Reset() { + *x = ServerInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_asynq_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ServerInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerInfo) ProtoMessage() {} + +func (x *ServerInfo) ProtoReflect() protoreflect.Message { + mi := &file_asynq_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerInfo.ProtoReflect.Descriptor instead. +func (*ServerInfo) Descriptor() ([]byte, []int) { + return file_asynq_proto_rawDescGZIP(), []int{1} +} + +func (x *ServerInfo) GetHost() string { + if x != nil { + return x.Host + } + return "" +} + +func (x *ServerInfo) GetPid() int32 { + if x != nil { + return x.Pid + } + return 0 +} + +func (x *ServerInfo) GetServerId() string { + if x != nil { + return x.ServerId + } + return "" +} + +func (x *ServerInfo) GetConcurrency() int32 { + if x != nil { + return x.Concurrency + } + return 0 +} + +func (x *ServerInfo) GetQueues() map[string]int32 { + if x != nil { + return x.Queues + } + return nil +} + +func (x *ServerInfo) GetStrictPriority() bool { + if x != nil { + return x.StrictPriority + } + return false +} + +func (x *ServerInfo) GetStatus() string { + if x != nil { + return x.Status + } + return "" +} + +func (x *ServerInfo) GetStartTime() *timestamppb.Timestamp { + if x != nil { + return x.StartTime + } + return nil +} + +func (x *ServerInfo) GetActiveWorkerCount() int32 { + if x != nil { + return x.ActiveWorkerCount + } + return 0 +} + +// WorkerInfo holds information about a running worker. +type WorkerInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` + Pid int32 `protobuf:"varint,2,opt,name=pid,proto3" json:"pid,omitempty"` + ServerId string `protobuf:"bytes,3,opt,name=server_id,json=serverId,proto3" json:"server_id,omitempty"` + TaskId string `protobuf:"bytes,4,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` + TaskType string `protobuf:"bytes,5,opt,name=task_type,json=taskType,proto3" json:"task_type,omitempty"` + TaskPayload []byte `protobuf:"bytes,6,opt,name=task_payload,json=taskPayload,proto3" json:"task_payload,omitempty"` + Queue string `protobuf:"bytes,7,opt,name=queue,proto3" json:"queue,omitempty"` + StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"` + Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"` +} + +func (x *WorkerInfo) Reset() { + *x = WorkerInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_asynq_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WorkerInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WorkerInfo) ProtoMessage() {} + +func (x *WorkerInfo) ProtoReflect() protoreflect.Message { + mi := &file_asynq_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WorkerInfo.ProtoReflect.Descriptor instead. +func (*WorkerInfo) Descriptor() ([]byte, []int) { + return file_asynq_proto_rawDescGZIP(), []int{2} +} + +func (x *WorkerInfo) GetHost() string { + if x != nil { + return x.Host + } + return "" +} + +func (x *WorkerInfo) GetPid() int32 { + if x != nil { + return x.Pid + } + return 0 +} + +func (x *WorkerInfo) GetServerId() string { + if x != nil { + return x.ServerId + } + return "" +} + +func (x *WorkerInfo) GetTaskId() string { + if x != nil { + return x.TaskId + } + return "" +} + +func (x *WorkerInfo) GetTaskType() string { + if x != nil { + return x.TaskType + } + return "" +} + +func (x *WorkerInfo) GetTaskPayload() []byte { + if x != nil { + return x.TaskPayload + } + return nil +} + +func (x *WorkerInfo) GetQueue() string { + if x != nil { + return x.Queue + } + return "" +} + +func (x *WorkerInfo) GetStartTime() *timestamppb.Timestamp { + if x != nil { + return x.StartTime + } + return nil +} + +func (x *WorkerInfo) GetDeadline() *timestamppb.Timestamp { + if x != nil { + return x.Deadline + } + return nil +} + var File_asynq_proto protoreflect.FileDescriptor var file_asynq_proto_rawDesc = []byte{ 0x0a, 0x0b, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x74, - 0x75, 0x74, 0x6f, 0x72, 0x69, 0x61, 0x6c, 0x22, 0x83, 0x02, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, - 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, - 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, - 0x65, 0x74, 0x72, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74, 0x72, - 0x79, 0x12, 0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, - 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, - 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, - 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, - 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x12, 0x1d, - 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0a, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65, 0x79, 0x42, 0x29, 0x5a, - 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, - 0x6b, 0x65, 0x6e, 0x2f, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, - 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x75, 0x74, 0x6f, 0x72, 0x69, 0x61, 0x6c, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x83, 0x02, 0x0a, 0x0b, 0x54, 0x61, 0x73, + 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, + 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, + 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, + 0x72, 0x65, 0x74, 0x72, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74, + 0x72, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x08, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, + 0x65, 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, + 0x6f, 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, + 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x12, + 0x1d, 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0a, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65, 0x79, 0x22, 0x92, + 0x03, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, + 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, + 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, + 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x64, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, + 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, + 0x63, 0x79, 0x12, 0x38, 0x0a, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x05, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x74, 0x75, 0x74, 0x6f, 0x72, 0x69, 0x61, 0x6c, 0x2e, 0x53, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, + 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x5f, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, + 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x50, 0x72, 0x69, + 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, + 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x61, 0x63, 0x74, 0x69, + 0x76, 0x65, 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, + 0x09, 0x20, 0x01, 0x28, 0x05, 0x52, 0x11, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, + 0x6b, 0x65, 0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x39, 0x0a, 0x0b, 0x51, 0x75, 0x65, 0x75, + 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, + 0x02, 0x38, 0x01, 0x22, 0xb1, 0x02, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, + 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b, + 0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, + 0x61, 0x73, 0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x14, + 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, + 0x75, 0x65, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, + 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, + 0x36, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, + 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, 0x73, + 0x79, 0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -182,16 +454,24 @@ func file_asynq_proto_rawDescGZIP() []byte { return file_asynq_proto_rawDescData } -var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_asynq_proto_goTypes = []interface{}{ - (*TaskMessage)(nil), // 0: tutorial.TaskMessage + (*TaskMessage)(nil), // 0: tutorial.TaskMessage + (*ServerInfo)(nil), // 1: tutorial.ServerInfo + (*WorkerInfo)(nil), // 2: tutorial.WorkerInfo + nil, // 3: tutorial.ServerInfo.QueuesEntry + (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp } var file_asynq_proto_depIdxs = []int32{ - 0, // [0:0] is the sub-list for method output_type - 0, // [0:0] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 3, // 0: tutorial.ServerInfo.queues:type_name -> tutorial.ServerInfo.QueuesEntry + 4, // 1: tutorial.ServerInfo.start_time:type_name -> google.protobuf.Timestamp + 4, // 2: tutorial.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp + 4, // 3: tutorial.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp + 4, // [4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name } func init() { file_asynq_proto_init() } @@ -212,6 +492,30 @@ func file_asynq_proto_init() { return nil } } + file_asynq_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ServerInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_asynq_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*WorkerInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -219,7 +523,7 @@ func file_asynq_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_asynq_proto_rawDesc, NumEnums: 0, - NumMessages: 1, + NumMessages: 4, NumExtensions: 0, NumServices: 0, }, diff --git a/internal/proto/asynq.proto b/internal/proto/asynq.proto index 0c6edf7..a89d6dc 100644 --- a/internal/proto/asynq.proto +++ b/internal/proto/asynq.proto @@ -1,7 +1,7 @@ syntax = "proto3"; package tutorial; -//import "google/protobuf/timestamp.proto"; +import "google/protobuf/timestamp.proto"; option go_package = "github.com/hibiken/asynq/internal/proto"; @@ -25,4 +25,30 @@ message TaskMessage { int64 deadline = 9; string unique_key = 10; +}; + +// ServerInfo holds information about a running server. +message ServerInfo { + string host = 1; + int32 pid = 2; + string server_id = 3; + int32 concurrency = 4; + map queues = 5; + bool strict_priority = 6; + string status = 7; + google.protobuf.Timestamp start_time = 8; + int32 active_worker_count = 9; +}; + +// WorkerInfo holds information about a running worker. +message WorkerInfo { + string host = 1; + int32 pid = 2; + string server_id = 3; + string task_id = 4; + string task_type = 5; + bytes task_payload = 6; + string queue = 7; + google.protobuf.Timestamp start_time = 8; + google.protobuf.Timestamp deadline = 9; }; \ No newline at end of file diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index c778219..55c7568 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -1013,11 +1013,11 @@ func (r *RDB) ListServers() ([]*base.ServerInfo, error) { if err != nil { continue // skip bad data } - var info base.ServerInfo - if err := json.Unmarshal([]byte(data), &info); err != nil { + info, err := base.DecodeServerInfo([]byte(data)) + if err != nil { continue // skip bad data } - servers = append(servers, &info) + servers = append(servers, info) } return servers, nil } @@ -1047,11 +1047,11 @@ func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) { continue // skip bad data } for _, s := range data { - var w base.WorkerInfo - if err := json.Unmarshal([]byte(s), &w); err != nil { + w, err := base.DecodeWorkerInfo([]byte(s)) + if err != nil { continue // skip bad data } - workers = append(workers, &w) + workers = append(workers, w) } } return workers, nil diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 64d5e04..65a735b 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -647,14 +647,14 @@ return redis.status_reply("OK")`) // WriteServerState writes server state data to redis with expiration set to the value ttl. func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error { - bytes, err := json.Marshal(info) + bytes, err := base.EncodeServerInfo(info) if err != nil { return err } exp := time.Now().Add(ttl).UTC() args := []interface{}{ttl.Seconds(), bytes} // args to the lua script for _, w := range workers { - bytes, err := json.Marshal(w) + bytes, err := base.EncodeWorkerInfo(w) if err != nil { continue // skip bad data } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 0f4bcfd..23c32ea 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -1474,7 +1474,7 @@ func TestWriteServerState(t *testing.T) { Concurrency: 10, Queues: map[string]int{"default": 2, "email": 5, "low": 1}, StrictPriority: false, - Started: time.Now(), + Started: time.Now().UTC(), Status: "running", ActiveWorkerCount: 0, } @@ -1487,12 +1487,11 @@ func TestWriteServerState(t *testing.T) { // Check ServerInfo was written correctly. skey := base.ServerInfoKey(host, pid, serverID) data := r.client.Get(skey).Val() - var got base.ServerInfo - err = json.Unmarshal([]byte(data), &got) + got, err := base.DecodeServerInfo([]byte(data)) if err != nil { - t.Fatalf("could not decode json: %v", err) + t.Fatalf("could not decode server info: %v", err) } - if diff := cmp.Diff(info, got); diff != "" { + if diff := cmp.Diff(info, *got); diff != "" { t.Errorf("persisted ServerInfo was %v, want %v; (-want,+got)\n%s", got, info, diff) } @@ -1565,7 +1564,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) { Concurrency: 10, Queues: map[string]int{"default": 2, "email": 5, "low": 1}, StrictPriority: false, - Started: time.Now().Add(-10 * time.Minute), + Started: time.Now().Add(-10 * time.Minute).UTC(), Status: "running", ActiveWorkerCount: len(workers), } @@ -1578,12 +1577,11 @@ func TestWriteServerStateWithWorkers(t *testing.T) { // Check ServerInfo was written correctly. skey := base.ServerInfoKey(host, pid, serverID) data := r.client.Get(skey).Val() - var got base.ServerInfo - err = json.Unmarshal([]byte(data), &got) + got, err := base.DecodeServerInfo([]byte(data)) if err != nil { - t.Fatalf("could not decode json: %v", err) + t.Fatalf("could not decode server info: %v", err) } - if diff := cmp.Diff(serverInfo, got); diff != "" { + if diff := cmp.Diff(serverInfo, *got); diff != "" { t.Errorf("persisted ServerInfo was %v, want %v; (-want,+got)\n%s", got, serverInfo, diff) } @@ -1607,11 +1605,11 @@ func TestWriteServerStateWithWorkers(t *testing.T) { } var gotWorkers []*base.WorkerInfo for _, val := range wdata { - var w base.WorkerInfo - if err := json.Unmarshal([]byte(val), &w); err != nil { + w, err := base.DecodeWorkerInfo([]byte(val)) + if err != nil { t.Fatalf("could not unmarshal worker's data: %v", err) } - gotWorkers = append(gotWorkers, &w) + gotWorkers = append(gotWorkers, w) } if diff := cmp.Diff(workers, gotWorkers, h.SortWorkerInfoOpt); diff != "" { t.Errorf("persisted workers info was %v, want %v; (-want,+got)\n%s",