diff --git a/CHANGELOG.md b/CHANGELOG.md index 02512ee..ecf4474 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- NewTask function now takes array of bytes as payload. - Requires redis v4.0+ for multiple field/value pair support - Renamed pending key (TODO: need migration script) diff --git a/asynq.go b/asynq.go index 210ef69..06ac285 100644 --- a/asynq.go +++ b/asynq.go @@ -16,20 +16,21 @@ import ( // Task represents a unit of work to be performed. type Task struct { - // Type indicates the type of task to be performed. - Type string + // typename indicates the type of task to be performed. + typename string - // Payload holds data needed to perform the task. - Payload Payload + // payload holds data needed to perform the task. + payload []byte } +func (t *Task) Type() string { return t.typename } +func (t *Task) Payload() []byte { return t.payload } + // NewTask returns a new Task given a type name and payload data. -// -// The payload values must be serializable. -func NewTask(typename string, payload map[string]interface{}) *Task { +func NewTask(typename string, payload []byte) *Task { return &Task{ - Type: typename, - Payload: Payload{payload}, + typename: typename, + payload: payload, } } diff --git a/client.go b/client.go index 9402099..dc1841c 100644 --- a/client.go +++ b/client.go @@ -176,7 +176,6 @@ func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v) func (d processInOption) Type() OptionType { return ProcessInOpt } func (d processInOption) Value() interface{} { return time.Duration(d) } - // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task. // // ErrDuplicateTask error only applies to tasks enqueued with a Unique option. @@ -305,7 +304,7 @@ func (c *Client) Close() error { // If no ProcessAt or ProcessIn options are passed, the task will be processed immediately. func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) { c.mu.Lock() - if defaults, ok := c.opts[task.Type]; ok { + if defaults, ok := c.opts[task.Type()]; ok { opts = append(defaults, opts...) } c.mu.Unlock() @@ -327,12 +326,12 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) { } var uniqueKey string if opt.uniqueTTL > 0 { - uniqueKey = base.UniqueKey(opt.queue, task.Type, task.Payload.data) + uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload()) } msg := &base.TaskMessage{ ID: uuid.New(), - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Queue: opt.queue, Retry: opt.retry, Deadline: deadline.Unix(), diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 7b1743d..2a54f60 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -93,13 +93,13 @@ var SortStringSliceOpt = cmp.Transformer("SortStringSlice", func(in []string) [] var IgnoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID") // NewTaskMessage returns a new instance of TaskMessage given a task type and payload. -func NewTaskMessage(taskType string, payload map[string]interface{}) *base.TaskMessage { +func NewTaskMessage(taskType string, payload []byte) *base.TaskMessage { return NewTaskMessageWithQueue(taskType, payload, base.DefaultQueueName) } // NewTaskMessageWithQueue returns a new instance of TaskMessage given a // task type, payload and queue name. -func NewTaskMessageWithQueue(taskType string, payload map[string]interface{}, qname string) *base.TaskMessage { +func NewTaskMessageWithQueue(taskType string, payload []byte, qname string) *base.TaskMessage { return &base.TaskMessage{ ID: uuid.New(), Type: taskType, diff --git a/internal/base/base.go b/internal/base/base.go index dbca940..9ea18e4 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -6,12 +6,8 @@ package base import ( - "bytes" "context" - "encoding/json" "fmt" - "sort" - "strings" "sync" "time" @@ -125,32 +121,8 @@ func SchedulerHistoryKey(entryID string) string { } // UniqueKey returns a redis key with the given type, payload, and queue name. -func UniqueKey(qname, tasktype string, payload map[string]interface{}) string { - return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, serializePayload(payload)) -} - -func serializePayload(payload map[string]interface{}) string { - if payload == nil { - return "nil" - } - type entry struct { - k string - v interface{} - } - var es []entry - for k, v := range payload { - es = append(es, entry{k, v}) - } - // sort entries by key - sort.Slice(es, func(i, j int) bool { return es[i].k < es[j].k }) - var b strings.Builder - for _, e := range es { - if b.Len() > 0 { - b.WriteString(",") - } - b.WriteString(fmt.Sprintf("%s=%v", e.k, e.v)) - } - return b.String() +func UniqueKey(qname, tasktype string, payload []byte) string { + return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, string(payload)) } // TaskMessage is the internal representation of a task with additional metadata fields. @@ -160,7 +132,7 @@ type TaskMessage struct { Type string // Payload holds data needed to process the task. - Payload map[string]interface{} + Payload []byte // ID is a unique identifier for each task. ID uuid.UUID @@ -203,13 +175,9 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) { if msg == nil { return nil, fmt.Errorf("cannot encode nil message") } - payload, err := json.Marshal(msg.Payload) - if err != nil { - return nil, err - } return proto.Marshal(&pb.TaskMessage{ Type: msg.Type, - Payload: payload, + Payload: msg.Payload, Id: msg.ID.String(), Queue: msg.Queue, Retry: int32(msg.Retry), @@ -227,13 +195,9 @@ func DecodeMessage(data []byte) (*TaskMessage, error) { if err := proto.Unmarshal(data, &pbmsg); err != nil { return nil, err } - payload, err := decodePayload(pbmsg.GetPayload()) - if err != nil { - return nil, err - } return &TaskMessage{ Type: pbmsg.GetType(), - Payload: payload, + Payload: pbmsg.GetPayload(), ID: uuid.MustParse(pbmsg.GetId()), Queue: pbmsg.GetQueue(), Retry: int(pbmsg.GetRetry()), @@ -383,7 +347,7 @@ type WorkerInfo struct { ServerID string ID string Type string - Payload map[string]interface{} + Payload []byte Queue string Started time.Time Deadline time.Time @@ -394,10 +358,6 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) { if info == nil { return nil, fmt.Errorf("cannot encode nil worker info") } - payload, err := json.Marshal(info.Payload) - if err != nil { - return nil, err - } startTime, err := ptypes.TimestampProto(info.Started) if err != nil { return nil, err @@ -412,33 +372,19 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) { ServerId: info.ServerID, TaskId: info.ID, TaskType: info.Type, - TaskPayload: payload, + TaskPayload: info.Payload, Queue: info.Queue, StartTime: startTime, Deadline: deadline, }) } -func decodePayload(b []byte) (map[string]interface{}, error) { - d := json.NewDecoder(bytes.NewReader(b)) - d.UseNumber() - payload := make(map[string]interface{}) - if err := d.Decode(&payload); err != nil { - return nil, err - } - return payload, nil -} - // DecodeWorkerInfo decodes the given bytes into WorkerInfo. func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) { var pbmsg pb.WorkerInfo if err := proto.Unmarshal(b, &pbmsg); err != nil { return nil, err } - payload, err := decodePayload(pbmsg.GetTaskPayload()) - if err != nil { - return nil, err - } startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) if err != nil { return nil, err @@ -453,7 +399,7 @@ func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) { ServerID: pbmsg.GetServerId(), ID: pbmsg.GetTaskId(), Type: pbmsg.GetTaskType(), - Payload: payload, + Payload: pbmsg.GetTaskPayload(), Queue: pbmsg.GetQueue(), Started: startTime, Deadline: deadline, @@ -472,7 +418,7 @@ type SchedulerEntry struct { Type string // Payload is the payload of the periodic task. - Payload map[string]interface{} + Payload []byte // Opts is the options for the periodic task. Opts []string @@ -490,10 +436,6 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) { if entry == nil { return nil, fmt.Errorf("cannot encode nil scheduler entry") } - payload, err := json.Marshal(entry.Payload) - if err != nil { - return nil, err - } next, err := ptypes.TimestampProto(entry.Next) if err != nil { return nil, err @@ -506,7 +448,7 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) { Id: entry.ID, Spec: entry.Spec, TaskType: entry.Type, - TaskPayload: payload, + TaskPayload: entry.Payload, EnqueueOptions: entry.Opts, NextEnqueueTime: next, PrevEnqueueTime: prev, @@ -519,10 +461,6 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) { if err := proto.Unmarshal(b, &pbmsg); err != nil { return nil, err } - payload, err := decodePayload(pbmsg.GetTaskPayload()) - if err != nil { - return nil, err - } next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime()) if err != nil { return nil, err @@ -535,7 +473,7 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) { ID: pbmsg.GetId(), Spec: pbmsg.GetSpec(), Type: pbmsg.GetTaskType(), - Payload: payload, + Payload: pbmsg.GetTaskPayload(), Opts: pbmsg.GetEnqueueOptions(), Next: next, Prev: prev, diff --git a/payload.go b/payload.go deleted file mode 100644 index b447ef9..0000000 --- a/payload.go +++ /dev/null @@ -1,230 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package asynq - -import ( - "encoding/json" - "fmt" - "time" - - "github.com/spf13/cast" -) - -// Payload holds arbitrary data needed for task execution. -type Payload struct { - data map[string]interface{} -} - -type errKeyNotFound struct { - key string -} - -func (e *errKeyNotFound) Error() string { - return fmt.Sprintf("key %q does not exist", e.key) -} - -// Has reports whether key exists. -func (p Payload) Has(key string) bool { - _, ok := p.data[key] - return ok -} - -func toInt(v interface{}) (int, error) { - switch v := v.(type) { - case json.Number: - val, err := v.Int64() - if err != nil { - return 0, err - } - return int(val), nil - default: - return cast.ToIntE(v) - } -} - -// String returns a string representation of payload data. -func (p Payload) String() string { - return fmt.Sprint(p.data) -} - -// MarshalJSON returns the JSON encoding of payload data. -func (p Payload) MarshalJSON() ([]byte, error) { - return json.Marshal(p.data) -} - -// GetString returns a string value if a string type is associated with -// the key, otherwise reports an error. -func (p Payload) GetString(key string) (string, error) { - v, ok := p.data[key] - if !ok { - return "", &errKeyNotFound{key} - } - return cast.ToStringE(v) -} - -// GetInt returns an int value if a numeric type is associated with -// the key, otherwise reports an error. -func (p Payload) GetInt(key string) (int, error) { - v, ok := p.data[key] - if !ok { - return 0, &errKeyNotFound{key} - } - return toInt(v) -} - -// GetFloat64 returns a float64 value if a numeric type is associated with -// the key, otherwise reports an error. -func (p Payload) GetFloat64(key string) (float64, error) { - v, ok := p.data[key] - if !ok { - return 0, &errKeyNotFound{key} - } - switch v := v.(type) { - case json.Number: - return v.Float64() - default: - return cast.ToFloat64E(v) - } -} - -// GetBool returns a boolean value if a boolean type is associated with -// the key, otherwise reports an error. -func (p Payload) GetBool(key string) (bool, error) { - v, ok := p.data[key] - if !ok { - return false, &errKeyNotFound{key} - } - return cast.ToBoolE(v) -} - -// GetStringSlice returns a slice of strings if a string slice type is associated with -// the key, otherwise reports an error. -func (p Payload) GetStringSlice(key string) ([]string, error) { - v, ok := p.data[key] - if !ok { - return nil, &errKeyNotFound{key} - } - return cast.ToStringSliceE(v) -} - -// GetIntSlice returns a slice of ints if a int slice type is associated with -// the key, otherwise reports an error. -func (p Payload) GetIntSlice(key string) ([]int, error) { - v, ok := p.data[key] - if !ok { - return nil, &errKeyNotFound{key} - } - switch v := v.(type) { - case []interface{}: - var res []int - for _, elem := range v { - val, err := toInt(elem) - if err != nil { - return nil, err - } - res = append(res, int(val)) - } - return res, nil - default: - return cast.ToIntSliceE(v) - } -} - -// GetStringMap returns a map of string to empty interface -// if a correct map type is associated with the key, -// otherwise reports an error. -func (p Payload) GetStringMap(key string) (map[string]interface{}, error) { - v, ok := p.data[key] - if !ok { - return nil, &errKeyNotFound{key} - } - return cast.ToStringMapE(v) -} - -// GetStringMapString returns a map of string to string -// if a correct map type is associated with the key, -// otherwise reports an error. -func (p Payload) GetStringMapString(key string) (map[string]string, error) { - v, ok := p.data[key] - if !ok { - return nil, &errKeyNotFound{key} - } - return cast.ToStringMapStringE(v) -} - -// GetStringMapStringSlice returns a map of string to string slice -// if a correct map type is associated with the key, -// otherwise reports an error. -func (p Payload) GetStringMapStringSlice(key string) (map[string][]string, error) { - v, ok := p.data[key] - if !ok { - return nil, &errKeyNotFound{key} - } - return cast.ToStringMapStringSliceE(v) -} - -// GetStringMapInt returns a map of string to int -// if a correct map type is associated with the key, -// otherwise reports an error. -func (p Payload) GetStringMapInt(key string) (map[string]int, error) { - v, ok := p.data[key] - if !ok { - return nil, &errKeyNotFound{key} - } - switch v := v.(type) { - case map[string]interface{}: - res := make(map[string]int) - for key, val := range v { - ival, err := toInt(val) - if err != nil { - return nil, err - } - res[key] = ival - } - return res, nil - default: - return cast.ToStringMapIntE(v) - } -} - -// GetStringMapBool returns a map of string to boolean -// if a correct map type is associated with the key, -// otherwise reports an error. -func (p Payload) GetStringMapBool(key string) (map[string]bool, error) { - v, ok := p.data[key] - if !ok { - return nil, &errKeyNotFound{key} - } - return cast.ToStringMapBoolE(v) -} - -// GetTime returns a time value if a correct map type is associated with the key, -// otherwise reports an error. -func (p Payload) GetTime(key string) (time.Time, error) { - v, ok := p.data[key] - if !ok { - return time.Time{}, &errKeyNotFound{key} - } - return cast.ToTimeE(v) -} - -// GetDuration returns a duration value if a correct map type is associated with the key, -// otherwise reports an error. -func (p Payload) GetDuration(key string) (time.Duration, error) { - v, ok := p.data[key] - if !ok { - return 0, &errKeyNotFound{key} - } - switch v := v.(type) { - case json.Number: - val, err := v.Int64() - if err != nil { - return 0, err - } - return time.Duration(val), nil - default: - return cast.ToDurationE(v) - } -} diff --git a/payload_test.go b/payload_test.go deleted file mode 100644 index 379a4ac..0000000 --- a/payload_test.go +++ /dev/null @@ -1,675 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package asynq - -import ( - "encoding/json" - "fmt" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - h "github.com/hibiken/asynq/internal/asynqtest" - "github.com/hibiken/asynq/internal/base" -) - -type payloadTest struct { - data map[string]interface{} - key string - nonkey string -} - -func TestPayloadString(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"name": "gopher"}, - key: "name", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetString(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("Payload.GetString(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetString(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("With Marshaling: Payload.GetString(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetString(tc.nonkey) - if err == nil || got != "" { - t.Errorf("Payload.GetString(%q) = %v, %v; want '', error", - tc.key, got, err) - } - } -} - -func TestPayloadInt(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"user_id": 42}, - key: "user_id", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetInt(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("Payload.GetInt(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetInt(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("With Marshaling: Payload.GetInt(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetInt(tc.nonkey) - if err == nil || got != 0 { - t.Errorf("Payload.GetInt(%q) = %v, %v; want 0, error", - tc.key, got, err) - } - } -} - -func TestPayloadFloat64(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"pi": 3.14}, - key: "pi", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetFloat64(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("Payload.GetFloat64(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetFloat64(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("With Marshaling: Payload.GetFloat64(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetFloat64(tc.nonkey) - if err == nil || got != 0 { - t.Errorf("Payload.GetFloat64(%q) = %v, %v; want 0, error", - tc.key, got, err) - } - } -} - -func TestPayloadBool(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"enabled": true}, - key: "enabled", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetBool(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("Payload.GetBool(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetBool(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("With Marshaling: Payload.GetBool(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetBool(tc.nonkey) - if err == nil || got != false { - t.Errorf("Payload.GetBool(%q) = %v, %v; want false, error", - tc.key, got, err) - } - } -} - -func TestPayloadStringSlice(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"names": []string{"luke", "rey", "anakin"}}, - key: "names", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetStringSlice(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetStringSlice(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetStringSlice(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetStringSlice(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetStringSlice(tc.nonkey) - if err == nil || got != nil { - t.Errorf("Payload.GetStringSlice(%q) = %v, %v; want nil, error", - tc.key, got, err) - } - } -} - -func TestPayloadIntSlice(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"nums": []int{9, 8, 7}}, - key: "nums", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetIntSlice(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetIntSlice(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetIntSlice(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetIntSlice(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetIntSlice(tc.nonkey) - if err == nil || got != nil { - t.Errorf("Payload.GetIntSlice(%q) = %v, %v; want nil, error", - tc.key, got, err) - } - } -} - -func TestPayloadStringMap(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"user": map[string]interface{}{"name": "Jon Doe", "score": 2.2}}, - key: "user", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetStringMap(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetStringMap(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetStringMap(tc.key) - ignoreOpt := cmpopts.IgnoreMapEntries(func(key string, val interface{}) bool { - switch val.(type) { - case json.Number: - return true - default: - return false - } - }) - diff = cmp.Diff(got, tc.data[tc.key], ignoreOpt) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetStringMap(%q) = %v, %v, want %v, nil;(-want,+got)\n%s", - tc.key, got, err, tc.data[tc.key], diff) - } - - // access non-existent key. - got, err = payload.GetStringMap(tc.nonkey) - if err == nil || got != nil { - t.Errorf("Payload.GetStringMap(%q) = %v, %v; want nil, error", - tc.key, got, err) - } - } -} - -func TestPayloadStringMapString(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"address": map[string]string{"line": "123 Main St", "city": "San Francisco", "state": "CA"}}, - key: "address", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetStringMapString(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetStringMapString(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetStringMapString(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetStringMapString(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetStringMapString(tc.nonkey) - if err == nil || got != nil { - t.Errorf("Payload.GetStringMapString(%q) = %v, %v; want nil, error", - tc.key, got, err) - } - } -} - -func TestPayloadStringMapStringSlice(t *testing.T) { - favs := map[string][]string{ - "movies": {"forrest gump", "star wars"}, - "tv_shows": {"game of thrones", "HIMYM", "breaking bad"}, - } - tests := []payloadTest{ - { - data: map[string]interface{}{"favorites": favs}, - key: "favorites", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetStringMapStringSlice(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetStringMapStringSlice(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetStringMapStringSlice(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetStringMapStringSlice(tc.nonkey) - if err == nil || got != nil { - t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v; want nil, error", - tc.key, got, err) - } - } -} - -func TestPayloadStringMapInt(t *testing.T) { - counter := map[string]int{ - "a": 1, - "b": 101, - "c": 42, - } - tests := []payloadTest{ - { - data: map[string]interface{}{"counts": counter}, - key: "counts", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetStringMapInt(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetStringMapInt(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetStringMapInt(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetStringMapInt(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetStringMapInt(tc.nonkey) - if err == nil || got != nil { - t.Errorf("Payload.GetStringMapInt(%q) = %v, %v; want nil, error", - tc.key, got, err) - } - } -} - -func TestPayloadStringMapBool(t *testing.T) { - features := map[string]bool{ - "A": false, - "B": true, - "C": true, - } - tests := []payloadTest{ - { - data: map[string]interface{}{"features": features}, - key: "features", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetStringMapBool(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetStringMapBool(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetStringMapBool(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetStringMapBool(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetStringMapBool(tc.nonkey) - if err == nil || got != nil { - t.Errorf("Payload.GetStringMapBool(%q) = %v, %v; want nil, error", - tc.key, got, err) - } - } -} - -func TestPayloadTime(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"current": time.Now()}, - key: "current", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetTime(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetTime(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetTime(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetTime(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetTime(tc.nonkey) - if err == nil || !got.IsZero() { - t.Errorf("Payload.GetTime(%q) = %v, %v; want %v, error", - tc.key, got, err, time.Time{}) - } - } -} - -func TestPayloadDuration(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"duration": 15 * time.Minute}, - key: "duration", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetDuration(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetDuration(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetDuration(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetDuration(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetDuration(tc.nonkey) - if err == nil || got != 0 { - t.Errorf("Payload.GetDuration(%q) = %v, %v; want %v, error", - tc.key, got, err, time.Duration(0)) - } - } -} - -func TestPayloadHas(t *testing.T) { - payload := Payload{map[string]interface{}{ - "user_id": 123, - }} - - if !payload.Has("user_id") { - t.Errorf("Payload.Has(%q) = false, want true", "user_id") - } - if payload.Has("name") { - t.Errorf("Payload.Has(%q) = true, want false", "name") - } -} - -func TestPayloadDebuggingStrings(t *testing.T) { - data := map[string]interface{}{ - "foo": 123, - "bar": "hello", - "baz": false, - } - payload := Payload{data: data} - - if payload.String() != fmt.Sprint(data) { - t.Errorf("Payload.String() = %q, want %q", - payload.String(), fmt.Sprint(data)) - } - - got, err := payload.MarshalJSON() - if err != nil { - t.Fatal(err) - } - want, err := json.Marshal(data) - if err != nil { - t.Fatal(err) - } - if diff := cmp.Diff(got, want); diff != "" { - t.Errorf("Payload.MarhsalJSON() = %s, want %s; (-want,+got)\n%s", - got, want, diff) - } -} diff --git a/scheduler.go b/scheduler.go index 8daa507..21280d6 100644 --- a/scheduler.go +++ b/scheduler.go @@ -240,8 +240,8 @@ func (s *Scheduler) beat() { e := &base.SchedulerEntry{ ID: job.id.String(), Spec: job.cronspec, - Type: job.task.Type, - Payload: job.task.Payload.data, + Type: job.task.Type(), + Payload: job.task.Payload(), Opts: stringifyOptions(job.opts), Next: entry.Next, Prev: entry.Prev, diff --git a/servemux.go b/servemux.go index 4ccd6f6..90e9972 100644 --- a/servemux.go +++ b/servemux.go @@ -62,7 +62,7 @@ func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string) { mux.mu.RLock() defer mux.mu.RUnlock() - h, pattern = mux.match(t.Type) + h, pattern = mux.match(t.Type()) if h == nil { h, pattern = NotFoundHandler(), "" }