diff --git a/CHANGELOG.md b/CHANGELOG.md index d70ed3b..feb0a4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- NewTask constructor + +### Changed + +- Task type is now immutable (i.e., Payload is read-only) + ## [0.1.0] - 2020-01-04 ### Added diff --git a/README.md b/README.md index 02f7343..40a3c72 100644 --- a/README.md +++ b/README.md @@ -58,28 +58,23 @@ func main() { } client := asynq.NewClient(r) - t1 := asynq.Task{ - Type: "send_welcome_email", - Payload: map[string]interface{}{ - "recipient_id": 1234, - }, - } + // create a task with typename and payload. + t1 := asynq.NewTask( + "send_welcome_email", + map[string]interface{}{"user_id": 42}) - t2 := asynq.Task{ - Type: "send_reminder_email", - Payload: map[string]interface{}{ - "recipient_id": 1234, - }, - } + t2 := asynq.NewTask( + "send_reminder_email", + map[string]interface{}{"user_id": 42}) // process the task immediately. - err := client.Schedule(&t1, time.Now()) + err := client.Schedule(t1, time.Now()) // process the task 24 hours later. - err = client.Schedule(&t2, time.Now().Add(24 * time.Hour)) + err = client.Schedule(t2, time.Now().Add(24 * time.Hour)) // specify the max number of retry (default: 25) - err = client.Schedule(&t1, time.Now(), asynq.MaxRetry(1)) + err = client.Schedule(t1, time.Now(), asynq.MaxRetry(1)) } ``` @@ -120,7 +115,7 @@ The simplest way to implement a handler is to define a function with the same si func handler(t *asynq.Task) error { switch t.Type { case "send_welcome_email": - id, err := t.Payload.GetInt("recipient_id") + id, err := t.Payload.GetInt("user_id") if err != nil { return err } diff --git a/asynq.go b/asynq.go index b9da48d..51fcceb 100644 --- a/asynq.go +++ b/asynq.go @@ -11,9 +11,20 @@ TODOs: // Task represents a task to be performed. type Task struct { - // Type indicates the kind of the task to be performed. + // Type indicates the type of task to be performed. Type string // Payload holds data needed to process the task. Payload Payload } + +// NewTask returns a new instance of a task given a task type and payload. +// +// Since payload data gets serialized to JSON, the payload values must be +// composed of JSON supported data types. +func NewTask(typename string, payload map[string]interface{}) *Task { + return &Task{ + Type: typename, + Payload: Payload{payload}, + } +} diff --git a/background.go b/background.go index 572718b..204b59d 100644 --- a/background.go +++ b/background.go @@ -52,7 +52,7 @@ type Config struct { // // n is the number of times the task has been retried. // e is the error returned by the task handler. - // t is the task in question. t is read-only, the function should not mutate t. + // t is the task in question. RetryDelayFunc func(n int, e error, t *Task) time.Duration } @@ -91,9 +91,6 @@ func NewBackground(r *redis.Client, cfg *Config) *Background { // // If ProcessTask return a non-nil error or panics, the task // will be retried after delay. -// -// Note: The argument task is ready only, ProcessTask should -// not mutate the task. type Handler interface { ProcessTask(*Task) error } diff --git a/background_test.go b/background_test.go index ef5f228..1432aa5 100644 --- a/background_test.go +++ b/background_test.go @@ -33,15 +33,9 @@ func TestBackground(t *testing.T) { bg.start(HandlerFunc(h)) - client.Schedule(&Task{ - Type: "send_email", - Payload: map[string]interface{}{"recipient_id": 123}, - }, time.Now()) + client.Schedule(NewTask("send_email", map[string]interface{}{"recipient_id": 123}), time.Now()) - client.Schedule(&Task{ - Type: "send_email", - Payload: map[string]interface{}{"recipient_id": 456}, - }, time.Now().Add(time.Hour)) + client.Schedule(NewTask("send_email", map[string]interface{}{"recipient_id": 456}), time.Now().Add(time.Hour)) bg.stop() } diff --git a/benchmark_test.go b/benchmark_test.go index 61525f9..27f5899 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -28,8 +28,8 @@ func BenchmarkEndToEndSimple(b *testing.B) { }) // Create a bunch of tasks for i := 0; i < count; i++ { - t := Task{Type: fmt.Sprintf("task%d", i), Payload: Payload{"data": i}} - client.Schedule(&t, time.Now()) + t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) + client.Schedule(t, time.Now()) } var wg sync.WaitGroup @@ -65,12 +65,12 @@ func BenchmarkEndToEnd(b *testing.B) { }) // Create a bunch of tasks for i := 0; i < count; i++ { - t := Task{Type: fmt.Sprintf("task%d", i), Payload: Payload{"data": i}} - client.Schedule(&t, time.Now()) + t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) + client.Schedule(t, time.Now()) } for i := 0; i < count; i++ { - t := Task{Type: fmt.Sprintf("scheduled%d", i), Payload: Payload{"data": i}} - client.Schedule(&t, time.Now().Add(time.Second)) + t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i}) + client.Schedule(t, time.Now().Add(time.Second)) } var wg sync.WaitGroup diff --git a/client.go b/client.go index e282b58..64e92e9 100644 --- a/client.go +++ b/client.go @@ -82,7 +82,7 @@ func (c *Client) Schedule(task *Task, processAt time.Time, opts ...Option) error msg := &base.TaskMessage{ ID: xid.New(), Type: task.Type, - Payload: task.Payload, + Payload: task.Payload.data, Queue: "default", Retry: opt.retry, } diff --git a/client_test.go b/client_test.go index 0938cc9..32aebc6 100644 --- a/client_test.go +++ b/client_test.go @@ -17,7 +17,7 @@ func TestClient(t *testing.T) { r := setup(t) client := NewClient(r) - task := &Task{Type: "send_email", Payload: map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}} + task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) tests := []struct { desc string @@ -35,7 +35,7 @@ func TestClient(t *testing.T) { wantEnqueued: []*base.TaskMessage{ &base.TaskMessage{ Type: task.Type, - Payload: task.Payload, + Payload: task.Payload.data, Retry: defaultMaxRetry, Queue: "default", }, @@ -52,7 +52,7 @@ func TestClient(t *testing.T) { { Msg: &base.TaskMessage{ Type: task.Type, - Payload: task.Payload, + Payload: task.Payload.data, Retry: defaultMaxRetry, Queue: "default", }, @@ -70,7 +70,7 @@ func TestClient(t *testing.T) { wantEnqueued: []*base.TaskMessage{ &base.TaskMessage{ Type: task.Type, - Payload: task.Payload, + Payload: task.Payload.data, Retry: 3, Queue: "default", }, @@ -87,7 +87,7 @@ func TestClient(t *testing.T) { wantEnqueued: []*base.TaskMessage{ &base.TaskMessage{ Type: task.Type, - Payload: task.Payload, + Payload: task.Payload.data, Retry: 0, // Retry count should be set to zero Queue: "default", }, @@ -105,7 +105,7 @@ func TestClient(t *testing.T) { wantEnqueued: []*base.TaskMessage{ &base.TaskMessage{ Type: task.Type, - Payload: task.Payload, + Payload: task.Payload.data, Retry: 10, // Last option takes precedence Queue: "default", }, diff --git a/doc.go b/doc.go index 57725dc..75bc785 100644 --- a/doc.go +++ b/doc.go @@ -9,12 +9,11 @@ The Client is used to register a task to be processed at the specified time. client := asynq.NewClient(redis) - t := asynq.Task{ - Type: "send_email", - Payload: map[string]interface{}{"user_id": 42}, - } + t := asynq.NewTask( + "send_email", + map[string]interface{}{"user_id": 42}) - err := client.Schedule(&t, time.Now().Add(time.Minute)) + err := client.Schedule(t, time.Now().Add(time.Minute)) The Background is used to run the background task processing with a given handler. diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 126d8c9..e69e6b7 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -148,64 +148,6 @@ func TestDone(t *testing.T) { } } -// Note: User should not mutate task payload in Handler -// However, we should handle even if the user mutates the task -// in Handler. This test case is to make sure that we remove task -// from in-progress queue when we call Done for the task. -func TestDoneWithMutatedTask(t *testing.T) { - r := setup(t) - t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) - t2 := h.NewTaskMessage("export_csv", map[string]interface{}{"subjct": "hola"}) - - tests := []struct { - inProgress []*base.TaskMessage // initial state of the in-progress list - target *base.TaskMessage // task to remove - wantInProgress []*base.TaskMessage // final state of the in-progress list - }{ - { - inProgress: []*base.TaskMessage{t1, t2}, - target: t1, - wantInProgress: []*base.TaskMessage{t2}, - }, - { - inProgress: []*base.TaskMessage{t1}, - target: t1, - wantInProgress: []*base.TaskMessage{}, - }, - } - - for _, tc := range tests { - h.FlushDB(t, r.client) // clean up db before each test case - h.SeedInProgressQueue(t, r.client, tc.inProgress) - - // Mutate payload map! - tc.target.Payload["newkey"] = 123 - - err := r.Done(tc.target) - if err != nil { - t.Errorf("(*RDB).Done(task) = %v, want nil", err) - continue - } - - gotInProgress := h.GetInProgressMessages(t, r.client) - if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.InProgressQueue, diff) - continue - } - - processedKey := base.ProcessedKey(time.Now()) - gotProcessed := r.client.Get(processedKey).Val() - if gotProcessed != "1" { - t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed) - } - - gotTTL := r.client.TTL(processedKey).Val() - if gotTTL > statsTTL { - t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL) - } - } -} - func TestRequeue(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", nil) @@ -384,104 +326,6 @@ func TestRetry(t *testing.T) { } } -func TestRetryWithMutatedTask(t *testing.T) { - r := setup(t) - t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"}) - t2 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/image.jpg"}) - t3 := h.NewTaskMessage("reindex", map[string]interface{}{}) - t1.Retried = 10 - errMsg := "SMTP server is not responding" - t1AfterRetry := &base.TaskMessage{ - ID: t1.ID, - Type: t1.Type, - Payload: t1.Payload, - Queue: t1.Queue, - Retry: t1.Retry, - Retried: t1.Retried + 1, - ErrorMsg: errMsg, - } - now := time.Now() - - tests := []struct { - inProgress []*base.TaskMessage - retry []h.ZSetEntry - msg *base.TaskMessage - processAt time.Time - errMsg string - wantInProgress []*base.TaskMessage - wantRetry []h.ZSetEntry - }{ - { - inProgress: []*base.TaskMessage{t1, t2}, - retry: []h.ZSetEntry{ - { - Msg: t3, - Score: now.Add(time.Minute).Unix(), - }, - }, - msg: t1, - processAt: now.Add(5 * time.Minute), - errMsg: errMsg, - wantInProgress: []*base.TaskMessage{t2}, - wantRetry: []h.ZSetEntry{ - { - Msg: t1AfterRetry, - Score: now.Add(5 * time.Minute).Unix(), - }, - { - Msg: t3, - Score: now.Add(time.Minute).Unix(), - }, - }, - }, - } - - for _, tc := range tests { - h.FlushDB(t, r.client) - h.SeedInProgressQueue(t, r.client, tc.inProgress) - h.SeedRetryQueue(t, r.client, tc.retry) - - // Mutate paylod map! - tc.msg.Payload["newkey"] = "newvalue" - - err := r.Retry(tc.msg, tc.processAt, tc.errMsg) - if err != nil { - t.Errorf("(*RDB).Retry = %v, want nil", err) - continue - } - - gotInProgress := h.GetInProgressMessages(t, r.client) - if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.InProgressQueue, diff) - } - - gotRetry := h.GetRetryEntries(t, r.client) - if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff) - } - - processedKey := base.ProcessedKey(time.Now()) - gotProcessed := r.client.Get(processedKey).Val() - if gotProcessed != "1" { - t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed) - } - gotTTL := r.client.TTL(processedKey).Val() - if gotTTL > statsTTL { - t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL) - } - - failureKey := base.FailureKey(time.Now()) - gotFailure := r.client.Get(failureKey).Val() - if gotFailure != "1" { - t.Errorf("GET %q = %q, want 1", failureKey, gotFailure) - } - gotTTL = r.client.TTL(processedKey).Val() - if gotTTL > statsTTL { - t.Errorf("TTL %q = %v, want less than or equal to %v", failureKey, gotTTL, statsTTL) - } - } -} - func TestKill(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", nil) @@ -585,112 +429,6 @@ func TestKill(t *testing.T) { } } -func TestKillWithMutatedTask(t *testing.T) { - r := setup(t) - t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) - t2 := h.NewTaskMessage("reindex", map[string]interface{}{}) - t3 := h.NewTaskMessage("generate_csv", map[string]interface{}{"path": "some/path/to/img"}) - errMsg := "SMTP server not responding" - t1AfterKill := &base.TaskMessage{ - ID: t1.ID, - Type: t1.Type, - Payload: t1.Payload, - Queue: t1.Queue, - Retry: t1.Retry, - Retried: t1.Retried, - ErrorMsg: errMsg, - } - now := time.Now() - - // TODO(hibiken): add test cases for trimming - tests := []struct { - inProgress []*base.TaskMessage - dead []h.ZSetEntry - target *base.TaskMessage // task to kill - wantInProgress []*base.TaskMessage - wantDead []h.ZSetEntry - }{ - { - inProgress: []*base.TaskMessage{t1, t2}, - dead: []h.ZSetEntry{ - { - Msg: t3, - Score: now.Add(-time.Hour).Unix(), - }, - }, - target: t1, - wantInProgress: []*base.TaskMessage{t2}, - wantDead: []h.ZSetEntry{ - { - Msg: t1AfterKill, - Score: now.Unix(), - }, - { - Msg: t3, - Score: now.Add(-time.Hour).Unix(), - }, - }, - }, - { - inProgress: []*base.TaskMessage{t1, t2, t3}, - dead: []h.ZSetEntry{}, - target: t1, - wantInProgress: []*base.TaskMessage{t2, t3}, - wantDead: []h.ZSetEntry{ - { - Msg: t1AfterKill, - Score: now.Unix(), - }, - }, - }, - } - - for _, tc := range tests { - h.FlushDB(t, r.client) // clean up db before each test case - h.SeedInProgressQueue(t, r.client, tc.inProgress) - h.SeedDeadQueue(t, r.client, tc.dead) - - // Mutate payload map! - tc.target.Payload["newkey"] = "newvalue" - - err := r.Kill(tc.target, errMsg) - if err != nil { - t.Errorf("(*RDB).Kill(%v, %v) = %v, want nil", tc.target, errMsg, err) - continue - } - - gotInProgress := h.GetInProgressMessages(t, r.client) - if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want, +got)\n%s", base.InProgressQueue, diff) - } - - gotDead := h.GetDeadEntries(t, r.client) - if diff := cmp.Diff(tc.wantDead, gotDead, h.SortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadQueue, diff) - } - - processedKey := base.ProcessedKey(time.Now()) - gotProcessed := r.client.Get(processedKey).Val() - if gotProcessed != "1" { - t.Errorf("GET %q = %q, want 1", processedKey, gotProcessed) - } - gotTTL := r.client.TTL(processedKey).Val() - if gotTTL > statsTTL { - t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL) - } - - failureKey := base.FailureKey(time.Now()) - gotFailure := r.client.Get(failureKey).Val() - if gotFailure != "1" { - t.Errorf("GET %q = %q, want 1", failureKey, gotFailure) - } - gotTTL = r.client.TTL(processedKey).Val() - if gotTTL > statsTTL { - t.Errorf("TTL %q = %v, want less than or equal to %v", failureKey, gotTTL, statsTTL) - } - } -} - func TestRestoreUnfinished(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", nil) diff --git a/payload.go b/payload.go index 15a29bf..461bfb4 100644 --- a/payload.go +++ b/payload.go @@ -12,8 +12,9 @@ import ( ) // Payload is an arbitrary data needed for task execution. -// The values have to be JSON serializable. -type Payload map[string]interface{} +type Payload struct { + data map[string]interface{} +} type errKeyNotFound struct { key string @@ -25,14 +26,14 @@ func (e *errKeyNotFound) Error() string { // Has reports whether key exists. func (p Payload) Has(key string) bool { - _, ok := p[key] + _, ok := p.data[key] return ok } // GetString returns a string value if a string type is associated with // the key, otherwise reports an error. func (p Payload) GetString(key string) (string, error) { - v, ok := p[key] + v, ok := p.data[key] if !ok { return "", &errKeyNotFound{key} } @@ -42,7 +43,7 @@ func (p Payload) GetString(key string) (string, error) { // GetInt returns an int value if a numeric type is associated with // the key, otherwise reports an error. func (p Payload) GetInt(key string) (int, error) { - v, ok := p[key] + v, ok := p.data[key] if !ok { return 0, &errKeyNotFound{key} } @@ -52,7 +53,7 @@ func (p Payload) GetInt(key string) (int, error) { // GetFloat64 returns a float64 value if a numeric type is associated with // the key, otherwise reports an error. func (p Payload) GetFloat64(key string) (float64, error) { - v, ok := p[key] + v, ok := p.data[key] if !ok { return 0, &errKeyNotFound{key} } @@ -62,7 +63,7 @@ func (p Payload) GetFloat64(key string) (float64, error) { // GetBool returns a boolean value if a boolean type is associated with // the key, otherwise reports an error. func (p Payload) GetBool(key string) (bool, error) { - v, ok := p[key] + v, ok := p.data[key] if !ok { return false, &errKeyNotFound{key} } @@ -72,7 +73,7 @@ func (p Payload) GetBool(key string) (bool, error) { // GetStringSlice returns a slice of strings if a string slice type is associated with // the key, otherwise reports an error. func (p Payload) GetStringSlice(key string) ([]string, error) { - v, ok := p[key] + v, ok := p.data[key] if !ok { return nil, &errKeyNotFound{key} } @@ -82,7 +83,7 @@ func (p Payload) GetStringSlice(key string) ([]string, error) { // GetIntSlice returns a slice of ints if a int slice type is associated with // the key, otherwise reports an error. func (p Payload) GetIntSlice(key string) ([]int, error) { - v, ok := p[key] + v, ok := p.data[key] if !ok { return nil, &errKeyNotFound{key} } @@ -93,7 +94,7 @@ func (p Payload) GetIntSlice(key string) ([]int, error) { // if a correct map type is associated with the key, // otherwise reports an error. func (p Payload) GetStringMap(key string) (map[string]interface{}, error) { - v, ok := p[key] + v, ok := p.data[key] if !ok { return nil, &errKeyNotFound{key} } @@ -104,7 +105,7 @@ func (p Payload) GetStringMap(key string) (map[string]interface{}, error) { // if a correct map type is associated with the key, // otherwise reports an error. func (p Payload) GetStringMapString(key string) (map[string]string, error) { - v, ok := p[key] + v, ok := p.data[key] if !ok { return nil, &errKeyNotFound{key} } @@ -115,7 +116,7 @@ func (p Payload) GetStringMapString(key string) (map[string]string, error) { // if a correct map type is associated with the key, // otherwise reports an error. func (p Payload) GetStringMapStringSlice(key string) (map[string][]string, error) { - v, ok := p[key] + v, ok := p.data[key] if !ok { return nil, &errKeyNotFound{key} } @@ -126,7 +127,7 @@ func (p Payload) GetStringMapStringSlice(key string) (map[string][]string, error // if a correct map type is associated with the key, // otherwise reports an error. func (p Payload) GetStringMapInt(key string) (map[string]int, error) { - v, ok := p[key] + v, ok := p.data[key] if !ok { return nil, &errKeyNotFound{key} } @@ -137,7 +138,7 @@ func (p Payload) GetStringMapInt(key string) (map[string]int, error) { // if a correct map type is associated with the key, // otherwise reports an error. func (p Payload) GetStringMapBool(key string) (map[string]bool, error) { - v, ok := p[key] + v, ok := p.data[key] if !ok { return nil, &errKeyNotFound{key} } @@ -147,7 +148,7 @@ func (p Payload) GetStringMapBool(key string) (map[string]bool, error) { // GetTime returns a time value if a correct map type is associated with the key, // otherwise reports an error. func (p Payload) GetTime(key string) (time.Time, error) { - v, ok := p[key] + v, ok := p.data[key] if !ok { return time.Time{}, &errKeyNotFound{key} } @@ -157,7 +158,7 @@ func (p Payload) GetTime(key string) (time.Time, error) { // GetDuration returns a duration value if a correct map type is associated with the key, // otherwise reports an error. func (p Payload) GetDuration(key string) (time.Duration, error) { - v, ok := p[key] + v, ok := p.data[key] if !ok { return 0, &errKeyNotFound{key} } diff --git a/payload_test.go b/payload_test.go index 2631844..a59b644 100644 --- a/payload_test.go +++ b/payload_test.go @@ -10,6 +10,8 @@ import ( "time" "github.com/google/go-cmp/cmp" + h "github.com/hibiken/asynq/internal/asynqtest" + "github.com/hibiken/asynq/internal/base" ) func TestPayloadGet(t *testing.T) { @@ -34,7 +36,7 @@ func TestPayloadGet(t *testing.T) { now := time.Now() duration := 15 * time.Minute - payload := Payload{ + data := map[string]interface{}{ "greeting": "Hello", "user_id": 9876, "pi": 3.1415, @@ -49,6 +51,7 @@ func TestPayloadGet(t *testing.T) { "timestamp": now, "duration": duration, } + payload := Payload{data} gotStr, err := payload.GetString("greeting") if gotStr != "Hello" || err != nil { @@ -151,7 +154,7 @@ func TestPayloadGetWithMarshaling(t *testing.T) { now := time.Now() duration := 15 * time.Minute - in := Payload{ + in := Payload{map[string]interface{}{ "subject": "Hello", "recipient_id": 9876, "pi": 3.14, @@ -165,18 +168,19 @@ func TestPayloadGetWithMarshaling(t *testing.T) { "features": features, "timestamp": now, "duration": duration, - } - - // encode and then decode - data, err := json.Marshal(in) + }} + // encode and then decode task messsage + inMsg := h.NewTaskMessage("testing", in.data) + data, err := json.Marshal(inMsg) if err != nil { t.Fatal(err) } - var out Payload - err = json.Unmarshal(data, &out) + var outMsg base.TaskMessage + err = json.Unmarshal(data, &outMsg) if err != nil { t.Fatal(err) } + out := Payload{outMsg.Payload} gotStr, err := out.GetString("subject") if gotStr != "Hello" || err != nil { @@ -258,9 +262,9 @@ func TestPayloadGetWithMarshaling(t *testing.T) { } func TestPayloadHas(t *testing.T) { - payload := Payload{ + payload := Payload{map[string]interface{}{ "user_id": 123, - } + }} if !payload.Has("user_id") { t.Errorf("Payload.Has(%q) = false, want true", "user_id") diff --git a/processor.go b/processor.go index b45f756..7f408c0 100644 --- a/processor.go +++ b/processor.go @@ -126,7 +126,7 @@ func (p *processor) exec() { defer func() { <-p.sema /* release token */ }() resCh := make(chan error, 1) - task := &Task{Type: msg.Type, Payload: msg.Payload} + task := NewTask(msg.Type, msg.Payload) go func() { resCh <- perform(p.handler, task) }() @@ -182,7 +182,7 @@ func (p *processor) markAsDone(msg *base.TaskMessage) { } func (p *processor) retry(msg *base.TaskMessage, e error) { - d := p.retryDelayFunc(msg.Retried, e, &Task{Type: msg.Type, Payload: msg.Payload}) + d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload)) retryAt := time.Now().Add(d) err := p.rdb.Retry(msg, retryAt, e.Error()) if err != nil { diff --git a/processor_test.go b/processor_test.go index 5063bd5..79ff0cc 100644 --- a/processor_test.go +++ b/processor_test.go @@ -25,10 +25,10 @@ func TestProcessorSuccess(t *testing.T) { m3 := h.NewTaskMessage("reindex", nil) m4 := h.NewTaskMessage("sync", nil) - t1 := &Task{Type: m1.Type, Payload: m1.Payload} - t2 := &Task{Type: m2.Type, Payload: m2.Payload} - t3 := &Task{Type: m3.Type, Payload: m3.Payload} - t4 := &Task{Type: m4.Type, Payload: m4.Payload} + t1 := NewTask(m1.Type, m1.Payload) + t2 := NewTask(m2.Type, m2.Payload) + t3 := NewTask(m3.Type, m3.Payload) + t4 := NewTask(m4.Type, m4.Payload) tests := []struct { enqueued []*base.TaskMessage // initial default queue state @@ -78,7 +78,7 @@ func TestProcessorSuccess(t *testing.T) { time.Sleep(tc.wait) p.terminate() - if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt); diff != "" { + if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) } @@ -190,7 +190,7 @@ func TestPerform(t *testing.T) { handler: func(t *Task) error { return nil }, - task: &Task{Type: "gen_thumbnail", Payload: map[string]interface{}{"src": "some/img/path"}}, + task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), wantErr: false, }, { @@ -198,7 +198,7 @@ func TestPerform(t *testing.T) { handler: func(t *Task) error { return fmt.Errorf("something went wrong") }, - task: &Task{Type: "gen_thumbnail", Payload: map[string]interface{}{"src": "some/img/path"}}, + task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), wantErr: true, }, { @@ -206,7 +206,7 @@ func TestPerform(t *testing.T) { handler: func(t *Task) error { panic("something went terribly wrong") }, - task: &Task{Type: "gen_thumbnail", Payload: map[string]interface{}{"src": "some/img/path"}}, + task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), wantErr: true, }, }