2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

feat: add metadata task

This commit is contained in:
Amirala Barjasteh 2024-03-10 17:03:28 +03:30
parent 38f7499b71
commit 3b46717850
22 changed files with 299 additions and 237 deletions

View File

@ -156,7 +156,7 @@ func (a *aggregator) aggregate(t time.Time) {
} }
tasks := make([]*Task, len(msgs)) tasks := make([]*Task, len(msgs))
for i, m := range msgs { for i, m := range msgs {
tasks[i] = NewTask(m.Type, m.Payload) tasks[i] = NewTask(m.Type, m.Payload, nil)
} }
aggregatedTask := a.ga.Aggregate(gname, tasks) aggregatedTask := a.ga.Aggregate(gname, tasks)
ctx, cancel := context.WithDeadline(context.Background(), deadline) ctx, cancel := context.WithDeadline(context.Background(), deadline)

View File

@ -39,12 +39,12 @@ func TestAggregator(t *testing.T) {
maxDelay: 0, // no maxdelay limit maxDelay: 0, // no maxdelay limit
maxSize: 0, // no maxsize limit maxSize: 0, // no maxsize limit
aggregateFunc: func(gname string, tasks []*Task) *Task { aggregateFunc: func(gname string, tasks []*Task) *Task {
return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated return NewTask(gname, nil, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
}, },
tasks: []*Task{ tasks: []*Task{
NewTask("task1", nil, Group("mygroup")), NewTask("task1", nil, nil, Group("mygroup")),
NewTask("task2", nil, Group("mygroup")), NewTask("task2", nil, nil, Group("mygroup")),
NewTask("task3", nil, Group("mygroup")), NewTask("task3", nil, nil, Group("mygroup")),
}, },
enqueueFrequency: 300 * time.Millisecond, enqueueFrequency: 300 * time.Millisecond,
waitTime: 3 * time.Second, waitTime: 3 * time.Second,
@ -65,13 +65,13 @@ func TestAggregator(t *testing.T) {
maxDelay: 4 * time.Second, maxDelay: 4 * time.Second,
maxSize: 0, // no maxsize limit maxSize: 0, // no maxsize limit
aggregateFunc: func(gname string, tasks []*Task) *Task { aggregateFunc: func(gname string, tasks []*Task) *Task {
return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated return NewTask(gname, nil, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
}, },
tasks: []*Task{ tasks: []*Task{
NewTask("task1", nil, Group("mygroup")), // time 0 NewTask("task1", nil, nil, Group("mygroup")), // time 0
NewTask("task2", nil, Group("mygroup")), // time 1s NewTask("task2", nil, nil, Group("mygroup")), // time 1s
NewTask("task3", nil, Group("mygroup")), // time 2s NewTask("task3", nil, nil, Group("mygroup")), // time 2s
NewTask("task4", nil, Group("mygroup")), // time 3s NewTask("task4", nil, nil, Group("mygroup")), // time 3s
}, },
enqueueFrequency: 1 * time.Second, enqueueFrequency: 1 * time.Second,
waitTime: 4 * time.Second, waitTime: 4 * time.Second,
@ -92,14 +92,14 @@ func TestAggregator(t *testing.T) {
maxDelay: 0, // no maxdelay limit maxDelay: 0, // no maxdelay limit
maxSize: 5, maxSize: 5,
aggregateFunc: func(gname string, tasks []*Task) *Task { aggregateFunc: func(gname string, tasks []*Task) *Task {
return NewTask(gname, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated return NewTask(gname, nil, nil, MaxRetry(len(tasks))) // use max retry to see how many tasks were aggregated
}, },
tasks: []*Task{ tasks: []*Task{
NewTask("task1", nil, Group("mygroup")), NewTask("task1", nil, nil, Group("mygroup")),
NewTask("task2", nil, Group("mygroup")), NewTask("task2", nil, nil, Group("mygroup")),
NewTask("task3", nil, Group("mygroup")), NewTask("task3", nil, nil, Group("mygroup")),
NewTask("task4", nil, Group("mygroup")), NewTask("task4", nil, nil, Group("mygroup")),
NewTask("task5", nil, Group("mygroup")), NewTask("task5", nil, nil, Group("mygroup")),
}, },
enqueueFrequency: 300 * time.Millisecond, enqueueFrequency: 300 * time.Millisecond,
waitTime: defaultAggregationCheckInterval * 2, waitTime: defaultAggregationCheckInterval * 2,

View File

@ -14,8 +14,8 @@ import (
"strings" "strings"
"time" "time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/redis/go-redis/v9"
) )
// Task represents a unit of work to be performed. // Task represents a unit of work to be performed.
@ -31,10 +31,14 @@ type Task struct {
// w is the ResultWriter for the task. // w is the ResultWriter for the task.
w *ResultWriter w *ResultWriter
// md holds metadata of the task.
md Metadata
} }
func (t *Task) Type() string { return t.typename } func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload } func (t *Task) Payload() []byte { return t.payload }
func (t *Task) Metadata() map[string]string { return t.md }
// ResultWriter returns a pointer to the ResultWriter associated with the task. // ResultWriter returns a pointer to the ResultWriter associated with the task.
// //
@ -44,19 +48,21 @@ func (t *Task) ResultWriter() *ResultWriter { return t.w }
// NewTask returns a new Task given a type name and payload data. // NewTask returns a new Task given a type name and payload data.
// Options can be passed to configure task processing behavior. // Options can be passed to configure task processing behavior.
func NewTask(typename string, payload []byte, opts ...Option) *Task { func NewTask(typename string, payload []byte, md Metadata, opts ...Option) *Task {
return &Task{ return &Task{
typename: typename, typename: typename,
payload: payload, payload: payload,
md: md,
opts: opts, opts: opts,
} }
} }
// newTask creates a task with the given typename, payload and ResultWriter. // newTask creates a task with the given typename, payload and ResultWriter.
func newTask(typename string, payload []byte, w *ResultWriter) *Task { func newTask(typename string, payload []byte, md Metadata, w *ResultWriter) *Task {
return &Task{ return &Task{
typename: typename, typename: typename,
payload: payload, payload: payload,
md: md,
w: w, w: w,
} }
} }
@ -438,6 +444,7 @@ func (opt RedisClusterClientOpt) MakeRedisClient() interface{} {
// //
// Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:. // Three URI schemes are supported, which are redis:, rediss:, redis-socket:, and redis-sentinel:.
// Supported formats are: // Supported formats are:
//
// redis://[:password@]host[:port][/dbnumber] // redis://[:password@]host[:port][/dbnumber]
// rediss://[:password@]host[:port][/dbnumber] // rediss://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber] // redis-socket://[:password@]path[?db=dbnumber]

View File

@ -21,7 +21,7 @@ func makeTask(n int) *Task {
if err != nil { if err != nil {
panic(err) panic(err)
} }
return NewTask(fmt.Sprintf("task%d", n), b) return NewTask(fmt.Sprintf("task%d", n), b, nil)
} }
// Simple E2E Benchmark testing with no scheduled tasks and retries. // Simple E2E Benchmark testing with no scheduled tasks and retries.
@ -222,7 +222,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) {
b.Log("Starting enqueueing") b.Log("Starting enqueueing")
enqueued := 0 enqueued := 0
for enqueued < 100000 { for enqueued < 100000 {
t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.JSON(map[string]interface{}{"data": enqueued})) t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.JSON(map[string]interface{}{"data": enqueued}), nil)
if _, err := client.Enqueue(t); err != nil { if _, err := client.Enqueue(t); err != nil {
b.Logf("could not enqueue task %d: %v", enqueued, err) b.Logf("could not enqueue task %d: %v", enqueued, err)
continue continue

View File

@ -10,11 +10,11 @@ import (
"strings" "strings"
"time" "time"
"github.com/redis/go-redis/v9"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
) )
// A Client is responsible for scheduling tasks. // A Client is responsible for scheduling tasks.
@ -369,6 +369,7 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
} }
msg := &base.TaskMessage{ msg := &base.TaskMessage{
ID: opt.taskID, ID: opt.taskID,
Metadata: task.Metadata(),
Type: task.Type(), Type: task.Type(),
Payload: task.Payload(), Payload: task.Payload(),
Queue: opt.queue, Queue: opt.queue,

View File

@ -21,7 +21,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close() defer client.Close()
task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}), nil)
var ( var (
now = time.Now() now = time.Now()
@ -148,7 +148,7 @@ func TestClientEnqueue(t *testing.T) {
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close() defer client.Close()
task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}), nil)
now := time.Now() now := time.Now()
tests := []struct { tests := []struct {
@ -483,7 +483,7 @@ func TestClientEnqueueWithGroupOption(t *testing.T) {
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close() defer client.Close()
task := NewTask("mytask", []byte("foo")) task := NewTask("mytask", []byte("foo"), nil)
now := time.Now() now := time.Now()
tests := []struct { tests := []struct {
@ -635,7 +635,7 @@ func TestClientEnqueueWithTaskIDOption(t *testing.T) {
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close() defer client.Close()
task := NewTask("send_email", nil) task := NewTask("send_email", nil, nil)
now := time.Now() now := time.Now()
tests := []struct { tests := []struct {
@ -713,7 +713,7 @@ func TestClientEnqueueWithConflictingTaskID(t *testing.T) {
defer client.Close() defer client.Close()
const taskID = "custom_id" const taskID = "custom_id"
task := NewTask("foo", nil) task := NewTask("foo", nil, nil)
if _, err := client.Enqueue(task, TaskID(taskID)); err != nil { if _, err := client.Enqueue(task, TaskID(taskID)); err != nil {
t.Fatalf("First task: Enqueue failed: %v", err) t.Fatalf("First task: Enqueue failed: %v", err)
@ -729,7 +729,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close() defer client.Close()
task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}), nil)
now := time.Now() now := time.Now()
tests := []struct { tests := []struct {
@ -852,7 +852,7 @@ func TestClientEnqueueError(t *testing.T) {
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close() defer client.Close()
task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}), nil)
tests := []struct { tests := []struct {
desc string desc string
@ -873,27 +873,27 @@ func TestClientEnqueueError(t *testing.T) {
}, },
{ {
desc: "With empty task typename", desc: "With empty task typename",
task: NewTask("", h.JSON(map[string]interface{}{})), task: NewTask("", h.JSON(map[string]interface{}{}), nil),
opts: []Option{}, opts: []Option{},
}, },
{ {
desc: "With blank task typename", desc: "With blank task typename",
task: NewTask(" ", h.JSON(map[string]interface{}{})), task: NewTask(" ", h.JSON(map[string]interface{}{}), nil),
opts: []Option{}, opts: []Option{},
}, },
{ {
desc: "With empty task ID", desc: "With empty task ID",
task: NewTask("foo", nil), task: NewTask("foo", nil, nil),
opts: []Option{TaskID("")}, opts: []Option{TaskID("")},
}, },
{ {
desc: "With blank task ID", desc: "With blank task ID",
task: NewTask("foo", nil), task: NewTask("foo", nil, nil),
opts: []Option{TaskID(" ")}, opts: []Option{TaskID(" ")},
}, },
{ {
desc: "With unique option less than 1s", desc: "With unique option less than 1s",
task: NewTask("foo", nil), task: NewTask("foo", nil, nil),
opts: []Option{Unique(300 * time.Millisecond)}, opts: []Option{Unique(300 * time.Millisecond)},
}, },
} }
@ -1015,7 +1015,7 @@ func TestClientWithDefaultOptions(t *testing.T) {
h.FlushDB(t, r) h.FlushDB(t, r)
c := NewClient(getRedisConnOpt(t)) c := NewClient(getRedisConnOpt(t))
defer c.Close() defer c.Close()
task := NewTask(tc.tasktype, tc.payload, tc.defaultOpts...) task := NewTask(tc.tasktype, tc.payload, nil, tc.defaultOpts...)
gotInfo, err := c.Enqueue(task, tc.opts...) gotInfo, err := c.Enqueue(task, tc.opts...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1052,7 +1052,7 @@ func TestClientEnqueueUnique(t *testing.T) {
ttl time.Duration ttl time.Duration
}{ }{
{ {
NewTask("email", h.JSON(map[string]interface{}{"user_id": 123})), NewTask("email", h.JSON(map[string]interface{}{"user_id": 123}), nil),
time.Hour, time.Hour,
}, },
} }
@ -1096,7 +1096,7 @@ func TestClientEnqueueUniqueWithProcessInOption(t *testing.T) {
ttl time.Duration ttl time.Duration
}{ }{
{ {
NewTask("reindex", nil), NewTask("reindex", nil, nil),
time.Hour, time.Hour,
10 * time.Minute, 10 * time.Minute,
}, },
@ -1142,7 +1142,7 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
ttl time.Duration ttl time.Duration
}{ }{
{ {
NewTask("reindex", nil), NewTask("reindex", nil, nil),
time.Now().Add(time.Hour), time.Now().Add(time.Hour),
10 * time.Minute, 10 * time.Minute,
}, },

View File

@ -86,10 +86,10 @@ func ExampleScheduler() {
&asynq.SchedulerOpts{Location: time.Local}, &asynq.SchedulerOpts{Location: time.Local},
) )
if _, err := scheduler.Register("* * * * *", asynq.NewTask("task1", nil)); err != nil { if _, err := scheduler.Register("* * * * *", asynq.NewTask("task1", nil, nil)); err != nil {
log.Fatal(err) log.Fatal(err)
} }
if _, err := scheduler.Register("@every 30s", asynq.NewTask("task2", nil)); err != nil { if _, err := scheduler.Register("@every 30s", asynq.NewTask("task2", nil, nil)); err != nil {
log.Fatal(err) log.Fatal(err)
} }

1
go.mod
View File

@ -18,4 +18,5 @@ require (
require ( require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/stretchr/testify v1.8.4 // indirect
) )

3
go.sum
View File

@ -24,7 +24,8 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA= github.com/spf13/cast v1.5.1 h1:R+kOtfhWQE6TVQzY+4D7wJLBgkdVasCEFxSUBYBYIlA=
github.com/spf13/cast v1.5.1/go.mod h1:b9PdjNptOpzXr7Rq1q9gJML/2cdGQAo69NKzQ10KN48= github.com/spf13/cast v1.5.1/go.mod h1:b9PdjNptOpzXr7Rq1q9gJML/2cdGQAo69NKzQ10KN48=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=

View File

@ -10,10 +10,10 @@ import (
"strings" "strings"
"time" "time"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/redis/go-redis/v9"
) )
// Inspector is a client interface to inspect and mutate the state of // Inspector is a client interface to inspect and mutate the state of
@ -890,7 +890,7 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
return nil, err return nil, err
} }
for _, e := range res { for _, e := range res {
task := NewTask(e.Type, e.Payload) task := NewTask(e.Type, e.Payload, nil)
var opts []Option var opts []Option
for _, s := range e.Opts { for _, s := range e.Opts {
if o, err := parseOption(s); err == nil { if o, err := parseOption(s); err == nil {

View File

@ -3324,14 +3324,14 @@ func TestInspectorSchedulerEntries(t *testing.T) {
want: []*SchedulerEntry{ want: []*SchedulerEntry{
{ {
Spec: "* * * * *", Spec: "* * * * *",
Task: NewTask("foo", nil), Task: NewTask("foo", nil, nil),
Opts: nil, Opts: nil,
Next: now.Add(5 * time.Hour), Next: now.Add(5 * time.Hour),
Prev: now.Add(-2 * time.Hour), Prev: now.Add(-2 * time.Hour),
}, },
{ {
Spec: "@every 20m", Spec: "@every 20m",
Task: NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"})), Task: NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"}), nil),
Opts: []Option{Queue("bar"), MaxRetry(20)}, Opts: []Option{Queue("bar"), MaxRetry(20)},
Next: now.Add(1 * time.Minute), Next: now.Add(1 * time.Minute),
Prev: now.Add(-19 * time.Minute), Prev: now.Add(-19 * time.Minute),

View File

@ -234,6 +234,9 @@ func AllAggregationSets(qname string) string {
// TaskMessage is the internal representation of a task with additional metadata fields. // TaskMessage is the internal representation of a task with additional metadata fields.
// Serialized data of this type gets written to redis. // Serialized data of this type gets written to redis.
type TaskMessage struct { type TaskMessage struct {
// Metadata holds metadata of the task.
Metadata map[string]string
// Type indicates the kind of the task to be performed. // Type indicates the kind of the task to be performed.
Type string Type string
@ -302,6 +305,7 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) {
return nil, fmt.Errorf("cannot encode nil message") return nil, fmt.Errorf("cannot encode nil message")
} }
return proto.Marshal(&pb.TaskMessage{ return proto.Marshal(&pb.TaskMessage{
Metadata: msg.Metadata,
Type: msg.Type, Type: msg.Type,
Payload: msg.Payload, Payload: msg.Payload,
Id: msg.ID, Id: msg.ID,
@ -326,6 +330,7 @@ func DecodeMessage(data []byte) (*TaskMessage, error) {
return nil, err return nil, err
} }
return &TaskMessage{ return &TaskMessage{
Metadata: pbmsg.GetMetadata(),
Type: pbmsg.GetType(), Type: pbmsg.GetType(),
Payload: pbmsg.GetPayload(), Payload: pbmsg.GetPayload(),
ID: pbmsg.GetId(), ID: pbmsg.GetId(),

View File

@ -4,14 +4,13 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.25.0 // protoc-gen-go v1.33.0
// protoc v3.17.3 // protoc v4.25.3
// source: asynq.proto // source: asynq.proto
package proto package proto
import ( import (
proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
timestamppb "google.golang.org/protobuf/types/known/timestamppb" timestamppb "google.golang.org/protobuf/types/known/timestamppb"
@ -26,10 +25,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
) )
// This is a compile-time assertion that a sufficiently up-to-date version
// of the legacy proto package is being used.
const _ = proto.ProtoPackageIsVersion4
// TaskMessage is the internal representation of a task with additional // TaskMessage is the internal representation of a task with additional
// metadata fields. // metadata fields.
// Next ID: 15 // Next ID: 15
@ -38,6 +33,9 @@ type TaskMessage struct {
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
// Metadata holds additional data for a task.
// This field can be used to carry cross-cutting concerns, such as a Trace ID.
Metadata map[string]string `protobuf:"bytes,15,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
// Type indicates the kind of the task to be performed. // Type indicates the kind of the task to be performed.
Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"`
// Payload holds data needed to process the task. // Payload holds data needed to process the task.
@ -111,6 +109,13 @@ func (*TaskMessage) Descriptor() ([]byte, []int) {
return file_asynq_proto_rawDescGZIP(), []int{0} return file_asynq_proto_rawDescGZIP(), []int{0}
} }
func (x *TaskMessage) GetMetadata() map[string]string {
if x != nil {
return x.Metadata
}
return nil
}
func (x *TaskMessage) GetType() string { func (x *TaskMessage) GetType() string {
if x != nil { if x != nil {
return x.Type return x.Type
@ -625,105 +630,113 @@ var file_asynq_proto_rawDesc = []byte{
0x0a, 0x0b, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x61, 0x0a, 0x0b, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x61,
0x73, 0x79, 0x6e, 0x71, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x73, 0x79, 0x6e, 0x71, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x87, 0x03, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, 0x4d, 0x65, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x82, 0x04, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, 0x4d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x3c, 0x0a, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74,
0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x61, 0x18, 0x0f, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e,
0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x54, 0x61, 0x73, 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x4d, 0x65, 0x74, 0x61,
0x6f, 0x61, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64,
0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x61, 0x74, 0x61, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x74, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f,
0x72, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79, 0x12, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61,
0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69,
0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x72, 0x72, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09,
0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x72, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79,
0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x66, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74, 0x72, 0x79, 0x12, 0x18, 0x0a,
0x61, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07,
0x6c, 0x61, 0x73, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x41, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x72, 0x72, 0x6f, 0x72,
0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x72, 0x72, 0x6f,
0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x66, 0x61, 0x69,
0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6c, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x6c, 0x61,
0x6e, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x73, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x65, 0x64, 0x41, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69,
0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d,
0x79, 0x12, 0x1b, 0x0a, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0e, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65,
0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x12, 0x1c, 0x18, 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65,
0x0a, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x12, 0x1d, 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0a,
0x03, 0x52, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65, 0x79, 0x12,
0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0d, 0x20, 0x01, 0x1b, 0x0a, 0x09, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0e, 0x20, 0x01,
0x28, 0x03, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x41, 0x74, 0x22, 0x28, 0x09, 0x52, 0x08, 0x67, 0x72, 0x6f, 0x75, 0x70, 0x4b, 0x65, 0x79, 0x12, 0x1c, 0x0a, 0x09,
0x8f, 0x03, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x03, 0x52,
0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x09, 0x72, 0x65, 0x74, 0x65, 0x6e, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, 0x63, 0x6f,
0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x03,
0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x41, 0x74, 0x1a, 0x3b, 0x0a,
0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10,
0x64, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79,
0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x6e, 0x63, 0x79, 0x12, 0x35, 0x0a, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x05, 0x20, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x8f, 0x03, 0x0a, 0x0a, 0x53,
0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73,
0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a,
0x72, 0x79, 0x52, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x74, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12,
0x72, 0x69, 0x63, 0x74, 0x5f, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x06, 0x20, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01,
0x01, 0x28, 0x08, 0x52, 0x0e, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x50, 0x72, 0x69, 0x6f, 0x72, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x20, 0x0a, 0x0b,
0x69, 0x74, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x07, 0x20, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28,
0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x12, 0x35,
0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x2e, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x71,
0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x75, 0x65, 0x75, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x5f,
0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x09, 0x20, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e,
0x01, 0x28, 0x05, 0x52, 0x11, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x50, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x16,
0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x39, 0x0a, 0x0b, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06,
0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f,
0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f,
0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d,
0x01, 0x22, 0xb1, 0x02, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d,
0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x5f, 0x77, 0x6f, 0x72, 0x6b,
0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x09, 0x20, 0x01, 0x28, 0x05, 0x52, 0x11,
0x05, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x43, 0x6f, 0x75, 0x6e,
0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x74, 0x1a, 0x39, 0x0a, 0x0b, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79,
0x72, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b,
0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xb1, 0x02, 0x0a,
0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x68,
0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12,
0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, 0x70, 0x69,
0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03,
0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x17,
0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x5f,
0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x36, 0x0a, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x61, 0x73, 0x6b,
0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x70, 0x61, 0x79,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x73, 0x6b,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, 0x65, 0x61, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65,
0x64, 0x6c, 0x69, 0x6e, 0x65, 0x22, 0xad, 0x02, 0x0a, 0x0e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x39, 0x0a,
0x6c, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28,
0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x70, 0x65, 0x63, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x12, 0x1b, 0x0a, 0x09, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73,
0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x36, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64,
0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f,
0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d,
0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x27, 0x0a, 0x0f, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65,
0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x22, 0xad, 0x02, 0x0a, 0x0e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x45, 0x6e,
0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x4f, 0x70, 0x74, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x11, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x65, 0x6e, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x70, 0x65, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28,
0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x09, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x12, 0x1b, 0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x5f,
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x61, 0x73, 0x6b,
0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x6e, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x70, 0x61, 0x79,
0x78, 0x74, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x46, 0x0a, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x73, 0x6b,
0x11, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x27, 0x0a, 0x0f, 0x65, 0x6e, 0x71, 0x75, 0x65,
0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x75, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09,
0x52, 0x0e, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73,
0x12, 0x46, 0x0a, 0x11, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65,
0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69,
0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x6e, 0x65, 0x78, 0x74, 0x45, 0x6e, 0x71,
0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x46, 0x0a, 0x11, 0x70, 0x72, 0x65, 0x76,
0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x07, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52,
0x0f, 0x70, 0x72, 0x65, 0x76, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65,
0x22, 0x6f, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x45, 0x6e, 0x71,
0x75, 0x65, 0x75, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73,
0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b,
0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69,
0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73,
0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x70, 0x72, 0x65, 0x76, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d,
0x65, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x6f, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
0x65, 0x72, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x17, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2f, 0x69, 0x6e,
0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72,
0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x6f, 0x74, 0x6f, 0x33,
0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x65, 0x6e, 0x71, 0x75, 0x65,
0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62,
0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, 0x73, 0x79,
0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
@ -738,29 +751,31 @@ func file_asynq_proto_rawDescGZIP() []byte {
return file_asynq_proto_rawDescData return file_asynq_proto_rawDescData
} }
var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
var file_asynq_proto_goTypes = []interface{}{ var file_asynq_proto_goTypes = []interface{}{
(*TaskMessage)(nil), // 0: asynq.TaskMessage (*TaskMessage)(nil), // 0: asynq.TaskMessage
(*ServerInfo)(nil), // 1: asynq.ServerInfo (*ServerInfo)(nil), // 1: asynq.ServerInfo
(*WorkerInfo)(nil), // 2: asynq.WorkerInfo (*WorkerInfo)(nil), // 2: asynq.WorkerInfo
(*SchedulerEntry)(nil), // 3: asynq.SchedulerEntry (*SchedulerEntry)(nil), // 3: asynq.SchedulerEntry
(*SchedulerEnqueueEvent)(nil), // 4: asynq.SchedulerEnqueueEvent (*SchedulerEnqueueEvent)(nil), // 4: asynq.SchedulerEnqueueEvent
nil, // 5: asynq.ServerInfo.QueuesEntry nil, // 5: asynq.TaskMessage.MetadataEntry
(*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp nil, // 6: asynq.ServerInfo.QueuesEntry
(*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp
} }
var file_asynq_proto_depIdxs = []int32{ var file_asynq_proto_depIdxs = []int32{
5, // 0: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry 5, // 0: asynq.TaskMessage.metadata:type_name -> asynq.TaskMessage.MetadataEntry
6, // 1: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp 6, // 1: asynq.ServerInfo.queues:type_name -> asynq.ServerInfo.QueuesEntry
6, // 2: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp 7, // 2: asynq.ServerInfo.start_time:type_name -> google.protobuf.Timestamp
6, // 3: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp 7, // 3: asynq.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp
6, // 4: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp 7, // 4: asynq.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp
6, // 5: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp 7, // 5: asynq.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp
6, // 6: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp 7, // 6: asynq.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp
7, // [7:7] is the sub-list for method output_type 7, // 7: asynq.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp
7, // [7:7] is the sub-list for method input_type 8, // [8:8] is the sub-list for method output_type
7, // [7:7] is the sub-list for extension type_name 8, // [8:8] is the sub-list for method input_type
7, // [7:7] is the sub-list for extension extendee 8, // [8:8] is the sub-list for extension type_name
0, // [0:7] is the sub-list for field type_name 8, // [8:8] is the sub-list for extension extendee
0, // [0:8] is the sub-list for field type_name
} }
func init() { file_asynq_proto_init() } func init() { file_asynq_proto_init() }
@ -836,7 +851,7 @@ func file_asynq_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_asynq_proto_rawDesc, RawDescriptor: file_asynq_proto_rawDesc,
NumEnums: 0, NumEnums: 0,
NumMessages: 6, NumMessages: 7,
NumExtensions: 0, NumExtensions: 0,
NumServices: 0, NumServices: 0,
}, },

View File

@ -13,6 +13,10 @@ option go_package = "github.com/hibiken/asynq/internal/proto";
// metadata fields. // metadata fields.
// Next ID: 15 // Next ID: 15
message TaskMessage { message TaskMessage {
// Metadata holds additional data for a task.
// This field can be used to carry cross-cutting concerns, such as a Trace ID.
map<string, string> metadata = 15;
// Type indicates the kind of the task to be performed. // Type indicates the kind of the task to be performed.
string type = 1; string type = 1;

27
metadata.go Normal file
View File

@ -0,0 +1,27 @@
package asynq
type Metadata map[string]string
func (a Metadata) Get(key string) string {
v, ok := a[key]
if !ok {
return ""
}
return v
}
func (a Metadata) Set(key string, value string) {
a[key] = value
}
func (a Metadata) Keys() []string {
i := 0
r := make([]string, len(a))
for k := range a {
r[i] = k
i++
}
return r
}

View File

@ -33,8 +33,8 @@ func (p *FakeConfigProvider) GetConfigs() ([]*PeriodicTaskConfig, error) {
func TestNewPeriodicTaskManager(t *testing.T) { func TestNewPeriodicTaskManager(t *testing.T) {
cfgs := []*PeriodicTaskConfig{ cfgs := []*PeriodicTaskConfig{
{Cronspec: "* * * * *", Task: NewTask("foo", nil)}, {Cronspec: "* * * * *", Task: NewTask("foo", nil, nil)},
{Cronspec: "* * * * *", Task: NewTask("bar", nil)}, {Cronspec: "* * * * *", Task: NewTask("bar", nil, nil)},
} }
tests := []struct { tests := []struct {
desc string desc string
@ -78,8 +78,8 @@ func TestNewPeriodicTaskManager(t *testing.T) {
func TestNewPeriodicTaskManagerError(t *testing.T) { func TestNewPeriodicTaskManagerError(t *testing.T) {
cfgs := []*PeriodicTaskConfig{ cfgs := []*PeriodicTaskConfig{
{Cronspec: "* * * * *", Task: NewTask("foo", nil)}, {Cronspec: "* * * * *", Task: NewTask("foo", nil, nil)},
{Cronspec: "* * * * *", Task: NewTask("bar", nil)}, {Cronspec: "* * * * *", Task: NewTask("bar", nil, nil)},
} }
tests := []struct { tests := []struct {
desc string desc string
@ -118,11 +118,11 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
desc: "basic identity test", desc: "basic identity test",
a: &PeriodicTaskConfig{ a: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", nil), Task: NewTask("foo", nil, nil),
}, },
b: &PeriodicTaskConfig{ b: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", nil), Task: NewTask("foo", nil, nil),
}, },
isSame: true, isSame: true,
}, },
@ -130,12 +130,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
desc: "with a option", desc: "with a option",
a: &PeriodicTaskConfig{ a: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", nil), Task: NewTask("foo", nil, nil),
Opts: []Option{Queue("myqueue")}, Opts: []Option{Queue("myqueue")},
}, },
b: &PeriodicTaskConfig{ b: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", nil), Task: NewTask("foo", nil, nil),
Opts: []Option{Queue("myqueue")}, Opts: []Option{Queue("myqueue")},
}, },
isSame: true, isSame: true,
@ -144,12 +144,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
desc: "with multiple options (different order)", desc: "with multiple options (different order)",
a: &PeriodicTaskConfig{ a: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", nil), Task: NewTask("foo", nil, nil),
Opts: []Option{Unique(5 * time.Minute), Queue("myqueue")}, Opts: []Option{Unique(5 * time.Minute), Queue("myqueue")},
}, },
b: &PeriodicTaskConfig{ b: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", nil), Task: NewTask("foo", nil, nil),
Opts: []Option{Queue("myqueue"), Unique(5 * time.Minute)}, Opts: []Option{Queue("myqueue"), Unique(5 * time.Minute)},
}, },
isSame: true, isSame: true,
@ -158,12 +158,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
desc: "with payload", desc: "with payload",
a: &PeriodicTaskConfig{ a: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", []byte("hello world!")), Task: NewTask("foo", []byte("hello world!"), nil),
Opts: []Option{Queue("myqueue")}, Opts: []Option{Queue("myqueue")},
}, },
b: &PeriodicTaskConfig{ b: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", []byte("hello world!")), Task: NewTask("foo", []byte("hello world!"), nil),
Opts: []Option{Queue("myqueue")}, Opts: []Option{Queue("myqueue")},
}, },
isSame: true, isSame: true,
@ -172,11 +172,11 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
desc: "with different cronspecs", desc: "with different cronspecs",
a: &PeriodicTaskConfig{ a: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", nil), Task: NewTask("foo", nil, nil),
}, },
b: &PeriodicTaskConfig{ b: &PeriodicTaskConfig{
Cronspec: "5 * * * *", Cronspec: "5 * * * *",
Task: NewTask("foo", nil), Task: NewTask("foo", nil, nil),
}, },
isSame: false, isSame: false,
}, },
@ -184,11 +184,11 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
desc: "with different task type", desc: "with different task type",
a: &PeriodicTaskConfig{ a: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", nil), Task: NewTask("foo", nil, nil),
}, },
b: &PeriodicTaskConfig{ b: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("bar", nil), Task: NewTask("bar", nil, nil),
}, },
isSame: false, isSame: false,
}, },
@ -196,12 +196,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
desc: "with different options", desc: "with different options",
a: &PeriodicTaskConfig{ a: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", nil), Task: NewTask("foo", nil, nil),
Opts: []Option{Queue("myqueue")}, Opts: []Option{Queue("myqueue")},
}, },
b: &PeriodicTaskConfig{ b: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", nil), Task: NewTask("foo", nil, nil),
Opts: []Option{Unique(10 * time.Minute)}, Opts: []Option{Unique(10 * time.Minute)},
}, },
isSame: false, isSame: false,
@ -210,12 +210,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
desc: "with different options (one is subset of the other)", desc: "with different options (one is subset of the other)",
a: &PeriodicTaskConfig{ a: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", nil), Task: NewTask("foo", nil, nil),
Opts: []Option{Queue("myqueue")}, Opts: []Option{Queue("myqueue")},
}, },
b: &PeriodicTaskConfig{ b: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", nil), Task: NewTask("foo", nil, nil),
Opts: []Option{Queue("myqueue"), Unique(10 * time.Minute)}, Opts: []Option{Queue("myqueue"), Unique(10 * time.Minute)},
}, },
isSame: false, isSame: false,
@ -224,12 +224,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
desc: "with different payload", desc: "with different payload",
a: &PeriodicTaskConfig{ a: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", []byte("hello!")), Task: NewTask("foo", []byte("hello!"), nil),
Opts: []Option{Queue("myqueue")}, Opts: []Option{Queue("myqueue")},
}, },
b: &PeriodicTaskConfig{ b: &PeriodicTaskConfig{
Cronspec: "* * * * *", Cronspec: "* * * * *",
Task: NewTask("foo", []byte("HELLO!")), Task: NewTask("foo", []byte("HELLO!"), nil),
Opts: []Option{Queue("myqueue"), Unique(10 * time.Minute)}, Opts: []Option{Queue("myqueue"), Unique(10 * time.Minute)},
}, },
isSame: false, isSame: false,
@ -255,8 +255,8 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
func TestPeriodicTaskManager(t *testing.T) { func TestPeriodicTaskManager(t *testing.T) {
// Note: In this test, we'll use task type as an ID for each config. // Note: In this test, we'll use task type as an ID for each config.
cfgs := []*PeriodicTaskConfig{ cfgs := []*PeriodicTaskConfig{
{Task: NewTask("task1", nil), Cronspec: "* * * * 1"}, {Task: NewTask("task1", nil, nil), Cronspec: "* * * * 1"},
{Task: NewTask("task2", nil), Cronspec: "* * * * 2"}, {Task: NewTask("task2", nil, nil), Cronspec: "* * * * 2"},
} }
const syncInterval = 3 * time.Second const syncInterval = 3 * time.Second
provider := &FakeConfigProvider{cfgs: cfgs} provider := &FakeConfigProvider{cfgs: cfgs}
@ -287,8 +287,8 @@ func TestPeriodicTaskManager(t *testing.T) {
// - task2 removed // - task2 removed
// - task3 added // - task3 added
provider.SetConfigs([]*PeriodicTaskConfig{ provider.SetConfigs([]*PeriodicTaskConfig{
{Task: NewTask("task1", nil), Cronspec: "* * * * 1"}, {Task: NewTask("task1", nil, nil), Cronspec: "* * * * 1"},
{Task: NewTask("task3", nil), Cronspec: "* * * * 3"}, {Task: NewTask("task3", nil, nil), Cronspec: "* * * * 3"},
}) })
// Wait for the next sync // Wait for the next sync

View File

@ -222,6 +222,7 @@ func (p *processor) exec() {
task := newTask( task := newTask(
msg.Type, msg.Type,
msg.Payload, msg.Payload,
msg.Metadata,
&ResultWriter{ &ResultWriter{
id: msg.ID, id: msg.ID,
qname: msg.Queue, qname: msg.Queue,
@ -325,7 +326,7 @@ var SkipRetry = errors.New("skip retry for the task")
func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) { func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
if p.errHandler != nil { if p.errHandler != nil {
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err) p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload, nil), err)
} }
if !p.isFailureFunc(err) { if !p.isFailureFunc(err) {
// retry the task without marking it as failed // retry the task without marking it as failed
@ -346,7 +347,7 @@ func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailu
return return
} }
ctx, _ := context.WithDeadline(context.Background(), l.Deadline()) ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload)) d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload, nil))
retryAt := time.Now().Add(d) retryAt := time.Now().Add(d)
err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure) err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
if err != nil { if err != nil {

View File

@ -93,10 +93,10 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
m3 := h.NewTaskMessage("task3", nil) m3 := h.NewTaskMessage("task3", nil)
m4 := h.NewTaskMessage("task4", nil) m4 := h.NewTaskMessage("task4", nil)
t1 := NewTask(m1.Type, m1.Payload) t1 := NewTask(m1.Type, m1.Payload, nil)
t2 := NewTask(m2.Type, m2.Payload) t2 := NewTask(m2.Type, m2.Payload, nil)
t3 := NewTask(m3.Type, m3.Payload) t3 := NewTask(m3.Type, m3.Payload, nil)
t4 := NewTask(m4.Type, m4.Payload) t4 := NewTask(m4.Type, m4.Payload, nil)
tests := []struct { tests := []struct {
pending []*base.TaskMessage // initial default queue state pending []*base.TaskMessage // initial default queue state
@ -162,10 +162,10 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
m3 = h.NewTaskMessageWithQueue("task3", nil, "high") m3 = h.NewTaskMessageWithQueue("task3", nil, "high")
m4 = h.NewTaskMessageWithQueue("task4", nil, "low") m4 = h.NewTaskMessageWithQueue("task4", nil, "low")
t1 = NewTask(m1.Type, m1.Payload) t1 = NewTask(m1.Type, m1.Payload, nil)
t2 = NewTask(m2.Type, m2.Payload) t2 = NewTask(m2.Type, m2.Payload, nil)
t3 = NewTask(m3.Type, m3.Payload) t3 = NewTask(m3.Type, m3.Payload, nil)
t4 = NewTask(m4.Type, m4.Payload) t4 = NewTask(m4.Type, m4.Payload, nil)
) )
defer r.Close() defer r.Close()
@ -232,7 +232,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
rdbClient := rdb.NewRDB(r) rdbClient := rdb.NewRDB(r)
m1 := h.NewTaskMessage("large_number", h.JSON(map[string]interface{}{"data": 111111111111111111})) m1 := h.NewTaskMessage("large_number", h.JSON(map[string]interface{}{"data": 111111111111111111}))
t1 := NewTask(m1.Type, m1.Payload) t1 := NewTask(m1.Type, m1.Payload, nil)
tests := []struct { tests := []struct {
pending []*base.TaskMessage // initial default queue state pending []*base.TaskMessage // initial default queue state
@ -639,13 +639,13 @@ func TestProcessorWithStrictPriority(t *testing.T) {
m6 = h.NewTaskMessageWithQueue("task6", nil, "low") m6 = h.NewTaskMessageWithQueue("task6", nil, "low")
m7 = h.NewTaskMessageWithQueue("task7", nil, "low") m7 = h.NewTaskMessageWithQueue("task7", nil, "low")
t1 = NewTask(m1.Type, m1.Payload) t1 = NewTask(m1.Type, m1.Payload, nil)
t2 = NewTask(m2.Type, m2.Payload) t2 = NewTask(m2.Type, m2.Payload, nil)
t3 = NewTask(m3.Type, m3.Payload) t3 = NewTask(m3.Type, m3.Payload, nil)
t4 = NewTask(m4.Type, m4.Payload) t4 = NewTask(m4.Type, m4.Payload, nil)
t5 = NewTask(m5.Type, m5.Payload) t5 = NewTask(m5.Type, m5.Payload, nil)
t6 = NewTask(m6.Type, m6.Payload) t6 = NewTask(m6.Type, m6.Payload, nil)
t7 = NewTask(m7.Type, m7.Payload) t7 = NewTask(m7.Type, m7.Payload, nil)
) )
defer r.Close() defer r.Close()
@ -742,7 +742,7 @@ func TestProcessorPerform(t *testing.T) {
handler: func(ctx context.Context, t *Task) error { handler: func(ctx context.Context, t *Task) error {
return nil return nil
}, },
task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"}), nil),
wantErr: false, wantErr: false,
}, },
{ {
@ -750,7 +750,7 @@ func TestProcessorPerform(t *testing.T) {
handler: func(ctx context.Context, t *Task) error { handler: func(ctx context.Context, t *Task) error {
return fmt.Errorf("something went wrong") return fmt.Errorf("something went wrong")
}, },
task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"}), nil),
wantErr: true, wantErr: true,
}, },
{ {
@ -758,7 +758,7 @@ func TestProcessorPerform(t *testing.T) {
handler: func(ctx context.Context, t *Task) error { handler: func(ctx context.Context, t *Task) error {
panic("something went terribly wrong") panic("something went terribly wrong")
}, },
task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"}), nil),
wantErr: true, wantErr: true,
}, },
} }
@ -928,7 +928,7 @@ func TestProcessorComputeDeadline(t *testing.T) {
func TestReturnPanicError(t *testing.T) { func TestReturnPanicError(t *testing.T) {
task := NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})) task := NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"}), nil)
tests := []struct { tests := []struct {
name string name string

View File

@ -112,7 +112,7 @@ func (r *recoverer) recoverStaleAggregationSets() {
} }
func (r *recoverer) retry(msg *base.TaskMessage, err error) { func (r *recoverer) retry(msg *base.TaskMessage, err error) {
delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload)) delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload, nil))
retryAt := time.Now().Add(delay) retryAt := time.Now().Add(delay)
if err := r.broker.Retry(context.Background(), msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil { if err := r.broker.Retry(context.Background(), msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil {
r.logger.Warnf("recoverer: could not retry lease expired task: %v", err) r.logger.Warnf("recoverer: could not retry lease expired task: %v", err)

View File

@ -26,7 +26,7 @@ func TestSchedulerRegister(t *testing.T) {
}{ }{
{ {
cronspec: "@every 3s", cronspec: "@every 3s",
task: NewTask("task1", nil), task: NewTask("task1", nil, nil),
opts: []Option{MaxRetry(10)}, opts: []Option{MaxRetry(10)},
wait: 10 * time.Second, wait: 10 * time.Second,
queue: "default", queue: "default",
@ -94,7 +94,7 @@ func TestSchedulerWhenRedisDown(t *testing.T) {
&SchedulerOpts{EnqueueErrorHandler: errorHandler}, &SchedulerOpts{EnqueueErrorHandler: errorHandler},
) )
task := NewTask("test", nil) task := NewTask("test", nil, nil)
if _, err := scheduler.Register("@every 3s", task); err != nil { if _, err := scheduler.Register("@every 3s", task); err != nil {
t.Fatal(err) t.Fatal(err)
@ -124,7 +124,7 @@ func TestSchedulerUnregister(t *testing.T) {
}{ }{
{ {
cronspec: "@every 3s", cronspec: "@every 3s",
task: NewTask("task1", nil), task: NewTask("task1", nil, nil),
opts: []Option{MaxRetry(10)}, opts: []Option{MaxRetry(10)},
wait: 10 * time.Second, wait: 10 * time.Second,
queue: "default", queue: "default",
@ -183,7 +183,7 @@ func TestSchedulerPostAndPreEnqueueHandler(t *testing.T) {
}, },
) )
task := NewTask("test", nil) task := NewTask("test", nil, nil)
if _, err := scheduler.Register("@every 3s", task); err != nil { if _, err := scheduler.Register("@every 3s", task); err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -62,7 +62,7 @@ func TestServeMux(t *testing.T) {
for _, tc := range serveMuxTests { for _, tc := range serveMuxTests {
called = "" // reset to zero value called = "" // reset to zero value
task := NewTask(tc.typename, nil) task := NewTask(tc.typename, nil, nil)
if err := mux.ProcessTask(context.Background(), task); err != nil { if err := mux.ProcessTask(context.Background(), task); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -121,7 +121,7 @@ func TestServeMuxNotFound(t *testing.T) {
} }
for _, tc := range notFoundTests { for _, tc := range notFoundTests {
task := NewTask(tc.typename, nil) task := NewTask(tc.typename, nil, nil)
err := mux.ProcessTask(context.Background(), task) err := mux.ProcessTask(context.Background(), task)
if err == nil { if err == nil {
t.Errorf("ProcessTask did not return error for task %q, should return 'not found' error", task.Type()) t.Errorf("ProcessTask did not return error for task %q, should return 'not found' error", task.Type())
@ -154,7 +154,7 @@ func TestServeMuxMiddlewares(t *testing.T) {
invoked = []string{} // reset to empty slice invoked = []string{} // reset to empty slice
called = "" // reset to zero value called = "" // reset to zero value
task := NewTask(tc.typename, nil) task := NewTask(tc.typename, nil, nil)
if err := mux.ProcessTask(context.Background(), task); err != nil { if err := mux.ProcessTask(context.Background(), task); err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -40,12 +40,12 @@ func TestServer(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
_, err = c.Enqueue(NewTask("send_email", testutil.JSON(map[string]interface{}{"recipient_id": 123}))) _, err = c.Enqueue(NewTask("send_email", testutil.JSON(map[string]interface{}{"recipient_id": 123}), nil))
if err != nil { if err != nil {
t.Errorf("could not enqueue a task: %v", err) t.Errorf("could not enqueue a task: %v", err)
} }
_, err = c.Enqueue(NewTask("send_email", testutil.JSON(map[string]interface{}{"recipient_id": 456})), ProcessIn(1*time.Hour)) _, err = c.Enqueue(NewTask("send_email", testutil.JSON(map[string]interface{}{"recipient_id": 456}), nil), ProcessIn(1*time.Hour))
if err != nil { if err != nil {
t.Errorf("could not enqueue a task: %v", err) t.Errorf("could not enqueue a task: %v", err)
} }
@ -183,15 +183,15 @@ func TestServerWithFlakyBroker(t *testing.T) {
} }
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
_, err := c.Enqueue(NewTask("enqueued", nil), MaxRetry(i)) _, err := c.Enqueue(NewTask("enqueued", nil, nil), MaxRetry(i))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
_, err = c.Enqueue(NewTask("bad_task", nil)) _, err = c.Enqueue(NewTask("bad_task", nil, nil))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
_, err = c.Enqueue(NewTask("scheduled", nil), ProcessIn(time.Duration(i)*time.Second)) _, err = c.Enqueue(NewTask("scheduled", nil, nil), ProcessIn(time.Duration(i)*time.Second))
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }