diff --git a/internal/base/base.go b/internal/base/base.go index 300e930..e18ac38 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -224,10 +224,8 @@ func DecodeMessage(data []byte) (*TaskMessage, error) { if err := proto.Unmarshal(data, &pbmsg); err != nil { return nil, err } - d := json.NewDecoder(bytes.NewReader(pbmsg.Payload)) - d.UseNumber() - payload := make(map[string]interface{}) - if err := d.Decode(&payload); err != nil { + payload, err := decodePayload(pbmsg.GetPayload()) + if err != nil { return nil, err } return &TaskMessage{ @@ -413,16 +411,24 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) { }) } +func decodePayload(b []byte) (map[string]interface{}, error) { + d := json.NewDecoder(bytes.NewReader(b)) + d.UseNumber() + payload := make(map[string]interface{}) + if err := d.Decode(&payload); err != nil { + return nil, err + } + return payload, nil +} + // DecodeWorkerInfo decodes the given bytes into WorkerInfo. func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) { var pbmsg pb.WorkerInfo if err := proto.Unmarshal(b, &pbmsg); err != nil { return nil, err } - d := json.NewDecoder(bytes.NewReader(pbmsg.GetTaskPayload())) - d.UseNumber() - payload := make(map[string]interface{}) - if err := d.Decode(&payload); err != nil { + payload, err := decodePayload(pbmsg.GetTaskPayload()) + if err != nil { return nil, err } startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) @@ -471,6 +477,60 @@ type SchedulerEntry struct { Prev time.Time } +// EncodeSchedulerEntry marshals the given entry and returns an encoded bytes. +func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) { + payload, err := json.Marshal(entry.Payload) + if err != nil { + return nil, err + } + next, err := ptypes.TimestampProto(entry.Next) + if err != nil { + return nil, err + } + prev, err := ptypes.TimestampProto(entry.Prev) + if err != nil { + return nil, err + } + return proto.Marshal(&pb.SchedulerEntry{ + Id: entry.ID, + Spec: entry.Spec, + TaskType: entry.Type, + TaskPayload: payload, + EnqueueOptions: entry.Opts, + NextEnqueueTime: next, + PrevEnqueueTime: prev, + }) +} + +// DecodeSchedulerEntry unmarshals the given bytes and returns a decoded SchedulerEntry. +func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) { + var pbmsg pb.SchedulerEntry + if err := proto.Unmarshal(b, &pbmsg); err != nil { + return nil, err + } + payload, err := decodePayload(pbmsg.GetTaskPayload()) + if err != nil { + return nil, err + } + next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime()) + if err != nil { + return nil, err + } + prev, err := ptypes.Timestamp(pbmsg.GetPrevEnqueueTime()) + if err != nil { + return nil, err + } + return &SchedulerEntry{ + ID: pbmsg.Id, + Spec: pbmsg.Spec, + Type: pbmsg.TaskType, + Payload: payload, + Opts: pbmsg.EnqueueOptions, + Next: next, + Prev: prev, + }, nil +} + // SchedulerEnqueueEvent holds information about an enqueue event by a scheduler. type SchedulerEnqueueEvent struct { // ID of the task that was enqueued. @@ -480,6 +540,36 @@ type SchedulerEnqueueEvent struct { EnqueuedAt time.Time } +// EncodeSchedulerEnqueueEvent marshals the given event +// and returns an encoded bytes. +func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error) { + enqueuedAt, err := ptypes.TimestampProto(event.EnqueuedAt) + if err != nil { + return nil, err + } + return proto.Marshal(&pb.SchedulerEnqueueEvent{ + TaskId: event.TaskID, + EnqueueTime: enqueuedAt, + }) +} + +// DecodeSchedulerEnqueueEvent unmarshals the given bytes +// and returns a decoded SchedulerEnqueueEvent. +func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error) { + var pbmsg pb.SchedulerEnqueueEvent + if err := proto.Unmarshal(b, &pbmsg); err != nil { + return nil, err + } + enqueuedAt, err := ptypes.Timestamp(pbmsg.GetEnqueueTime()) + if err != nil { + return nil, err + } + return &SchedulerEnqueueEvent{ + TaskID: pbmsg.TaskId, + EnqueuedAt: enqueuedAt, + }, nil +} + // Cancelations is a collection that holds cancel functions for all active tasks. // // Cancelations are safe for concurrent use by multipel goroutines. diff --git a/internal/base/base_test.go b/internal/base/base_test.go index c779e49..bfebef5 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -446,6 +446,71 @@ func TestWorkerInfoEncoding(t *testing.T) { } } +func TestSchedulerEntryEncoding(t *testing.T) { + tests := []struct { + entry SchedulerEntry + }{ + { + entry: SchedulerEntry{ + ID: uuid.NewString(), + Spec: "* * * * *", + Type: "task_A", + Payload: map[string]interface{}{"foo": "bar"}, + Opts: []string{"Queue('email')"}, + Next: time.Now().Add(30 * time.Second).UTC(), + Prev: time.Now().Add(-2 * time.Minute).UTC(), + }, + }, + } + + for _, tc := range tests { + encoded, err := EncodeSchedulerEntry(&tc.entry) + if err != nil { + t.Errorf("EncodeSchedulerEntry(entry) returned error: %v", err) + continue + } + decoded, err := DecodeSchedulerEntry(encoded) + if err != nil { + t.Errorf("DecodeSchedulerEntry(encoded) returned error: %v", err) + continue + } + if diff := cmp.Diff(&tc.entry, decoded); diff != "" { + t.Errorf("Decoded SchedulerEntry == %+v, want %+v;(-want,+got)\n%s", + decoded, tc.entry, diff) + } + } +} + +func TestSchedulerEnqueueEventEncoding(t *testing.T) { + tests := []struct { + event SchedulerEnqueueEvent + }{ + { + event: SchedulerEnqueueEvent{ + TaskID: uuid.NewString(), + EnqueuedAt: time.Now().Add(-30 * time.Second).UTC(), + }, + }, + } + + for _, tc := range tests { + encoded, err := EncodeSchedulerEnqueueEvent(&tc.event) + if err != nil { + t.Errorf("EncodeSchedulerEnqueueEvent(event) returned error: %v", err) + continue + } + decoded, err := DecodeSchedulerEnqueueEvent(encoded) + if err != nil { + t.Errorf("DecodeSchedulerEnqueueEvent(encoded) returned error: %v", err) + continue + } + if diff := cmp.Diff(&tc.event, decoded); diff != "" { + t.Errorf("Decoded SchedulerEnqueueEvent == %+v, want %+v;(-want,+got)\n%s", + decoded, tc.event, diff) + } + } +} + // Test for status being accessed by multiple goroutines. // Run with -race flag to check for data race. func TestStatusConcurrentAccess(t *testing.T) { diff --git a/internal/proto/asynq.pb.go b/internal/proto/asynq.pb.go index 24c9dc8..fba1820 100644 --- a/internal/proto/asynq.pb.go +++ b/internal/proto/asynq.pb.go @@ -369,6 +369,168 @@ func (x *WorkerInfo) GetDeadline() *timestamppb.Timestamp { return nil } +// SchedulerEntry holds information about a periodic task registered with a scheduler. +type SchedulerEntry struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Identifier of the scheduler entry. + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // Periodic schedule spec of the entry. + Spec string `protobuf:"bytes,2,opt,name=spec,proto3" json:"spec,omitempty"` + // Task type of the periodic task. + TaskType string `protobuf:"bytes,3,opt,name=task_type,json=taskType,proto3" json:"task_type,omitempty"` + // Task payload of the periodic task. + TaskPayload []byte `protobuf:"bytes,4,opt,name=task_payload,json=taskPayload,proto3" json:"task_payload,omitempty"` + // Options used to enqueue the periodic task. + EnqueueOptions []string `protobuf:"bytes,5,rep,name=enqueue_options,json=enqueueOptions,proto3" json:"enqueue_options,omitempty"` + // Next time the task will be enqueued. + NextEnqueueTime *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=next_enqueue_time,json=nextEnqueueTime,proto3" json:"next_enqueue_time,omitempty"` + // Last time the task was enqueued. + // Zero time if task was never enqueued. + PrevEnqueueTime *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=prev_enqueue_time,json=prevEnqueueTime,proto3" json:"prev_enqueue_time,omitempty"` +} + +func (x *SchedulerEntry) Reset() { + *x = SchedulerEntry{} + if protoimpl.UnsafeEnabled { + mi := &file_asynq_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SchedulerEntry) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SchedulerEntry) ProtoMessage() {} + +func (x *SchedulerEntry) ProtoReflect() protoreflect.Message { + mi := &file_asynq_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SchedulerEntry.ProtoReflect.Descriptor instead. +func (*SchedulerEntry) Descriptor() ([]byte, []int) { + return file_asynq_proto_rawDescGZIP(), []int{3} +} + +func (x *SchedulerEntry) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *SchedulerEntry) GetSpec() string { + if x != nil { + return x.Spec + } + return "" +} + +func (x *SchedulerEntry) GetTaskType() string { + if x != nil { + return x.TaskType + } + return "" +} + +func (x *SchedulerEntry) GetTaskPayload() []byte { + if x != nil { + return x.TaskPayload + } + return nil +} + +func (x *SchedulerEntry) GetEnqueueOptions() []string { + if x != nil { + return x.EnqueueOptions + } + return nil +} + +func (x *SchedulerEntry) GetNextEnqueueTime() *timestamppb.Timestamp { + if x != nil { + return x.NextEnqueueTime + } + return nil +} + +func (x *SchedulerEntry) GetPrevEnqueueTime() *timestamppb.Timestamp { + if x != nil { + return x.PrevEnqueueTime + } + return nil +} + +// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler. +type SchedulerEnqueueEvent struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // ID of the task that was enqueued. + TaskId string `protobuf:"bytes,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` + // Time the task was enqueued. + EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"` +} + +func (x *SchedulerEnqueueEvent) Reset() { + *x = SchedulerEnqueueEvent{} + if protoimpl.UnsafeEnabled { + mi := &file_asynq_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SchedulerEnqueueEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SchedulerEnqueueEvent) ProtoMessage() {} + +func (x *SchedulerEnqueueEvent) ProtoReflect() protoreflect.Message { + mi := &file_asynq_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SchedulerEnqueueEvent.ProtoReflect.Descriptor instead. +func (*SchedulerEnqueueEvent) Descriptor() ([]byte, []int) { + return file_asynq_proto_rawDescGZIP(), []int{4} +} + +func (x *SchedulerEnqueueEvent) GetTaskId() string { + if x != nil { + return x.TaskId + } + return "" +} + +func (x *SchedulerEnqueueEvent) GetEnqueueTime() *timestamppb.Timestamp { + if x != nil { + return x.EnqueueTime + } + return nil +} + var File_asynq_proto protoreflect.FileDescriptor var file_asynq_proto_rawDesc = []byte{ @@ -436,10 +598,36 @@ var file_asynq_proto_rawDesc = []byte{ 0x36, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, - 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, 0x73, - 0x79, 0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x22, 0xad, 0x02, 0x0a, 0x0e, 0x53, 0x63, 0x68, 0x65, + 0x64, 0x75, 0x6c, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x70, + 0x65, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x12, 0x1b, + 0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, + 0x61, 0x73, 0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x27, + 0x0a, 0x0f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, + 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x11, 0x6e, 0x65, 0x78, 0x74, 0x5f, + 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, + 0x6e, 0x65, 0x78, 0x74, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, + 0x46, 0x0a, 0x11, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, + 0x74, 0x69, 0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x70, 0x72, 0x65, 0x76, 0x45, 0x6e, 0x71, 0x75, + 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x6f, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x64, + 0x75, 0x6c, 0x65, 0x72, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x65, 0x6e, 0x71, + 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x65, 0x6e, 0x71, + 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, + 0x73, 0x79, 0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -454,24 +642,29 @@ func file_asynq_proto_rawDescGZIP() []byte { return file_asynq_proto_rawDescData } -var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_asynq_proto_goTypes = []interface{}{ (*TaskMessage)(nil), // 0: tutorial.TaskMessage (*ServerInfo)(nil), // 1: tutorial.ServerInfo (*WorkerInfo)(nil), // 2: tutorial.WorkerInfo - nil, // 3: tutorial.ServerInfo.QueuesEntry - (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp + (*SchedulerEntry)(nil), // 3: tutorial.SchedulerEntry + (*SchedulerEnqueueEvent)(nil), // 4: tutorial.SchedulerEnqueueEvent + nil, // 5: tutorial.ServerInfo.QueuesEntry + (*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp } var file_asynq_proto_depIdxs = []int32{ - 3, // 0: tutorial.ServerInfo.queues:type_name -> tutorial.ServerInfo.QueuesEntry - 4, // 1: tutorial.ServerInfo.start_time:type_name -> google.protobuf.Timestamp - 4, // 2: tutorial.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp - 4, // 3: tutorial.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 5, // 0: tutorial.ServerInfo.queues:type_name -> tutorial.ServerInfo.QueuesEntry + 6, // 1: tutorial.ServerInfo.start_time:type_name -> google.protobuf.Timestamp + 6, // 2: tutorial.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp + 6, // 3: tutorial.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp + 6, // 4: tutorial.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp + 6, // 5: tutorial.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp + 6, // 6: tutorial.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_asynq_proto_init() } @@ -516,6 +709,30 @@ func file_asynq_proto_init() { return nil } } + file_asynq_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SchedulerEntry); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_asynq_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SchedulerEnqueueEvent); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -523,7 +740,7 @@ func file_asynq_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_asynq_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 6, NumExtensions: 0, NumServices: 0, }, diff --git a/internal/proto/asynq.proto b/internal/proto/asynq.proto index a89d6dc..199c9a1 100644 --- a/internal/proto/asynq.proto +++ b/internal/proto/asynq.proto @@ -1,10 +1,12 @@ syntax = "proto3"; -package tutorial; +package asynq; import "google/protobuf/timestamp.proto"; option go_package = "github.com/hibiken/asynq/internal/proto"; +// TaskMessage is the internal representation of a task with additional metadata fields. +// See base.TaskMessage for details of each field. message TaskMessage { string type = 1; @@ -51,4 +53,38 @@ message WorkerInfo { string queue = 7; google.protobuf.Timestamp start_time = 8; google.protobuf.Timestamp deadline = 9; +}; + +// SchedulerEntry holds information about a periodic task registered with a scheduler. +message SchedulerEntry { + // Identifier of the scheduler entry. + string id = 1; + + // Periodic schedule spec of the entry. + string spec = 2; + + // Task type of the periodic task. + string task_type = 3; + + // Task payload of the periodic task. + bytes task_payload = 4; + + // Options used to enqueue the periodic task. + repeated string enqueue_options = 5; + + // Next time the task will be enqueued. + google.protobuf.Timestamp next_enqueue_time = 6; + + // Last time the task was enqueued. + // Zero time if task was never enqueued. + google.protobuf.Timestamp prev_enqueue_time = 7; +}; + +// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler. +message SchedulerEnqueueEvent { + // ID of the task that was enqueued. + string task_id = 1; + + // Time the task was enqueued. + google.protobuf.Timestamp enqueue_time = 2; }; \ No newline at end of file diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 55c7568..96cfa55 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -5,7 +5,6 @@ package rdb import ( - "encoding/json" "fmt" "strings" "time" @@ -1082,11 +1081,11 @@ func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) { continue // skip bad data } for _, s := range data { - var e base.SchedulerEntry - if err := json.Unmarshal([]byte(s), &e); err != nil { + e, err := base.DecodeSchedulerEntry([]byte(s)) + if err != nil { continue // skip bad data } - entries = append(entries, &e) + entries = append(entries, e) } } return entries, nil @@ -1105,11 +1104,11 @@ func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*bas if err != nil { return nil, err } - var e base.SchedulerEnqueueEvent - if err := json.Unmarshal([]byte(data), &e); err != nil { + e, err := base.DecodeSchedulerEnqueueEvent([]byte(data)) + if err != nil { return nil, err } - events = append(events, &e) + events = append(events, e) } return events, nil } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 65a735b..c3dce7e 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -6,7 +6,6 @@ package rdb import ( - "encoding/json" "errors" "fmt" "time" @@ -706,7 +705,7 @@ return redis.status_reply("OK")`) func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error { args := []interface{}{ttl.Seconds()} for _, e := range entries { - bytes, err := json.Marshal(e) + bytes, err := base.EncodeSchedulerEntry(e) if err != nil { continue // skip bad data } @@ -761,7 +760,7 @@ const maxEvents = 1000 // RecordSchedulerEnqueueEvent records the time when the given task was enqueued. func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error { key := base.SchedulerHistoryKey(entryID) - data, err := json.Marshal(event) + data, err := base.EncodeSchedulerEnqueueEvent(event) if err != nil { return err }