2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 08:40:22 +08:00

Change task payload type to byte slice

This commit is contained in:
Ken Hibino 2021-03-17 06:41:40 -07:00
parent f3c848f25f
commit 848a03dc16
9 changed files with 31 additions and 997 deletions

View File

@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- NewTask function now takes array of bytes as payload.
- Requires redis v4.0+ for multiple field/value pair support
- Renamed pending key (TODO: need migration script)

View File

@ -16,20 +16,21 @@ import (
// Task represents a unit of work to be performed.
type Task struct {
// Type indicates the type of task to be performed.
Type string
// typename indicates the type of task to be performed.
typename string
// Payload holds data needed to perform the task.
Payload Payload
// payload holds data needed to perform the task.
payload []byte
}
func (t *Task) Type() string { return t.typename }
func (t *Task) Payload() []byte { return t.payload }
// NewTask returns a new Task given a type name and payload data.
//
// The payload values must be serializable.
func NewTask(typename string, payload map[string]interface{}) *Task {
func NewTask(typename string, payload []byte) *Task {
return &Task{
Type: typename,
Payload: Payload{payload},
typename: typename,
payload: payload,
}
}

View File

@ -176,7 +176,6 @@ func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v)
func (d processInOption) Type() OptionType { return ProcessInOpt }
func (d processInOption) Value() interface{} { return time.Duration(d) }
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
@ -305,7 +304,7 @@ func (c *Client) Close() error {
// If no ProcessAt or ProcessIn options are passed, the task will be processed immediately.
func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) {
c.mu.Lock()
if defaults, ok := c.opts[task.Type]; ok {
if defaults, ok := c.opts[task.Type()]; ok {
opts = append(defaults, opts...)
}
c.mu.Unlock()
@ -327,12 +326,12 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) {
}
var uniqueKey string
if opt.uniqueTTL > 0 {
uniqueKey = base.UniqueKey(opt.queue, task.Type, task.Payload.data)
uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload())
}
msg := &base.TaskMessage{
ID: uuid.New(),
Type: task.Type,
Payload: task.Payload.data,
Type: task.Type(),
Payload: task.Payload(),
Queue: opt.queue,
Retry: opt.retry,
Deadline: deadline.Unix(),

View File

@ -93,13 +93,13 @@ var SortStringSliceOpt = cmp.Transformer("SortStringSlice", func(in []string) []
var IgnoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID")
// NewTaskMessage returns a new instance of TaskMessage given a task type and payload.
func NewTaskMessage(taskType string, payload map[string]interface{}) *base.TaskMessage {
func NewTaskMessage(taskType string, payload []byte) *base.TaskMessage {
return NewTaskMessageWithQueue(taskType, payload, base.DefaultQueueName)
}
// NewTaskMessageWithQueue returns a new instance of TaskMessage given a
// task type, payload and queue name.
func NewTaskMessageWithQueue(taskType string, payload map[string]interface{}, qname string) *base.TaskMessage {
func NewTaskMessageWithQueue(taskType string, payload []byte, qname string) *base.TaskMessage {
return &base.TaskMessage{
ID: uuid.New(),
Type: taskType,

View File

@ -6,12 +6,8 @@
package base
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"sync"
"time"
@ -125,32 +121,8 @@ func SchedulerHistoryKey(entryID string) string {
}
// UniqueKey returns a redis key with the given type, payload, and queue name.
func UniqueKey(qname, tasktype string, payload map[string]interface{}) string {
return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, serializePayload(payload))
}
func serializePayload(payload map[string]interface{}) string {
if payload == nil {
return "nil"
}
type entry struct {
k string
v interface{}
}
var es []entry
for k, v := range payload {
es = append(es, entry{k, v})
}
// sort entries by key
sort.Slice(es, func(i, j int) bool { return es[i].k < es[j].k })
var b strings.Builder
for _, e := range es {
if b.Len() > 0 {
b.WriteString(",")
}
b.WriteString(fmt.Sprintf("%s=%v", e.k, e.v))
}
return b.String()
func UniqueKey(qname, tasktype string, payload []byte) string {
return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, string(payload))
}
// TaskMessage is the internal representation of a task with additional metadata fields.
@ -160,7 +132,7 @@ type TaskMessage struct {
Type string
// Payload holds data needed to process the task.
Payload map[string]interface{}
Payload []byte
// ID is a unique identifier for each task.
ID uuid.UUID
@ -203,13 +175,9 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) {
if msg == nil {
return nil, fmt.Errorf("cannot encode nil message")
}
payload, err := json.Marshal(msg.Payload)
if err != nil {
return nil, err
}
return proto.Marshal(&pb.TaskMessage{
Type: msg.Type,
Payload: payload,
Payload: msg.Payload,
Id: msg.ID.String(),
Queue: msg.Queue,
Retry: int32(msg.Retry),
@ -227,13 +195,9 @@ func DecodeMessage(data []byte) (*TaskMessage, error) {
if err := proto.Unmarshal(data, &pbmsg); err != nil {
return nil, err
}
payload, err := decodePayload(pbmsg.GetPayload())
if err != nil {
return nil, err
}
return &TaskMessage{
Type: pbmsg.GetType(),
Payload: payload,
Payload: pbmsg.GetPayload(),
ID: uuid.MustParse(pbmsg.GetId()),
Queue: pbmsg.GetQueue(),
Retry: int(pbmsg.GetRetry()),
@ -383,7 +347,7 @@ type WorkerInfo struct {
ServerID string
ID string
Type string
Payload map[string]interface{}
Payload []byte
Queue string
Started time.Time
Deadline time.Time
@ -394,10 +358,6 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) {
if info == nil {
return nil, fmt.Errorf("cannot encode nil worker info")
}
payload, err := json.Marshal(info.Payload)
if err != nil {
return nil, err
}
startTime, err := ptypes.TimestampProto(info.Started)
if err != nil {
return nil, err
@ -412,33 +372,19 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) {
ServerId: info.ServerID,
TaskId: info.ID,
TaskType: info.Type,
TaskPayload: payload,
TaskPayload: info.Payload,
Queue: info.Queue,
StartTime: startTime,
Deadline: deadline,
})
}
func decodePayload(b []byte) (map[string]interface{}, error) {
d := json.NewDecoder(bytes.NewReader(b))
d.UseNumber()
payload := make(map[string]interface{})
if err := d.Decode(&payload); err != nil {
return nil, err
}
return payload, nil
}
// DecodeWorkerInfo decodes the given bytes into WorkerInfo.
func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) {
var pbmsg pb.WorkerInfo
if err := proto.Unmarshal(b, &pbmsg); err != nil {
return nil, err
}
payload, err := decodePayload(pbmsg.GetTaskPayload())
if err != nil {
return nil, err
}
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime())
if err != nil {
return nil, err
@ -453,7 +399,7 @@ func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) {
ServerID: pbmsg.GetServerId(),
ID: pbmsg.GetTaskId(),
Type: pbmsg.GetTaskType(),
Payload: payload,
Payload: pbmsg.GetTaskPayload(),
Queue: pbmsg.GetQueue(),
Started: startTime,
Deadline: deadline,
@ -472,7 +418,7 @@ type SchedulerEntry struct {
Type string
// Payload is the payload of the periodic task.
Payload map[string]interface{}
Payload []byte
// Opts is the options for the periodic task.
Opts []string
@ -490,10 +436,6 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) {
if entry == nil {
return nil, fmt.Errorf("cannot encode nil scheduler entry")
}
payload, err := json.Marshal(entry.Payload)
if err != nil {
return nil, err
}
next, err := ptypes.TimestampProto(entry.Next)
if err != nil {
return nil, err
@ -506,7 +448,7 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) {
Id: entry.ID,
Spec: entry.Spec,
TaskType: entry.Type,
TaskPayload: payload,
TaskPayload: entry.Payload,
EnqueueOptions: entry.Opts,
NextEnqueueTime: next,
PrevEnqueueTime: prev,
@ -519,10 +461,6 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) {
if err := proto.Unmarshal(b, &pbmsg); err != nil {
return nil, err
}
payload, err := decodePayload(pbmsg.GetTaskPayload())
if err != nil {
return nil, err
}
next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime())
if err != nil {
return nil, err
@ -535,7 +473,7 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) {
ID: pbmsg.GetId(),
Spec: pbmsg.GetSpec(),
Type: pbmsg.GetTaskType(),
Payload: payload,
Payload: pbmsg.GetTaskPayload(),
Opts: pbmsg.GetEnqueueOptions(),
Next: next,
Prev: prev,

View File

@ -1,230 +0,0 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"encoding/json"
"fmt"
"time"
"github.com/spf13/cast"
)
// Payload holds arbitrary data needed for task execution.
type Payload struct {
data map[string]interface{}
}
type errKeyNotFound struct {
key string
}
func (e *errKeyNotFound) Error() string {
return fmt.Sprintf("key %q does not exist", e.key)
}
// Has reports whether key exists.
func (p Payload) Has(key string) bool {
_, ok := p.data[key]
return ok
}
func toInt(v interface{}) (int, error) {
switch v := v.(type) {
case json.Number:
val, err := v.Int64()
if err != nil {
return 0, err
}
return int(val), nil
default:
return cast.ToIntE(v)
}
}
// String returns a string representation of payload data.
func (p Payload) String() string {
return fmt.Sprint(p.data)
}
// MarshalJSON returns the JSON encoding of payload data.
func (p Payload) MarshalJSON() ([]byte, error) {
return json.Marshal(p.data)
}
// GetString returns a string value if a string type is associated with
// the key, otherwise reports an error.
func (p Payload) GetString(key string) (string, error) {
v, ok := p.data[key]
if !ok {
return "", &errKeyNotFound{key}
}
return cast.ToStringE(v)
}
// GetInt returns an int value if a numeric type is associated with
// the key, otherwise reports an error.
func (p Payload) GetInt(key string) (int, error) {
v, ok := p.data[key]
if !ok {
return 0, &errKeyNotFound{key}
}
return toInt(v)
}
// GetFloat64 returns a float64 value if a numeric type is associated with
// the key, otherwise reports an error.
func (p Payload) GetFloat64(key string) (float64, error) {
v, ok := p.data[key]
if !ok {
return 0, &errKeyNotFound{key}
}
switch v := v.(type) {
case json.Number:
return v.Float64()
default:
return cast.ToFloat64E(v)
}
}
// GetBool returns a boolean value if a boolean type is associated with
// the key, otherwise reports an error.
func (p Payload) GetBool(key string) (bool, error) {
v, ok := p.data[key]
if !ok {
return false, &errKeyNotFound{key}
}
return cast.ToBoolE(v)
}
// GetStringSlice returns a slice of strings if a string slice type is associated with
// the key, otherwise reports an error.
func (p Payload) GetStringSlice(key string) ([]string, error) {
v, ok := p.data[key]
if !ok {
return nil, &errKeyNotFound{key}
}
return cast.ToStringSliceE(v)
}
// GetIntSlice returns a slice of ints if a int slice type is associated with
// the key, otherwise reports an error.
func (p Payload) GetIntSlice(key string) ([]int, error) {
v, ok := p.data[key]
if !ok {
return nil, &errKeyNotFound{key}
}
switch v := v.(type) {
case []interface{}:
var res []int
for _, elem := range v {
val, err := toInt(elem)
if err != nil {
return nil, err
}
res = append(res, int(val))
}
return res, nil
default:
return cast.ToIntSliceE(v)
}
}
// GetStringMap returns a map of string to empty interface
// if a correct map type is associated with the key,
// otherwise reports an error.
func (p Payload) GetStringMap(key string) (map[string]interface{}, error) {
v, ok := p.data[key]
if !ok {
return nil, &errKeyNotFound{key}
}
return cast.ToStringMapE(v)
}
// GetStringMapString returns a map of string to string
// if a correct map type is associated with the key,
// otherwise reports an error.
func (p Payload) GetStringMapString(key string) (map[string]string, error) {
v, ok := p.data[key]
if !ok {
return nil, &errKeyNotFound{key}
}
return cast.ToStringMapStringE(v)
}
// GetStringMapStringSlice returns a map of string to string slice
// if a correct map type is associated with the key,
// otherwise reports an error.
func (p Payload) GetStringMapStringSlice(key string) (map[string][]string, error) {
v, ok := p.data[key]
if !ok {
return nil, &errKeyNotFound{key}
}
return cast.ToStringMapStringSliceE(v)
}
// GetStringMapInt returns a map of string to int
// if a correct map type is associated with the key,
// otherwise reports an error.
func (p Payload) GetStringMapInt(key string) (map[string]int, error) {
v, ok := p.data[key]
if !ok {
return nil, &errKeyNotFound{key}
}
switch v := v.(type) {
case map[string]interface{}:
res := make(map[string]int)
for key, val := range v {
ival, err := toInt(val)
if err != nil {
return nil, err
}
res[key] = ival
}
return res, nil
default:
return cast.ToStringMapIntE(v)
}
}
// GetStringMapBool returns a map of string to boolean
// if a correct map type is associated with the key,
// otherwise reports an error.
func (p Payload) GetStringMapBool(key string) (map[string]bool, error) {
v, ok := p.data[key]
if !ok {
return nil, &errKeyNotFound{key}
}
return cast.ToStringMapBoolE(v)
}
// GetTime returns a time value if a correct map type is associated with the key,
// otherwise reports an error.
func (p Payload) GetTime(key string) (time.Time, error) {
v, ok := p.data[key]
if !ok {
return time.Time{}, &errKeyNotFound{key}
}
return cast.ToTimeE(v)
}
// GetDuration returns a duration value if a correct map type is associated with the key,
// otherwise reports an error.
func (p Payload) GetDuration(key string) (time.Duration, error) {
v, ok := p.data[key]
if !ok {
return 0, &errKeyNotFound{key}
}
switch v := v.(type) {
case json.Number:
val, err := v.Int64()
if err != nil {
return 0, err
}
return time.Duration(val), nil
default:
return cast.ToDurationE(v)
}
}

View File

@ -1,675 +0,0 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"encoding/json"
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
)
type payloadTest struct {
data map[string]interface{}
key string
nonkey string
}
func TestPayloadString(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"name": "gopher"},
key: "name",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetString(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("Payload.GetString(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
encoded, err := base.EncodeMessage(in)
if err != nil {
t.Fatal(err)
}
out, err := base.DecodeMessage(encoded)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetString(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("With Marshaling: Payload.GetString(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetString(tc.nonkey)
if err == nil || got != "" {
t.Errorf("Payload.GetString(%q) = %v, %v; want '', error",
tc.key, got, err)
}
}
}
func TestPayloadInt(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"user_id": 42},
key: "user_id",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetInt(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("Payload.GetInt(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
encoded, err := base.EncodeMessage(in)
if err != nil {
t.Fatal(err)
}
out, err := base.DecodeMessage(encoded)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetInt(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("With Marshaling: Payload.GetInt(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetInt(tc.nonkey)
if err == nil || got != 0 {
t.Errorf("Payload.GetInt(%q) = %v, %v; want 0, error",
tc.key, got, err)
}
}
}
func TestPayloadFloat64(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"pi": 3.14},
key: "pi",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetFloat64(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("Payload.GetFloat64(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
encoded, err := base.EncodeMessage(in)
if err != nil {
t.Fatal(err)
}
out, err := base.DecodeMessage(encoded)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetFloat64(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("With Marshaling: Payload.GetFloat64(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetFloat64(tc.nonkey)
if err == nil || got != 0 {
t.Errorf("Payload.GetFloat64(%q) = %v, %v; want 0, error",
tc.key, got, err)
}
}
}
func TestPayloadBool(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"enabled": true},
key: "enabled",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetBool(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("Payload.GetBool(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
encoded, err := base.EncodeMessage(in)
if err != nil {
t.Fatal(err)
}
out, err := base.DecodeMessage(encoded)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetBool(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("With Marshaling: Payload.GetBool(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetBool(tc.nonkey)
if err == nil || got != false {
t.Errorf("Payload.GetBool(%q) = %v, %v; want false, error",
tc.key, got, err)
}
}
}
func TestPayloadStringSlice(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"names": []string{"luke", "rey", "anakin"}},
key: "names",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetStringSlice(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetStringSlice(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
encoded, err := base.EncodeMessage(in)
if err != nil {
t.Fatal(err)
}
out, err := base.DecodeMessage(encoded)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetStringSlice(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetStringSlice(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetStringSlice(tc.nonkey)
if err == nil || got != nil {
t.Errorf("Payload.GetStringSlice(%q) = %v, %v; want nil, error",
tc.key, got, err)
}
}
}
func TestPayloadIntSlice(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"nums": []int{9, 8, 7}},
key: "nums",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetIntSlice(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetIntSlice(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
encoded, err := base.EncodeMessage(in)
if err != nil {
t.Fatal(err)
}
out, err := base.DecodeMessage(encoded)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetIntSlice(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetIntSlice(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetIntSlice(tc.nonkey)
if err == nil || got != nil {
t.Errorf("Payload.GetIntSlice(%q) = %v, %v; want nil, error",
tc.key, got, err)
}
}
}
func TestPayloadStringMap(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"user": map[string]interface{}{"name": "Jon Doe", "score": 2.2}},
key: "user",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetStringMap(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetStringMap(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
encoded, err := base.EncodeMessage(in)
if err != nil {
t.Fatal(err)
}
out, err := base.DecodeMessage(encoded)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetStringMap(tc.key)
ignoreOpt := cmpopts.IgnoreMapEntries(func(key string, val interface{}) bool {
switch val.(type) {
case json.Number:
return true
default:
return false
}
})
diff = cmp.Diff(got, tc.data[tc.key], ignoreOpt)
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetStringMap(%q) = %v, %v, want %v, nil;(-want,+got)\n%s",
tc.key, got, err, tc.data[tc.key], diff)
}
// access non-existent key.
got, err = payload.GetStringMap(tc.nonkey)
if err == nil || got != nil {
t.Errorf("Payload.GetStringMap(%q) = %v, %v; want nil, error",
tc.key, got, err)
}
}
}
func TestPayloadStringMapString(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"address": map[string]string{"line": "123 Main St", "city": "San Francisco", "state": "CA"}},
key: "address",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetStringMapString(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetStringMapString(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
encoded, err := base.EncodeMessage(in)
if err != nil {
t.Fatal(err)
}
out, err := base.DecodeMessage(encoded)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetStringMapString(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetStringMapString(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetStringMapString(tc.nonkey)
if err == nil || got != nil {
t.Errorf("Payload.GetStringMapString(%q) = %v, %v; want nil, error",
tc.key, got, err)
}
}
}
func TestPayloadStringMapStringSlice(t *testing.T) {
favs := map[string][]string{
"movies": {"forrest gump", "star wars"},
"tv_shows": {"game of thrones", "HIMYM", "breaking bad"},
}
tests := []payloadTest{
{
data: map[string]interface{}{"favorites": favs},
key: "favorites",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetStringMapStringSlice(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
encoded, err := base.EncodeMessage(in)
if err != nil {
t.Fatal(err)
}
out, err := base.DecodeMessage(encoded)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetStringMapStringSlice(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetStringMapStringSlice(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetStringMapStringSlice(tc.nonkey)
if err == nil || got != nil {
t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v; want nil, error",
tc.key, got, err)
}
}
}
func TestPayloadStringMapInt(t *testing.T) {
counter := map[string]int{
"a": 1,
"b": 101,
"c": 42,
}
tests := []payloadTest{
{
data: map[string]interface{}{"counts": counter},
key: "counts",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetStringMapInt(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetStringMapInt(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
encoded, err := base.EncodeMessage(in)
if err != nil {
t.Fatal(err)
}
out, err := base.DecodeMessage(encoded)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetStringMapInt(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetStringMapInt(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetStringMapInt(tc.nonkey)
if err == nil || got != nil {
t.Errorf("Payload.GetStringMapInt(%q) = %v, %v; want nil, error",
tc.key, got, err)
}
}
}
func TestPayloadStringMapBool(t *testing.T) {
features := map[string]bool{
"A": false,
"B": true,
"C": true,
}
tests := []payloadTest{
{
data: map[string]interface{}{"features": features},
key: "features",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetStringMapBool(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetStringMapBool(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
encoded, err := base.EncodeMessage(in)
if err != nil {
t.Fatal(err)
}
out, err := base.DecodeMessage(encoded)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetStringMapBool(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetStringMapBool(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetStringMapBool(tc.nonkey)
if err == nil || got != nil {
t.Errorf("Payload.GetStringMapBool(%q) = %v, %v; want nil, error",
tc.key, got, err)
}
}
}
func TestPayloadTime(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"current": time.Now()},
key: "current",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetTime(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetTime(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
encoded, err := base.EncodeMessage(in)
if err != nil {
t.Fatal(err)
}
out, err := base.DecodeMessage(encoded)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetTime(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetTime(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetTime(tc.nonkey)
if err == nil || !got.IsZero() {
t.Errorf("Payload.GetTime(%q) = %v, %v; want %v, error",
tc.key, got, err, time.Time{})
}
}
}
func TestPayloadDuration(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"duration": 15 * time.Minute},
key: "duration",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetDuration(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetDuration(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
encoded, err := base.EncodeMessage(in)
if err != nil {
t.Fatal(err)
}
out, err := base.DecodeMessage(encoded)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetDuration(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetDuration(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetDuration(tc.nonkey)
if err == nil || got != 0 {
t.Errorf("Payload.GetDuration(%q) = %v, %v; want %v, error",
tc.key, got, err, time.Duration(0))
}
}
}
func TestPayloadHas(t *testing.T) {
payload := Payload{map[string]interface{}{
"user_id": 123,
}}
if !payload.Has("user_id") {
t.Errorf("Payload.Has(%q) = false, want true", "user_id")
}
if payload.Has("name") {
t.Errorf("Payload.Has(%q) = true, want false", "name")
}
}
func TestPayloadDebuggingStrings(t *testing.T) {
data := map[string]interface{}{
"foo": 123,
"bar": "hello",
"baz": false,
}
payload := Payload{data: data}
if payload.String() != fmt.Sprint(data) {
t.Errorf("Payload.String() = %q, want %q",
payload.String(), fmt.Sprint(data))
}
got, err := payload.MarshalJSON()
if err != nil {
t.Fatal(err)
}
want, err := json.Marshal(data)
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(got, want); diff != "" {
t.Errorf("Payload.MarhsalJSON() = %s, want %s; (-want,+got)\n%s",
got, want, diff)
}
}

View File

@ -240,8 +240,8 @@ func (s *Scheduler) beat() {
e := &base.SchedulerEntry{
ID: job.id.String(),
Spec: job.cronspec,
Type: job.task.Type,
Payload: job.task.Payload.data,
Type: job.task.Type(),
Payload: job.task.Payload(),
Opts: stringifyOptions(job.opts),
Next: entry.Next,
Prev: entry.Prev,

View File

@ -62,7 +62,7 @@ func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string) {
mux.mu.RLock()
defer mux.mu.RUnlock()
h, pattern = mux.match(t.Type)
h, pattern = mux.match(t.Type())
if h == nil {
h, pattern = NotFoundHandler(), ""
}