diff --git a/aggregator.go b/aggregator.go index 9f8da70..3db9d2e 100644 --- a/aggregator.go +++ b/aggregator.go @@ -156,7 +156,7 @@ func (a *aggregator) aggregate(t time.Time) { } tasks := make([]*Task, len(msgs)) for i, m := range msgs { - tasks[i] = NewTask(m.Type, m.Payload) + tasks[i] = NewTask(m.Type, m.Payload, nil) } aggregatedTask := a.ga.Aggregate(gname, tasks) ctx, cancel := context.WithDeadline(context.Background(), deadline) diff --git a/aggregator_test.go b/aggregator_test.go index ccce306..b3cdcc1 100644 --- a/aggregator_test.go +++ b/aggregator_test.go @@ -39,12 +39,12 @@ func TestAggregator(t *testing.T) { maxDelay: 0, // no maxdelay limit maxSize: 0, // no maxsize limit aggregateFunc: func(gname string, tasks []*Task) *Task { - return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated + return NewTask(gname, nil, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated }, tasks: []*Task{ - NewTask("task1", nil, Group("mygroup")), - NewTask("task2", nil, Group("mygroup")), - NewTask("task3", nil, Group("mygroup")), + NewTask("task1", nil, nil, Group("mygroup")), + NewTask("task2", nil, nil, Group("mygroup")), + NewTask("task3", nil, nil, Group("mygroup")), }, enqueueFrequency: 300 * time.Millisecond, waitTime: 3 * time.Second, @@ -65,13 +65,13 @@ func TestAggregator(t *testing.T) { maxDelay: 4 * time.Second, maxSize: 0, // no maxsize limit aggregateFunc: func(gname string, tasks []*Task) *Task { - return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated + return NewTask(gname, nil, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated }, tasks: []*Task{ - NewTask("task1", nil, Group("mygroup")), // time 0 - NewTask("task2", nil, Group("mygroup")), // time 1s - NewTask("task3", nil, Group("mygroup")), // time 2s - NewTask("task4", nil, Group("mygroup")), // time 3s + NewTask("task1", nil, nil, Group("mygroup")), // time 0 + NewTask("task2", nil, nil, Group("mygroup")), // time 1s + NewTask("task3", nil, nil, Group("mygroup")), // time 2s + NewTask("task4", nil, nil, Group("mygroup")), // time 3s }, enqueueFrequency: 1 * time.Second, waitTime: 4 * time.Second, @@ -92,14 +92,14 @@ func TestAggregator(t *testing.T) { maxDelay: 0, // no maxdelay limit maxSize: 5, aggregateFunc: func(gname string, tasks []*Task) *Task { - return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated + return NewTask(gname, nil, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated }, tasks: []*Task{ - NewTask("task1", nil, Group("mygroup")), - NewTask("task2", nil, Group("mygroup")), - NewTask("task3", nil, Group("mygroup")), - NewTask("task4", nil, Group("mygroup")), - NewTask("task5", nil, Group("mygroup")), + NewTask("task1", nil, nil, Group("mygroup")), + NewTask("task2", nil, nil, Group("mygroup")), + NewTask("task3", nil, nil, Group("mygroup")), + NewTask("task4", nil, nil, Group("mygroup")), + NewTask("task5", nil, nil, Group("mygroup")), }, enqueueFrequency: 300 * time.Millisecond, waitTime: defaultAggregationCheckInterval * 2, diff --git a/asynq.go b/asynq.go index 4c65831..d3479ab 100644 --- a/asynq.go +++ b/asynq.go @@ -14,8 +14,8 @@ import ( "strings" "time" - "github.com/redis/go-redis/v9" "github.com/hibiken/asynq/internal/base" + "github.com/redis/go-redis/v9" ) // Task represents a unit of work to be performed. @@ -31,10 +31,14 @@ type Task struct { // w is the ResultWriter for the task. w *ResultWriter + + // md holds metadata of the task. + md Metadata } -func (t *Task) Type() string { return t.typename } -func (t *Task) Payload() []byte { return t.payload } +func (t *Task) Type() string { return t.typename } +func (t *Task) Payload() []byte { return t.payload } +func (t *Task) Metadata() map[string]string { return t.md } // ResultWriter returns a pointer to the ResultWriter associated with the task. // @@ -44,19 +48,21 @@ func (t *Task) ResultWriter() *ResultWriter { return t.w } // NewTask returns a new Task given a type name and payload data. // Options can be passed to configure task processing behavior. -func NewTask(typename string, payload []byte, opts ...Option) *Task { +func NewTask(typename string, payload []byte, md Metadata, opts ...Option) *Task { return &Task{ typename: typename, payload: payload, + md: md, opts: opts, } } // newTask creates a task with the given typename, payload and ResultWriter. -func newTask(typename string, payload []byte, w *ResultWriter) *Task { +func newTask(typename string, payload []byte, md Metadata, w *ResultWriter) *Task { return &Task{ typename: typename, payload: payload, + md: md, w: w, } } @@ -438,10 +444,11 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} { // // Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:. // Supported formats are: -// redis://[:password@]host[:port][/dbnumber] -// rediss://[:password@]host[:port][/dbnumber] -// redis-socket://[:password@]path[?db=dbnumber] -// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName] +// +// redis://[:password@]host[:port][/dbnumber] +// rediss://[:password@]host[:port][/dbnumber] +// redis-socket://[:password@]path[?db=dbnumber] +// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName] func ParseRedisURI(uri string) (RedisConnOpt, error) { u, err := url.Parse(uri) if err != nil { diff --git a/benchmark_test.go b/benchmark_test.go index dfae16a..528c9d3 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -21,7 +21,7 @@ func makeTask(n int) *Task { if err != nil { panic(err) } - return NewTask(fmt.Sprintf("task%d", n), b) + return NewTask(fmt.Sprintf("task%d", n), b, nil) } // Simple E2E Benchmark testing with no scheduled tasks and retries. @@ -222,7 +222,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) { b.Log("Starting enqueueing") enqueued := 0 for enqueued < 100000 { - t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.JSON(map[string]interface{}{"data": enqueued})) + t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.JSON(map[string]interface{}{"data": enqueued}), nil) if _, err := client.Enqueue(t); err != nil { b.Logf("could not enqueue task %d: %v", enqueued, err) continue diff --git a/client.go b/client.go index 8f2f92c..4524b5d 100644 --- a/client.go +++ b/client.go @@ -10,11 +10,11 @@ import ( "strings" "time" - "github.com/redis/go-redis/v9" "github.com/google/uuid" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/rdb" + "github.com/redis/go-redis/v9" ) // A Client is responsible for scheduling tasks. @@ -150,9 +150,9 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) } // TTL duration must be greater than or equal to 1 second. // // Uniqueness of a task is based on the following properties: -// - Task Type -// - Task Payload -// - Queue Name +// - Task Type +// - Task Payload +// - Queue Name func Unique(ttl time.Duration) Option { return uniqueOption(ttl) } @@ -369,6 +369,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) } msg := &base.TaskMessage{ ID: opt.taskID, + Metadata: task.Metadata(), Type: task.Type(), Payload: task.Payload(), Queue: opt.queue, diff --git a/client_test.go b/client_test.go index da24d13..03208a7 100644 --- a/client_test.go +++ b/client_test.go @@ -21,7 +21,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) + task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}), nil) var ( now = time.Now() @@ -148,7 +148,7 @@ func TestClientEnqueue(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) + task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}), nil) now := time.Now() tests := []struct { @@ -483,7 +483,7 @@ func TestClientEnqueueWithGroupOption(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("mytask", []byte("foo")) + task := NewTask("mytask", []byte("foo"), nil) now := time.Now() tests := []struct { @@ -635,7 +635,7 @@ func TestClientEnqueueWithTaskIDOption(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", nil) + task := NewTask("send_email", nil, nil) now := time.Now() tests := []struct { @@ -713,7 +713,7 @@ func TestClientEnqueueWithConflictingTaskID(t *testing.T) { defer client.Close() const taskID = "custom_id" - task := NewTask("foo", nil) + task := NewTask("foo", nil, nil) if _, err := client.Enqueue(task, TaskID(taskID)); err != nil { t.Fatalf("First task: Enqueue failed: %v", err) @@ -729,7 +729,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) + task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}), nil) now := time.Now() tests := []struct { @@ -852,7 +852,7 @@ func TestClientEnqueueError(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) + task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}), nil) tests := []struct { desc string @@ -873,27 +873,27 @@ func TestClientEnqueueError(t *testing.T) { }, { desc: "With empty task typename", - task: NewTask("", h.JSON(map[string]interface{}{})), + task: NewTask("", h.JSON(map[string]interface{}{}), nil), opts: []Option{}, }, { desc: "With blank task typename", - task: NewTask(" ", h.JSON(map[string]interface{}{})), + task: NewTask(" ", h.JSON(map[string]interface{}{}), nil), opts: []Option{}, }, { desc: "With empty task ID", - task: NewTask("foo", nil), + task: NewTask("foo", nil, nil), opts: []Option{TaskID("")}, }, { desc: "With blank task ID", - task: NewTask("foo", nil), + task: NewTask("foo", nil, nil), opts: []Option{TaskID(" ")}, }, { desc: "With unique option less than 1s", - task: NewTask("foo", nil), + task: NewTask("foo", nil, nil), opts: []Option{Unique(300 * time.Millisecond)}, }, } @@ -1015,7 +1015,7 @@ func TestClientWithDefaultOptions(t *testing.T) { h.FlushDB(t, r) c := NewClient(getRedisConnOpt(t)) defer c.Close() - task := NewTask(tc.tasktype, tc.payload, tc.defaultOpts...) + task := NewTask(tc.tasktype, tc.payload, nil, tc.defaultOpts...) gotInfo, err := c.Enqueue(task, tc.opts...) if err != nil { t.Fatal(err) @@ -1052,7 +1052,7 @@ func TestClientEnqueueUnique(t *testing.T) { ttl time.Duration }{ { - NewTask("email", h.JSON(map[string]interface{}{"user_id": 123})), + NewTask("email", h.JSON(map[string]interface{}{"user_id": 123}), nil), time.Hour, }, } @@ -1096,7 +1096,7 @@ func TestClientEnqueueUniqueWithProcessInOption(t *testing.T) { ttl time.Duration }{ { - NewTask("reindex", nil), + NewTask("reindex", nil, nil), time.Hour, 10 * time.Minute, }, @@ -1142,7 +1142,7 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { ttl time.Duration }{ { - NewTask("reindex", nil), + NewTask("reindex", nil, nil), time.Now().Add(time.Hour), 10 * time.Minute, }, diff --git a/example_test.go b/example_test.go index 333236d..c5e6080 100644 --- a/example_test.go +++ b/example_test.go @@ -86,10 +86,10 @@ func ExampleScheduler() { &asynq.SchedulerOpts{Location: time.Local}, ) - if _, err := scheduler.Register("* * * * *", asynq.NewTask("task1", nil)); err != nil { + if _, err := scheduler.Register("* * * * *", asynq.NewTask("task1", nil, nil)); err != nil { log.Fatal(err) } - if _, err := scheduler.Register("@every 30s", asynq.NewTask("task2", nil)); err != nil { + if _, err := scheduler.Register("@every 30s", asynq.NewTask("task2", nil, nil)); err != nil { log.Fatal(err) } diff --git a/go.mod b/go.mod index 9730073..5df1542 100644 --- a/go.mod +++ b/go.mod @@ -18,4 +18,5 @@ require ( require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/stretchr/testify v1.8.4 // indirect ) diff --git a/go.sum b/go.sum index eab35c3..cd137f1 100644 --- a/go.sum +++ b/go.sum @@ -24,7 +24,8 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA= github.com/spf13/cast v1.5.1/go.mod h1:b9PdjNptOpzXr7Rq1q9gJML/2cdGQAo69NKzQ10KN48= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= diff --git a/inspector.go b/inspector.go index ee99f52..69cc6b8 100644 --- a/inspector.go +++ b/inspector.go @@ -10,10 +10,10 @@ import ( "strings" "time" - "github.com/redis/go-redis/v9" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/rdb" + "github.com/redis/go-redis/v9" ) // Inspector is a client interface to inspect and mutate the state of @@ -890,7 +890,7 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) { return nil, err } for _, e := range res { - task := NewTask(e.Type, e.Payload) + task := NewTask(e.Type, e.Payload, nil) var opts []Option for _, s := range e.Opts { if o, err := parseOption(s); err == nil { diff --git a/inspector_test.go b/inspector_test.go index 0d47d06..fdba3d2 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -3324,14 +3324,14 @@ func TestInspectorSchedulerEntries(t *testing.T) { want: []*SchedulerEntry{ { Spec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", nil, nil), Opts: nil, Next: now.Add(5 * time.Hour), Prev: now.Add(-2 * time.Hour), }, { Spec: "@every 20m", - Task: NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"})), + Task: NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"}), nil), Opts: []Option{Queue("bar"), MaxRetry(20)}, Next: now.Add(1 * time.Minute), Prev: now.Add(-19 * time.Minute), diff --git a/internal/base/base.go b/internal/base/base.go index a63c548..fc0ff3d 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -234,6 +234,9 @@ func AllAggregationSets(qname string) string { // TaskMessage is the internal representation of a task with additional metadata fields. // Serialized data of this type gets written to redis. type TaskMessage struct { + // Metadata holds metadata of the task. + Metadata map[string]string + // Type indicates the kind of the task to be performed. Type string @@ -302,6 +305,7 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) { return nil, fmt.Errorf("cannot encode nil message") } return proto.Marshal(&pb.TaskMessage{ + Metadata: msg.Metadata, Type: msg.Type, Payload: msg.Payload, Id: msg.ID, @@ -326,6 +330,7 @@ func DecodeMessage(data []byte) (*TaskMessage, error) { return nil, err } return &TaskMessage{ + Metadata: pbmsg.GetMetadata(), Type: pbmsg.GetType(), Payload: pbmsg.GetPayload(), ID: pbmsg.GetId(), diff --git a/internal/proto/asynq.pb.go b/internal/proto/asynq.pb.go index ec14b8c..e951ccb 100644 --- a/internal/proto/asynq.pb.go +++ b/internal/proto/asynq.pb.go @@ -4,14 +4,13 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.25.0 -// protoc v3.17.3 +// protoc-gen-go v1.33.0 +// protoc v4.25.3 // source: asynq.proto package proto import ( - proto "github.com/golang/protobuf/proto" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" timestamppb "google.golang.org/protobuf/types/known/timestamppb" @@ -26,10 +25,6 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -// This is a compile-time assertion that a sufficiently up-to-date version -// of the legacy proto package is being used. -const _ = proto.ProtoPackageIsVersion4 - // TaskMessage is the internal representation of a task with additional // metadata fields. // Next ID: 15 @@ -38,6 +33,9 @@ type TaskMessage struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + // Metadata holds additional data for a task. + // This field can be used to carry cross-cutting concerns, such as a Trace ID. + Metadata map[string]string `protobuf:"bytes,15,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` // Type indicates the kind of the task to be performed. Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` // Payload holds data needed to process the task. @@ -111,6 +109,13 @@ func (*TaskMessage) Descriptor() ([]byte, []int) { return file_asynq_proto_rawDescGZIP(), []int{0} } +func (x *TaskMessage) GetMetadata() map[string]string { + if x != nil { + return x.Metadata + } + return nil +} + func (x *TaskMessage) GetType() string { if x != nil { return x.Type @@ -625,105 +630,113 @@ var file_asynq_proto_rawDesc = []byte{ 0x0a, 0x0b, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x87, 0x03, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, - 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, - 0x6f, 0x61, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x74, - 0x72, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79, 0x12, - 0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, - 0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x72, 0x72, - 0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x72, - 0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x66, - 0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, - 0x6c, 0x61, 0x73, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x41, 0x74, 0x12, 0x18, 0x0a, 0x07, - 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, - 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, - 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, - 0x6e, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79, - 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65, - 0x79, 0x12, 0x1b, 0x0a, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0e, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x12, 0x1c, - 0x0a, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x0c, 0x20, 0x01, 0x28, - 0x03, 0x52, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, - 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0d, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22, - 0x8f, 0x03, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, - 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, - 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, - 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, - 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, - 0x64, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, - 0x6e, 0x63, 0x79, 0x12, 0x35, 0x0a, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x05, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x53, 0x65, 0x72, 0x76, - 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74, - 0x72, 0x79, 0x52, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x74, - 0x72, 0x69, 0x63, 0x74, 0x5f, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x06, 0x20, - 0x01, 0x28, 0x08, 0x52, 0x0e, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x50, 0x72, 0x69, 0x6f, 0x72, - 0x69, 0x74, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x07, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x73, - 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, - 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, - 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x09, 0x20, - 0x01, 0x28, 0x05, 0x52, 0x11, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65, - 0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x39, 0x0a, 0x0b, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, - 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, - 0x01, 0x22, 0xb1, 0x02, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, - 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x05, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, - 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, - 0x72, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, - 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73, - 0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x14, 0x0a, 0x05, - 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, - 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, - 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, - 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x36, 0x0a, - 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, 0x65, 0x61, - 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x22, 0xad, 0x02, 0x0a, 0x0e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, - 0x6c, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x70, 0x65, 0x63, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x12, 0x1b, 0x0a, 0x09, - 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73, - 0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x27, 0x0a, 0x0f, - 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, - 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x4f, 0x70, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x11, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x65, 0x6e, - 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x6e, 0x65, - 0x78, 0x74, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x46, 0x0a, - 0x11, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, - 0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x82, 0x04, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, 0x4d, 0x65, + 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x3c, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, + 0x61, 0x18, 0x0f, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, + 0x54, 0x61, 0x73, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x4d, 0x65, 0x74, 0x61, + 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, + 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, + 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, + 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79, + 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79, 0x12, 0x18, 0x0a, + 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, + 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x72, 0x72, 0x6f, 0x72, + 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x72, 0x72, 0x6f, + 0x72, 0x4d, 0x73, 0x67, 0x12, 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x66, 0x61, 0x69, + 0x6c, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x6c, 0x61, + 0x73, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x41, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, + 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, + 0x65, 0x6f, 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, + 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, + 0x12, 0x1d, 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0a, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65, 0x79, 0x12, + 0x1b, 0x0a, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0e, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x12, 0x1c, 0x0a, 0x09, + 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x63, 0x6f, + 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x03, + 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x41, 0x74, 0x1a, 0x3b, 0x0a, + 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, + 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, + 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x8f, 0x03, 0x0a, 0x0a, 0x53, + 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, + 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, + 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, + 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x20, 0x0a, 0x0b, + 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x12, 0x35, + 0x0a, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, + 0x2e, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, + 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x71, + 0x75, 0x65, 0x75, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x5f, + 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e, + 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x50, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x16, + 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, + 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, + 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, + 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x77, 0x6f, 0x72, 0x6b, + 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x09, 0x20, 0x01, 0x28, 0x05, 0x52, 0x11, + 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x43, 0x6f, 0x75, 0x6e, + 0x74, 0x1a, 0x39, 0x0a, 0x0b, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, + 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xb1, 0x02, 0x0a, + 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x68, + 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, + 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x70, 0x69, + 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x17, + 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x5f, + 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x61, 0x73, 0x6b, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x70, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x73, 0x6b, + 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, + 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x39, 0x0a, + 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x36, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, + 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, + 0x22, 0xad, 0x02, 0x0a, 0x0e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x70, 0x65, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x12, 0x1b, 0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x5f, + 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x61, 0x73, 0x6b, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x70, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x73, 0x6b, + 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x27, 0x0a, 0x0f, 0x65, 0x6e, 0x71, 0x75, 0x65, + 0x75, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, + 0x52, 0x0e, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x12, 0x46, 0x0a, 0x11, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, + 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x6e, 0x65, 0x78, 0x74, 0x45, 0x6e, 0x71, + 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x46, 0x0a, 0x11, 0x70, 0x72, 0x65, 0x76, + 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x07, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, + 0x0f, 0x70, 0x72, 0x65, 0x76, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, + 0x22, 0x6f, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x45, 0x6e, 0x71, + 0x75, 0x65, 0x75, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, + 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, + 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, + 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x70, 0x72, 0x65, 0x76, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, - 0x65, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x6f, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, - 0x65, 0x72, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x17, - 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x65, 0x6e, 0x71, 0x75, 0x65, - 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x65, 0x6e, 0x71, 0x75, 0x65, - 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, 0x73, 0x79, - 0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, + 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2f, 0x69, 0x6e, + 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -738,29 +751,31 @@ func file_asynq_proto_rawDescGZIP() []byte { return file_asynq_proto_rawDescData } -var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_asynq_proto_goTypes = []interface{}{ (*TaskMessage)(nil), // 0: asynq.TaskMessage (*ServerInfo)(nil), // 1: asynq.ServerInfo (*WorkerInfo)(nil), // 2: asynq.WorkerInfo (*SchedulerEntry)(nil), // 3: asynq.SchedulerEntry (*SchedulerEnqueueEvent)(nil), // 4: asynq.SchedulerEnqueueEvent - nil, // 5: asynq.ServerInfo.QueuesEntry - (*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp + nil, // 5: asynq.TaskMessage.MetadataEntry + nil, // 6: asynq.ServerInfo.QueuesEntry + (*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp } var file_asynq_proto_depIdxs = []int32{ - 5, // 0: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry - 6, // 1: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp - 6, // 2: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp - 6, // 3: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp - 6, // 4: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp - 6, // 5: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp - 6, // 6: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp - 7, // [7:7] is the sub-list for method output_type - 7, // [7:7] is the sub-list for method input_type - 7, // [7:7] is the sub-list for extension type_name - 7, // [7:7] is the sub-list for extension extendee - 0, // [0:7] is the sub-list for field type_name + 5, // 0: asynq.TaskMessage.metadata:type_name -> asynq.TaskMessage.MetadataEntry + 6, // 1: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry + 7, // 2: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp + 7, // 3: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp + 7, // 4: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp + 7, // 5: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp + 7, // 6: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp + 7, // 7: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp + 8, // [8:8] is the sub-list for method output_type + 8, // [8:8] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name } func init() { file_asynq_proto_init() } @@ -836,7 +851,7 @@ func file_asynq_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_asynq_proto_rawDesc, NumEnums: 0, - NumMessages: 6, + NumMessages: 7, NumExtensions: 0, NumServices: 0, }, diff --git a/internal/proto/asynq.proto b/internal/proto/asynq.proto index 777aa98..f61e132 100644 --- a/internal/proto/asynq.proto +++ b/internal/proto/asynq.proto @@ -13,6 +13,10 @@ option go_package = "github.com/hibiken/asynq/internal/proto"; // metadata fields. // Next ID: 15 message TaskMessage { + // Metadata holds additional data for a task. + // This field can be used to carry cross-cutting concerns, such as a Trace ID. + map metadata = 15; + // Type indicates the kind of the task to be performed. string type = 1; diff --git a/metadata.go b/metadata.go new file mode 100644 index 0000000..3a1c8bb --- /dev/null +++ b/metadata.go @@ -0,0 +1,27 @@ +package asynq + +type Metadata map[string]string + +func (a Metadata) Get(key string) string { + v, ok := a[key] + if !ok { + return "" + } + return v +} + +func (a Metadata) Set(key string, value string) { + a[key] = value +} + +func (a Metadata) Keys() []string { + i := 0 + r := make([]string, len(a)) + + for k := range a { + r[i] = k + i++ + } + + return r +} diff --git a/periodic_task_manager_test.go b/periodic_task_manager_test.go index 24d5e5b..221cbe3 100644 --- a/periodic_task_manager_test.go +++ b/periodic_task_manager_test.go @@ -33,8 +33,8 @@ func (p *FakeConfigProvider) GetConfigs() ([]*PeriodicTaskConfig, error) { func TestNewPeriodicTaskManager(t *testing.T) { cfgs := []*PeriodicTaskConfig{ - {Cronspec: "* * * * *", Task: NewTask("foo", nil)}, - {Cronspec: "* * * * *", Task: NewTask("bar", nil)}, + {Cronspec: "* * * * *", Task: NewTask("foo", nil, nil)}, + {Cronspec: "* * * * *", Task: NewTask("bar", nil, nil)}, } tests := []struct { desc string @@ -78,8 +78,8 @@ func TestNewPeriodicTaskManager(t *testing.T) { func TestNewPeriodicTaskManagerError(t *testing.T) { cfgs := []*PeriodicTaskConfig{ - {Cronspec: "* * * * *", Task: NewTask("foo", nil)}, - {Cronspec: "* * * * *", Task: NewTask("bar", nil)}, + {Cronspec: "* * * * *", Task: NewTask("foo", nil, nil)}, + {Cronspec: "* * * * *", Task: NewTask("bar", nil, nil)}, } tests := []struct { desc string @@ -118,11 +118,11 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "basic identity test", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", nil, nil), }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", nil, nil), }, isSame: true, }, @@ -130,12 +130,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with a option", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", nil, nil), Opts: []Option{Queue("myqueue")}, }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", nil, nil), Opts: []Option{Queue("myqueue")}, }, isSame: true, @@ -144,12 +144,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with multiple options (different order)", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", nil, nil), Opts: []Option{Unique(5 * time.Minute), Queue("myqueue")}, }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", nil, nil), Opts: []Option{Queue("myqueue"), Unique(5 * time.Minute)}, }, isSame: true, @@ -158,12 +158,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with payload", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", []byte("hello world!")), + Task: NewTask("foo", []byte("hello world!"), nil), Opts: []Option{Queue("myqueue")}, }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", []byte("hello world!")), + Task: NewTask("foo", []byte("hello world!"), nil), Opts: []Option{Queue("myqueue")}, }, isSame: true, @@ -172,11 +172,11 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with different cronspecs", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", nil, nil), }, b: &PeriodicTaskConfig{ Cronspec: "5 * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", nil, nil), }, isSame: false, }, @@ -184,11 +184,11 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with different task type", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", nil, nil), }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("bar", nil), + Task: NewTask("bar", nil, nil), }, isSame: false, }, @@ -196,12 +196,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with different options", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", nil, nil), Opts: []Option{Queue("myqueue")}, }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", nil, nil), Opts: []Option{Unique(10 * time.Minute)}, }, isSame: false, @@ -210,12 +210,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with different options (one is subset of the other)", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", nil, nil), Opts: []Option{Queue("myqueue")}, }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", nil, nil), Opts: []Option{Queue("myqueue"), Unique(10 * time.Minute)}, }, isSame: false, @@ -224,12 +224,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with different payload", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", []byte("hello!")), + Task: NewTask("foo", []byte("hello!"), nil), Opts: []Option{Queue("myqueue")}, }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", []byte("HELLO!")), + Task: NewTask("foo", []byte("HELLO!"), nil), Opts: []Option{Queue("myqueue"), Unique(10 * time.Minute)}, }, isSame: false, @@ -255,8 +255,8 @@ func TestPeriodicTaskConfigHash(t *testing.T) { func TestPeriodicTaskManager(t *testing.T) { // Note: In this test, we'll use task type as an ID for each config. cfgs := []*PeriodicTaskConfig{ - {Task: NewTask("task1", nil), Cronspec: "* * * * 1"}, - {Task: NewTask("task2", nil), Cronspec: "* * * * 2"}, + {Task: NewTask("task1", nil, nil), Cronspec: "* * * * 1"}, + {Task: NewTask("task2", nil, nil), Cronspec: "* * * * 2"}, } const syncInterval = 3 * time.Second provider := &FakeConfigProvider{cfgs: cfgs} @@ -287,8 +287,8 @@ func TestPeriodicTaskManager(t *testing.T) { // - task2 removed // - task3 added provider.SetConfigs([]*PeriodicTaskConfig{ - {Task: NewTask("task1", nil), Cronspec: "* * * * 1"}, - {Task: NewTask("task3", nil), Cronspec: "* * * * 3"}, + {Task: NewTask("task1", nil, nil), Cronspec: "* * * * 1"}, + {Task: NewTask("task3", nil, nil), Cronspec: "* * * * 3"}, }) // Wait for the next sync diff --git a/processor.go b/processor.go index 0ba9890..844820b 100644 --- a/processor.go +++ b/processor.go @@ -222,6 +222,7 @@ func (p *processor) exec() { task := newTask( msg.Type, msg.Payload, + msg.Metadata, &ResultWriter{ id: msg.ID, qname: msg.Queue, @@ -325,7 +326,7 @@ var SkipRetry = errors.New("skip retry for the task") func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) { if p.errHandler != nil { - p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err) + p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload, nil), err) } if !p.isFailureFunc(err) { // retry the task without marking it as failed @@ -346,7 +347,7 @@ func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailu return } ctx, _ := context.WithDeadline(context.Background(), l.Deadline()) - d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload)) + d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload, nil)) retryAt := time.Now().Add(d) err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure) if err != nil { diff --git a/processor_test.go b/processor_test.go index 9be4729..440be86 100644 --- a/processor_test.go +++ b/processor_test.go @@ -93,10 +93,10 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) { m3 := h.NewTaskMessage("task3", nil) m4 := h.NewTaskMessage("task4", nil) - t1 := NewTask(m1.Type, m1.Payload) - t2 := NewTask(m2.Type, m2.Payload) - t3 := NewTask(m3.Type, m3.Payload) - t4 := NewTask(m4.Type, m4.Payload) + t1 := NewTask(m1.Type, m1.Payload, nil) + t2 := NewTask(m2.Type, m2.Payload, nil) + t3 := NewTask(m3.Type, m3.Payload, nil) + t4 := NewTask(m4.Type, m4.Payload, nil) tests := []struct { pending []*base.TaskMessage // initial default queue state @@ -162,10 +162,10 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) { m3 = h.NewTaskMessageWithQueue("task3", nil, "high") m4 = h.NewTaskMessageWithQueue("task4", nil, "low") - t1 = NewTask(m1.Type, m1.Payload) - t2 = NewTask(m2.Type, m2.Payload) - t3 = NewTask(m3.Type, m3.Payload) - t4 = NewTask(m4.Type, m4.Payload) + t1 = NewTask(m1.Type, m1.Payload, nil) + t2 = NewTask(m2.Type, m2.Payload, nil) + t3 = NewTask(m3.Type, m3.Payload, nil) + t4 = NewTask(m4.Type, m4.Payload, nil) ) defer r.Close() @@ -232,7 +232,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { rdbClient := rdb.NewRDB(r) m1 := h.NewTaskMessage("large_number", h.JSON(map[string]interface{}{"data": 111111111111111111})) - t1 := NewTask(m1.Type, m1.Payload) + t1 := NewTask(m1.Type, m1.Payload, nil) tests := []struct { pending []*base.TaskMessage // initial default queue state @@ -639,13 +639,13 @@ func TestProcessorWithStrictPriority(t *testing.T) { m6 = h.NewTaskMessageWithQueue("task6", nil, "low") m7 = h.NewTaskMessageWithQueue("task7", nil, "low") - t1 = NewTask(m1.Type, m1.Payload) - t2 = NewTask(m2.Type, m2.Payload) - t3 = NewTask(m3.Type, m3.Payload) - t4 = NewTask(m4.Type, m4.Payload) - t5 = NewTask(m5.Type, m5.Payload) - t6 = NewTask(m6.Type, m6.Payload) - t7 = NewTask(m7.Type, m7.Payload) + t1 = NewTask(m1.Type, m1.Payload, nil) + t2 = NewTask(m2.Type, m2.Payload, nil) + t3 = NewTask(m3.Type, m3.Payload, nil) + t4 = NewTask(m4.Type, m4.Payload, nil) + t5 = NewTask(m5.Type, m5.Payload, nil) + t6 = NewTask(m6.Type, m6.Payload, nil) + t7 = NewTask(m7.Type, m7.Payload, nil) ) defer r.Close() @@ -742,7 +742,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { return nil }, - task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), + task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"}), nil), wantErr: false, }, { @@ -750,7 +750,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { return fmt.Errorf("something went wrong") }, - task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), + task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"}), nil), wantErr: true, }, { @@ -758,7 +758,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { panic("something went terribly wrong") }, - task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), + task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"}), nil), wantErr: true, }, } @@ -928,7 +928,7 @@ func TestProcessorComputeDeadline(t *testing.T) { func TestReturnPanicError(t *testing.T) { - task := NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})) + task := NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"}), nil) tests := []struct { name string diff --git a/recoverer.go b/recoverer.go index e350aa7..64abd3a 100644 --- a/recoverer.go +++ b/recoverer.go @@ -112,7 +112,7 @@ func (r *recoverer) recoverStaleAggregationSets() { } func (r *recoverer) retry(msg *base.TaskMessage, err error) { - delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload)) + delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload, nil)) retryAt := time.Now().Add(delay) if err := r.broker.Retry(context.Background(), msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil { r.logger.Warnf("recoverer: could not retry lease expired task: %v", err) diff --git a/scheduler_test.go b/scheduler_test.go index fea048e..470dfea 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -26,7 +26,7 @@ func TestSchedulerRegister(t *testing.T) { }{ { cronspec: "@every 3s", - task: NewTask("task1", nil), + task: NewTask("task1", nil, nil), opts: []Option{MaxRetry(10)}, wait: 10 * time.Second, queue: "default", @@ -94,7 +94,7 @@ func TestSchedulerWhenRedisDown(t *testing.T) { &SchedulerOpts{EnqueueErrorHandler: errorHandler}, ) - task := NewTask("test", nil) + task := NewTask("test", nil, nil) if _, err := scheduler.Register("@every 3s", task); err != nil { t.Fatal(err) @@ -124,7 +124,7 @@ func TestSchedulerUnregister(t *testing.T) { }{ { cronspec: "@every 3s", - task: NewTask("task1", nil), + task: NewTask("task1", nil, nil), opts: []Option{MaxRetry(10)}, wait: 10 * time.Second, queue: "default", @@ -183,7 +183,7 @@ func TestSchedulerPostAndPreEnqueueHandler(t *testing.T) { }, ) - task := NewTask("test", nil) + task := NewTask("test", nil, nil) if _, err := scheduler.Register("@every 3s", task); err != nil { t.Fatal(err) diff --git a/servemux_test.go b/servemux_test.go index 227c4d7..5ee9ab2 100644 --- a/servemux_test.go +++ b/servemux_test.go @@ -24,7 +24,7 @@ func makeFakeHandler(identity string) Handler { } // makeFakeMiddleware returns a middleware function that appends the given identity -//to the global invoked slice. +// to the global invoked slice. func makeFakeMiddleware(identity string) MiddlewareFunc { return func(next Handler) Handler { return HandlerFunc(func(ctx context.Context, t *Task) error { @@ -62,7 +62,7 @@ func TestServeMux(t *testing.T) { for _, tc := range serveMuxTests { called = "" // reset to zero value - task := NewTask(tc.typename, nil) + task := NewTask(tc.typename, nil, nil) if err := mux.ProcessTask(context.Background(), task); err != nil { t.Fatal(err) } @@ -121,7 +121,7 @@ func TestServeMuxNotFound(t *testing.T) { } for _, tc := range notFoundTests { - task := NewTask(tc.typename, nil) + task := NewTask(tc.typename, nil, nil) err := mux.ProcessTask(context.Background(), task) if err == nil { t.Errorf("ProcessTask did not return error for task %q, should return 'not found' error", task.Type()) @@ -154,7 +154,7 @@ func TestServeMuxMiddlewares(t *testing.T) { invoked = []string{} // reset to empty slice called = "" // reset to zero value - task := NewTask(tc.typename, nil) + task := NewTask(tc.typename, nil, nil) if err := mux.ProcessTask(context.Background(), task); err != nil { t.Fatal(err) } diff --git a/server_test.go b/server_test.go index 724b48a..4371d3c 100644 --- a/server_test.go +++ b/server_test.go @@ -40,12 +40,12 @@ func TestServer(t *testing.T) { t.Fatal(err) } - _, err = c.Enqueue(NewTask("send_email", testutil.JSON(map[string]interface{}{"recipient_id": 123}))) + _, err = c.Enqueue(NewTask("send_email", testutil.JSON(map[string]interface{}{"recipient_id": 123}), nil)) if err != nil { t.Errorf("could not enqueue a task: %v", err) } - _, err = c.Enqueue(NewTask("send_email", testutil.JSON(map[string]interface{}{"recipient_id": 456})), ProcessIn(1*time.Hour)) + _, err = c.Enqueue(NewTask("send_email", testutil.JSON(map[string]interface{}{"recipient_id": 456}), nil), ProcessIn(1*time.Hour)) if err != nil { t.Errorf("could not enqueue a task: %v", err) } @@ -183,15 +183,15 @@ func TestServerWithFlakyBroker(t *testing.T) { } for i := 0; i < 10; i++ { - _, err := c.Enqueue(NewTask("enqueued", nil), MaxRetry(i)) + _, err := c.Enqueue(NewTask("enqueued", nil, nil), MaxRetry(i)) if err != nil { t.Fatal(err) } - _, err = c.Enqueue(NewTask("bad_task", nil)) + _, err = c.Enqueue(NewTask("bad_task", nil, nil)) if err != nil { t.Fatal(err) } - _, err = c.Enqueue(NewTask("scheduled", nil), ProcessIn(time.Duration(i)*time.Second)) + _, err = c.Enqueue(NewTask("scheduled", nil, nil), ProcessIn(time.Duration(i)*time.Second)) if err != nil { t.Fatal(err) }