diff --git a/CHANGELOG.md b/CHANGELOG.md index 47e6f06..d53b35f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- NewTask function now takes array of bytes as payload. +- Task `Type` and `Payload` should be accessed by a method call. - Requires redis v4.0+ for multiple field/value pair support - Renamed pending key (TODO: need migration script diff --git a/README.md b/README.md index cbc5882..1f7c064 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,8 @@ Asynq is a Go library for queueing tasks and processing them asynchronously with Highlevel overview of how Asynq works: -- Client puts task on a queue -- Server pulls task off queues and starts a worker goroutine for each task +- Client puts tasks on a queue +- Server pulls tasks off queues and starts a worker goroutine for each task - Tasks are processed concurrently by multiple workers Task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling. @@ -77,19 +77,36 @@ const ( TypeImageResize = "image:resize" ) +type EmailDeliveryPayload struct { + UserID int + TemplateID string +} + +type ImageResizePayload struct { + SourceURL string +} + //---------------------------------------------- // Write a function NewXXXTask to create a task. // A task consists of a type and a payload. //---------------------------------------------- -func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task { - payload := map[string]interface{}{"user_id": userID, "template_id": tmplID} - return asynq.NewTask(TypeEmailDelivery, payload) +func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) { + payload := EmailDeliveryPayload{UserID: userID, TemplateID: templID} + bytes, err := json.Marshal(payload) + if err != nil { + return nil, err + } + return asynq.NewTask(TypeEmailDelivery, bytes), nil } -func NewImageResizeTask(src string) *asynq.Task { - payload := map[string]interface{}{"src": src} - return asynq.NewTask(TypeImageResize, payload) +func NewImageResizeTask(src string) (*asynq.Task, error) { + payload := ImageResizePayload{SourceURL: src} + bytes, err := json.Marshal(payload) + if err != nil { + return nil, err + } + return asynq.NewTask(TypeImageResize, bytes), nil } //--------------------------------------------------------------- @@ -101,15 +118,11 @@ func NewImageResizeTask(src string) *asynq.Task { //--------------------------------------------------------------- func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error { - userID, err := t.Payload.GetInt("user_id") - if err != nil { - return err + var p EmailDeliveryPayload + if err := json.Unmarshal(t.Payload(), &p); err != nil { + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } - tmplID, err := t.Payload.GetString("template_id") - if err != nil { - return err - } - fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID) + log.Printf("Sending Email to User: user_id = %d, template_id = %s\n", p.UserID, p.TemplateID) // Email delivery code ... return nil } @@ -120,11 +133,11 @@ type ImageProcessor struct { } func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { - src, err := t.Payload.GetString("src") - if err != nil { - return err + var p ImageResizePayload + if err := json.Unmarshal(t.Payload(), &p); err != nil { + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } - fmt.Printf("Resize image: src = %s\n", src) + log.Printf("Resizing image: src = %s\n", p.SourceURL) // Image resizing code ... return nil } @@ -160,10 +173,13 @@ func main() { // Use (*Client).Enqueue method. // ------------------------------------------------------ - t := tasks.NewEmailDeliveryTask(42, "some:template:id") + t, err := tasks.NewEmailDeliveryTask(42, "some:template:id") + if err != nil { + log.Fatalf("could not create task: %v", err) + } res, err := c.Enqueue(t) if err != nil { - log.Fatal("could not enqueue task: %v", err) + log.Fatalf("could not enqueue task: %v", err) } fmt.Printf("Enqueued Result: %+v\n", res) @@ -173,10 +189,9 @@ func main() { // Use ProcessIn or ProcessAt option. // ------------------------------------------------------------ - t = tasks.NewEmailDeliveryTask(42, "other:template:id") res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour)) if err != nil { - log.Fatal("could not schedule task: %v", err) + log.Fatalf("could not schedule task: %v", err) } fmt.Printf("Enqueued Result: %+v\n", res) @@ -188,19 +203,21 @@ func main() { c.SetDefaultOptions(tasks.TypeImageResize, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute)) - t = tasks.NewImageResizeTask("some/blobstore/path") + t, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg") + if err != nil { + log.Fatalf("could not create task: %v", err) + } res, err = c.Enqueue(t) if err != nil { - log.Fatal("could not enqueue task: %v", err) + log.Fatalf("could not enqueue task: %v", err) } fmt.Printf("Enqueued Result: %+v\n", res) // --------------------------------------------------------------------------- // Example 4: Pass options to tune task processing behavior at enqueue time. - // Options passed at enqueue time override default ones, if any. + // Options passed at enqueue time override default ones. // --------------------------------------------------------------------------- - t = tasks.NewImageResizeTask("some/blobstore/path") res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second)) if err != nil { log.Fatal("could not enqueue task: %v", err) diff --git a/asynq.go b/asynq.go index 39e16cb..6a7a65f 100644 --- a/asynq.go +++ b/asynq.go @@ -17,20 +17,21 @@ import ( // Task represents a unit of work to be performed. type Task struct { - // Type indicates the type of task to be performed. - Type string + // typename indicates the type of task to be performed. + typename string - // Payload holds data needed to perform the task. - Payload Payload + // payload holds data needed to perform the task. + payload []byte } +func (t *Task) Type() string { return t.typename } +func (t *Task) Payload() []byte { return t.payload } + // NewTask returns a new Task given a type name and payload data. -// -// The payload values must be serializable. -func NewTask(typename string, payload map[string]interface{}) *Task { +func NewTask(typename string, payload []byte) *Task { return &Task{ - Type: typename, - Payload: Payload{payload}, + typename: typename, + payload: payload, } } diff --git a/asynq_test.go b/asynq_test.go index 2ca46d4..c763a05 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -85,7 +85,7 @@ func getRedisConnOpt(tb testing.TB) RedisConnOpt { var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task { out := append([]*Task(nil), in...) // Copy input to avoid mutating it sort.Slice(out, func(i, j int) bool { - return out[i].Type < out[j].Type + return out[i].Type() < out[j].Type() }) return out }) diff --git a/benchmark_test.go b/benchmark_test.go index a4d7db2..b98ea34 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -6,12 +6,24 @@ package asynq import ( "context" + "encoding/json" "fmt" "sync" "testing" "time" + + h "github.com/hibiken/asynq/internal/asynqtest" ) +// Creates a new task of type "task" with payload {"data": n}. +func makeTask(n int) *Task { + b, err := json.Marshal(map[string]int{"data": n}) + if err != nil { + panic(err) + } + return NewTask(fmt.Sprintf("task%d", n), b) +} + // Simple E2E Benchmark testing with no scheduled tasks and retries. func BenchmarkEndToEndSimple(b *testing.B) { const count = 100000 @@ -29,8 +41,7 @@ func BenchmarkEndToEndSimple(b *testing.B) { }) // Create a bunch of tasks for i := 0; i < count; i++ { - t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t); err != nil { + if _, err := client.Enqueue(makeTask(i)); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } @@ -70,14 +81,12 @@ func BenchmarkEndToEnd(b *testing.B) { }) // Create a bunch of tasks for i := 0; i < count; i++ { - t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t); err != nil { + if _, err := client.Enqueue(makeTask(i)); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } for i := 0; i < count; i++ { - t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t, ProcessIn(1*time.Second)); err != nil { + if _, err := client.Enqueue(makeTask(i), ProcessIn(1*time.Second)); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } @@ -86,13 +95,18 @@ func BenchmarkEndToEnd(b *testing.B) { var wg sync.WaitGroup wg.Add(count * 2) handler := func(ctx context.Context, t *Task) error { - n, err := t.Payload.GetInt("data") - if err != nil { + var p map[string]int + if err := json.Unmarshal(t.Payload(), &p); err != nil { b.Logf("internal error: %v", err) } + n, ok := p["data"] + if !ok { + n = 1 + b.Logf("internal error: could not get data from payload") + } retried, ok := GetRetryCount(ctx) if !ok { - b.Logf("internal error: %v", err) + b.Logf("internal error: could not get retry count from context") } // Fail 1% of tasks for the first attempt. if retried == 0 && n%100 == 0 { @@ -136,20 +150,17 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { }) // Create a bunch of tasks for i := 0; i < highCount; i++ { - t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t, Queue("high")); err != nil { + if _, err := client.Enqueue(makeTask(i), Queue("high")); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } for i := 0; i < defaultCount; i++ { - t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t); err != nil { + if _, err := client.Enqueue(makeTask(i)); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } for i := 0; i < lowCount; i++ { - t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t, Queue("low")); err != nil { + if _, err := client.Enqueue(makeTask(i), Queue("low")); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } @@ -190,15 +201,13 @@ func BenchmarkClientWhileServerRunning(b *testing.B) { }) // Enqueue 10,000 tasks. for i := 0; i < count; i++ { - t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t); err != nil { + if _, err := client.Enqueue(makeTask(i)); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } // Schedule 10,000 tasks. for i := 0; i < count; i++ { - t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t, ProcessIn(1*time.Second)); err != nil { + if _, err := client.Enqueue(makeTask(i), ProcessIn(1*time.Second)); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } @@ -213,7 +222,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) { b.Log("Starting enqueueing") enqueued := 0 for enqueued < 100000 { - t := NewTask(fmt.Sprintf("enqueued%d", enqueued), map[string]interface{}{"data": enqueued}) + t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.JSON(map[string]interface{}{"data": enqueued})) if _, err := client.Enqueue(t); err != nil { b.Logf("could not enqueue task %d: %v", enqueued, err) continue diff --git a/client.go b/client.go index 9402099..dc1841c 100644 --- a/client.go +++ b/client.go @@ -176,7 +176,6 @@ func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v) func (d processInOption) Type() OptionType { return ProcessInOpt } func (d processInOption) Value() interface{} { return time.Duration(d) } - // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task. // // ErrDuplicateTask error only applies to tasks enqueued with a Unique option. @@ -305,7 +304,7 @@ func (c *Client) Close() error { // If no ProcessAt or ProcessIn options are passed, the task will be processed immediately. func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) { c.mu.Lock() - if defaults, ok := c.opts[task.Type]; ok { + if defaults, ok := c.opts[task.Type()]; ok { opts = append(defaults, opts...) } c.mu.Unlock() @@ -327,12 +326,12 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) { } var uniqueKey string if opt.uniqueTTL > 0 { - uniqueKey = base.UniqueKey(opt.queue, task.Type, task.Payload.data) + uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload()) } msg := &base.TaskMessage{ ID: uuid.New(), - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Queue: opt.queue, Retry: opt.retry, Deadline: deadline.Unix(), diff --git a/client_test.go b/client_test.go index c3207ef..cc1dacc 100644 --- a/client_test.go +++ b/client_test.go @@ -20,7 +20,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) var ( now = time.Now() @@ -52,8 +52,8 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "default", Timeout: int64(defaultTimeout.Seconds()), @@ -85,8 +85,8 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { "default": { { Message: &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "default", Timeout: int64(defaultTimeout.Seconds()), @@ -137,7 +137,7 @@ func TestClientEnqueue(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) now := time.Now() tests := []struct { @@ -163,8 +163,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: 3, Queue: "default", Timeout: int64(defaultTimeout.Seconds()), @@ -189,8 +189,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: 0, // Retry count should be set to zero Queue: "default", Timeout: int64(defaultTimeout.Seconds()), @@ -216,8 +216,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: 10, // Last option takes precedence Queue: "default", Timeout: int64(defaultTimeout.Seconds()), @@ -242,8 +242,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "custom": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "custom", Timeout: int64(defaultTimeout.Seconds()), @@ -268,8 +268,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "high": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "high", Timeout: int64(defaultTimeout.Seconds()), @@ -294,8 +294,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "default", Timeout: 20, @@ -320,8 +320,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "default", Timeout: int64(noTimeout.Seconds()), @@ -347,8 +347,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "default", Timeout: 20, @@ -390,7 +390,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) now := time.Now() tests := []struct { @@ -421,8 +421,8 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { "default": { { Message: &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "default", Timeout: int64(defaultTimeout.Seconds()), @@ -448,8 +448,8 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "default", Timeout: int64(defaultTimeout.Seconds()), @@ -501,7 +501,7 @@ func TestClientEnqueueError(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) tests := []struct { desc string @@ -613,7 +613,7 @@ func TestClientDefaultOptions(t *testing.T) { h.FlushDB(t, r) c := NewClient(getRedisConnOpt(t)) defer c.Close() - c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...) + c.SetDefaultOptions(tc.task.Type(), tc.defaultOpts...) gotRes, err := c.Enqueue(tc.task, tc.opts...) if err != nil { t.Fatal(err) @@ -650,7 +650,7 @@ func TestClientEnqueueUnique(t *testing.T) { ttl time.Duration }{ { - NewTask("email", map[string]interface{}{"user_id": 123}), + NewTask("email", h.JSON(map[string]interface{}{"user_id": 123})), time.Hour, }, } @@ -664,7 +664,7 @@ func TestClientEnqueueUnique(t *testing.T) { t.Fatal(err) } - gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val() + gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type(), tc.task.Payload())).Val() if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl) continue @@ -709,7 +709,7 @@ func TestClientEnqueueUniqueWithProcessInOption(t *testing.T) { t.Fatal(err) } - gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val() + gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type(), tc.task.Payload())).Val() wantTTL := time.Duration(tc.ttl.Seconds()+tc.d.Seconds()) * time.Second if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { t.Errorf("TTL = %v, want %v", gotTTL, wantTTL) @@ -755,7 +755,7 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { t.Fatal(err) } - gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val() + gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type(), tc.task.Payload())).Val() wantTTL := tc.at.Add(tc.ttl).Sub(time.Now()) if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { t.Errorf("TTL = %v, want %v", gotTTL, wantTTL) @@ -774,4 +774,3 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { } } } - diff --git a/doc.go b/doc.go index edd3865..06ede25 100644 --- a/doc.go +++ b/doc.go @@ -11,7 +11,7 @@ specify the connection using one of RedisConnOpt types. redisConnOpt = asynq.RedisClientOpt{ Addr: "127.0.0.1:6379", Password: "xxxxx", - DB: 3, + DB: 2, } The Client is used to enqueue a task. @@ -20,12 +20,16 @@ The Client is used to enqueue a task. client := asynq.NewClient(redisConnOpt) // Task is created with two parameters: its type and payload. - t := asynq.NewTask( - "send_email", - map[string]interface{}{"user_id": 42}) + // Payload data is simply an array of bytes. It can be encoded in JSON, Protocol Buffer, Gob, etc. + b, err := json.Marshal(ExamplePayload{UserID: 42}) + if err != nil { + log.Fatal(err) + } + + task := asynq.NewTask("example", b) // Enqueue the task to be processed immediately. - res, err := client.Enqueue(t) + res, err := client.Enqueue(task) // Schedule the task to be processed after one minute. res, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute)) @@ -52,10 +56,13 @@ Example of a type that implements the Handler interface. func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error { switch task.Type { - case "send_email": - id, err := task.Payload.GetInt("user_id") - // send email - //... + case "example": + var data ExamplePayload + if err := json.Unmarshal(task.Payload(), &data); err != nil { + return err + } + // perform task with the data + default: return fmt.Errorf("unexpected task type %q", task.Type) } diff --git a/inspeq/inspector_test.go b/inspeq/inspector_test.go index 6120ddf..0fedc9f 100644 --- a/inspeq/inspector_test.go +++ b/inspeq/inspector_test.go @@ -574,7 +574,7 @@ func TestInspectorListPendingTasks(t *testing.T) { tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListPendingTasks(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -632,7 +632,7 @@ func TestInspectorListActiveTasks(t *testing.T) { t.Errorf("%s; ListActiveTasks(%q) returned error: %v", tc.qname, tc.desc, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListActiveTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -708,7 +708,7 @@ func TestInspectorListScheduledTasks(t *testing.T) { t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}, ScheduledTask{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, ScheduledTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListScheduledTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -785,7 +785,7 @@ func TestInspectorListRetryTasks(t *testing.T) { t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}, RetryTask{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, RetryTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListRetryTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -861,7 +861,7 @@ func TestInspectorListArchivedTasks(t *testing.T) { t.Errorf("%s; ListArchivedTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}, ArchivedTask{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, ArchivedTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListArchivedTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -922,7 +922,7 @@ func TestInspectorListPagination(t *testing.T) { t.Errorf("ListPendingTask('default') returned error: %v", err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("ListPendingTask('default') = %v, want %v; (-want,+got)\n%s", got, tc.want, diff) @@ -2598,7 +2598,7 @@ func TestInspectorSchedulerEntries(t *testing.T) { { Spec: "@every 20m", Type: "bar", - Payload: map[string]interface{}{"fiz": "baz"}, + Payload: h.JSON(map[string]interface{}{"fiz": "baz"}), Opts: []string{`Queue("bar")`, `MaxRetry(20)`}, Next: now.Add(1 * time.Minute), Prev: now.Add(-19 * time.Minute), @@ -2614,7 +2614,7 @@ func TestInspectorSchedulerEntries(t *testing.T) { }, { Spec: "@every 20m", - Task: asynq.NewTask("bar", map[string]interface{}{"fiz": "baz"}), + Task: asynq.NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"})), Opts: []asynq.Option{asynq.Queue("bar"), asynq.MaxRetry(20)}, Next: now.Add(1 * time.Minute), Prev: now.Add(-19 * time.Minute), @@ -2634,7 +2634,7 @@ func TestInspectorSchedulerEntries(t *testing.T) { t.Errorf("SchedulerEntries() returned error: %v", err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) if diff := cmp.Diff(tc.want, got, sortSchedulerEntry, ignoreOpt); diff != "" { t.Errorf("SchedulerEntries() = %v, want %v; (-want,+got)\n%s", got, tc.want, diff) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 7b1743d..66831ba 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -6,6 +6,7 @@ package asynqtest import ( + "encoding/json" "math" "sort" "testing" @@ -93,13 +94,13 @@ var SortStringSliceOpt = cmp.Transformer("SortStringSlice", func(in []string) [] var IgnoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID") // NewTaskMessage returns a new instance of TaskMessage given a task type and payload. -func NewTaskMessage(taskType string, payload map[string]interface{}) *base.TaskMessage { +func NewTaskMessage(taskType string, payload []byte) *base.TaskMessage { return NewTaskMessageWithQueue(taskType, payload, base.DefaultQueueName) } // NewTaskMessageWithQueue returns a new instance of TaskMessage given a // task type, payload and queue name. -func NewTaskMessageWithQueue(taskType string, payload map[string]interface{}, qname string) *base.TaskMessage { +func NewTaskMessageWithQueue(taskType string, payload []byte, qname string) *base.TaskMessage { return &base.TaskMessage{ ID: uuid.New(), Type: taskType, @@ -111,6 +112,15 @@ func NewTaskMessageWithQueue(taskType string, payload map[string]interface{}, qn } } +// JSON serializes the given key-value pairs into stream of bytes in JSON. +func JSON(kv map[string]interface{}) []byte { + b, err := json.Marshal(kv) + if err != nil { + panic(err) + } + return b +} + // TaskMessageAfterRetry returns an updated copy of t after retry. // It increments retry count and sets the error message. func TaskMessageAfterRetry(t base.TaskMessage, errMsg string) *base.TaskMessage { diff --git a/internal/base/base.go b/internal/base/base.go index 6ac3f42..2b040af 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -6,12 +6,8 @@ package base import ( - "bytes" "context" - "encoding/json" "fmt" - "sort" - "strings" "sync" "time" @@ -125,32 +121,8 @@ func SchedulerHistoryKey(entryID string) string { } // UniqueKey returns a redis key with the given type, payload, and queue name. -func UniqueKey(qname, tasktype string, payload map[string]interface{}) string { - return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, serializePayload(payload)) -} - -func serializePayload(payload map[string]interface{}) string { - if payload == nil { - return "nil" - } - type entry struct { - k string - v interface{} - } - var es []entry - for k, v := range payload { - es = append(es, entry{k, v}) - } - // sort entries by key - sort.Slice(es, func(i, j int) bool { return es[i].k < es[j].k }) - var b strings.Builder - for _, e := range es { - if b.Len() > 0 { - b.WriteString(",") - } - b.WriteString(fmt.Sprintf("%s=%v", e.k, e.v)) - } - return b.String() +func UniqueKey(qname, tasktype string, payload []byte) string { + return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, string(payload)) } // TaskMessage is the internal representation of a task with additional metadata fields. @@ -160,7 +132,7 @@ type TaskMessage struct { Type string // Payload holds data needed to process the task. - Payload map[string]interface{} + Payload []byte // ID is a unique identifier for each task. ID uuid.UUID @@ -203,13 +175,9 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) { if msg == nil { return nil, fmt.Errorf("cannot encode nil message") } - payload, err := json.Marshal(msg.Payload) - if err != nil { - return nil, err - } return proto.Marshal(&pb.TaskMessage{ Type: msg.Type, - Payload: payload, + Payload: msg.Payload, Id: msg.ID.String(), Queue: msg.Queue, Retry: int32(msg.Retry), @@ -227,13 +195,9 @@ func DecodeMessage(data []byte) (*TaskMessage, error) { if err := proto.Unmarshal(data, &pbmsg); err != nil { return nil, err } - payload, err := decodePayload(pbmsg.GetPayload()) - if err != nil { - return nil, err - } return &TaskMessage{ Type: pbmsg.GetType(), - Payload: payload, + Payload: pbmsg.GetPayload(), ID: uuid.MustParse(pbmsg.GetId()), Queue: pbmsg.GetQueue(), Retry: int(pbmsg.GetRetry()), @@ -383,7 +347,7 @@ type WorkerInfo struct { ServerID string ID string Type string - Payload map[string]interface{} + Payload []byte Queue string Started time.Time Deadline time.Time @@ -394,10 +358,6 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) { if info == nil { return nil, fmt.Errorf("cannot encode nil worker info") } - payload, err := json.Marshal(info.Payload) - if err != nil { - return nil, err - } startTime, err := ptypes.TimestampProto(info.Started) if err != nil { return nil, err @@ -412,33 +372,19 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) { ServerId: info.ServerID, TaskId: info.ID, TaskType: info.Type, - TaskPayload: payload, + TaskPayload: info.Payload, Queue: info.Queue, StartTime: startTime, Deadline: deadline, }) } -func decodePayload(b []byte) (map[string]interface{}, error) { - d := json.NewDecoder(bytes.NewReader(b)) - d.UseNumber() - payload := make(map[string]interface{}) - if err := d.Decode(&payload); err != nil { - return nil, err - } - return payload, nil -} - // DecodeWorkerInfo decodes the given bytes into WorkerInfo. func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) { var pbmsg pb.WorkerInfo if err := proto.Unmarshal(b, &pbmsg); err != nil { return nil, err } - payload, err := decodePayload(pbmsg.GetTaskPayload()) - if err != nil { - return nil, err - } startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) if err != nil { return nil, err @@ -453,7 +399,7 @@ func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) { ServerID: pbmsg.GetServerId(), ID: pbmsg.GetTaskId(), Type: pbmsg.GetTaskType(), - Payload: payload, + Payload: pbmsg.GetTaskPayload(), Queue: pbmsg.GetQueue(), Started: startTime, Deadline: deadline, @@ -472,7 +418,7 @@ type SchedulerEntry struct { Type string // Payload is the payload of the periodic task. - Payload map[string]interface{} + Payload []byte // Opts is the options for the periodic task. Opts []string @@ -490,10 +436,6 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) { if entry == nil { return nil, fmt.Errorf("cannot encode nil scheduler entry") } - payload, err := json.Marshal(entry.Payload) - if err != nil { - return nil, err - } next, err := ptypes.TimestampProto(entry.Next) if err != nil { return nil, err @@ -506,7 +448,7 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) { Id: entry.ID, Spec: entry.Spec, TaskType: entry.Type, - TaskPayload: payload, + TaskPayload: entry.Payload, EnqueueOptions: entry.Opts, NextEnqueueTime: next, PrevEnqueueTime: prev, @@ -519,10 +461,6 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) { if err := proto.Unmarshal(b, &pbmsg); err != nil { return nil, err } - payload, err := decodePayload(pbmsg.GetTaskPayload()) - if err != nil { - return nil, err - } next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime()) if err != nil { return nil, err @@ -535,7 +473,7 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) { ID: pbmsg.GetId(), Spec: pbmsg.GetSpec(), Type: pbmsg.GetTaskType(), - Payload: payload, + Payload: pbmsg.GetTaskPayload(), Opts: pbmsg.GetEnqueueOptions(), Next: next, Prev: prev, diff --git a/internal/base/base_test.go b/internal/base/base_test.go index bfebef5..cd8baea 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -267,52 +267,68 @@ func TestSchedulerHistoryKey(t *testing.T) { } } +func toBytes(m map[string]interface{}) []byte { + b, err := json.Marshal(m) + if err != nil { + panic(err) + } + return b +} + func TestUniqueKey(t *testing.T) { tests := []struct { desc string qname string tasktype string - payload map[string]interface{} + payload []byte want string }{ { "with primitive types", "default", "email:send", - map[string]interface{}{"a": 123, "b": "hello", "c": true}, - "asynq:{default}:unique:email:send:a=123,b=hello,c=true", + toBytes(map[string]interface{}{"a": 123, "b": "hello", "c": true}), + fmt.Sprintf("asynq:{default}:unique:email:send:%s", + string(toBytes(map[string]interface{}{"a": 123, "b": "hello", "c": true}))), }, { "with unsorted keys", "default", "email:send", - map[string]interface{}{"b": "hello", "c": true, "a": 123}, - "asynq:{default}:unique:email:send:a=123,b=hello,c=true", + toBytes(map[string]interface{}{"b": "hello", "c": true, "a": 123}), + fmt.Sprintf("asynq:{default}:unique:email:send:%s", + string(toBytes(map[string]interface{}{"b": "hello", "c": true, "a": 123}))), }, { "with composite types", "default", "email:send", - map[string]interface{}{ + toBytes(map[string]interface{}{ "address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"}, - "names": []string{"bob", "mike", "rob"}}, - "asynq:{default}:unique:email:send:address=map[city:Boston line:123 Main St state:MA],names=[bob mike rob]", + "names": []string{"bob", "mike", "rob"}}), + fmt.Sprintf("asynq:{default}:unique:email:send:%s", + string(toBytes(map[string]interface{}{ + "address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"}, + "names": []string{"bob", "mike", "rob"}}))), }, { "with complex types", "default", "email:send", - map[string]interface{}{ + toBytes(map[string]interface{}{ "time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC), - "duration": time.Hour}, - "asynq:{default}:unique:email:send:duration=1h0m0s,time=2020-07-28 00:00:00 +0000 UTC", + "duration": time.Hour}), + fmt.Sprintf("asynq:{default}:unique:email:send:%s", + string(toBytes(map[string]interface{}{ + "time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC), + "duration": time.Hour}))), }, { "with nil payload", "default", "reindex", nil, - "asynq:{default}:unique:reindex:nil", + "asynq:{default}:unique:reindex:", }, } @@ -333,7 +349,7 @@ func TestMessageEncoding(t *testing.T) { { in: &TaskMessage{ Type: "task1", - Payload: map[string]interface{}{"a": 1, "b": "hello!", "c": true}, + Payload: toBytes(map[string]interface{}{"a": 1, "b": "hello!", "c": true}), ID: id, Queue: "default", Retry: 10, @@ -343,7 +359,7 @@ func TestMessageEncoding(t *testing.T) { }, out: &TaskMessage{ Type: "task1", - Payload: map[string]interface{}{"a": json.Number("1"), "b": "hello!", "c": true}, + Payload: toBytes(map[string]interface{}{"a": json.Number("1"), "b": "hello!", "c": true}), ID: id, Queue: "default", Retry: 10, @@ -420,7 +436,7 @@ func TestWorkerInfoEncoding(t *testing.T) { ServerID: "abc123", ID: uuid.NewString(), Type: "taskA", - Payload: map[string]interface{}{"foo": "bar"}, + Payload: toBytes(map[string]interface{}{"foo": "bar"}), Queue: "default", Started: time.Now().Add(-3 * time.Hour), Deadline: time.Now().Add(30 * time.Second), @@ -455,7 +471,7 @@ func TestSchedulerEntryEncoding(t *testing.T) { ID: uuid.NewString(), Spec: "* * * * *", Type: "task_A", - Payload: map[string]interface{}{"foo": "bar"}, + Payload: toBytes(map[string]interface{}{"foo": "bar"}), Opts: []string{"Queue('email')"}, Next: time.Now().Add(30 * time.Second).UTC(), Prev: time.Now().Add(-2 * time.Minute).UTC(), diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 0999af2..1b50df1 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -50,9 +50,9 @@ func TestAllQueues(t *testing.T) { func TestCurrentStats(t *testing.T) { r := setup(t) defer r.Close() - m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) + m1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"})) m2 := h.NewTaskMessage("reindex", nil) - m3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"}) + m3 := h.NewTaskMessage("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/path/to/img"})) m4 := h.NewTaskMessage("sync", nil) m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") @@ -312,7 +312,7 @@ func TestListPending(t *testing.T) { r := setup(t) defer r.Close() - m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) + m1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"})) m2 := h.NewTaskMessage("reindex", nil) m3 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") m4 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") @@ -3365,9 +3365,9 @@ func TestListWorkers(t *testing.T) { pid = 4567 serverID = "server123" - m1 = h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "abc123"}) - m2 = h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/image/file"}) - m3 = h.NewTaskMessage("reindex", map[string]interface{}{}) + m1 = h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"user_id": "abc123"})) + m2 = h.NewTaskMessage("gen_thumbnail", h.JSON(map[string]interface{}{"path": "some/path/to/image/file"})) + m3 = h.NewTaskMessage("reindex", h.JSON(map[string]interface{}{})) ) tests := []struct { @@ -3450,7 +3450,7 @@ func TestWriteListClearSchedulerEntries(t *testing.T) { { Spec: "@every 20m", Type: "bar", - Payload: map[string]interface{}{"fiz": "baz"}, + Payload: h.JSON(map[string]interface{}{"fiz": "baz"}), Opts: nil, Next: now.Add(1 * time.Minute), Prev: now.Add(-19 * time.Minute), diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 23c32ea..b936360 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -61,8 +61,8 @@ func setup(tb testing.TB) (r *RDB) { func TestEnqueue(t *testing.T) { r := setup(t) defer r.Close() - t1 := h.NewTaskMessage("send_email", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"}) - t2 := h.NewTaskMessageWithQueue("generate_csv", map[string]interface{}{}, "csv") + t1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})) + t2 := h.NewTaskMessageWithQueue("generate_csv", h.JSON(map[string]interface{}{}), "csv") t3 := h.NewTaskMessageWithQueue("sync", nil, "low") tests := []struct { @@ -101,9 +101,9 @@ func TestEnqueueUnique(t *testing.T) { m1 := base.TaskMessage{ ID: uuid.New(), Type: "email", - Payload: map[string]interface{}{"user_id": json.Number("123")}, + Payload: h.JSON(map[string]interface{}{"user_id": json.Number("123")}), Queue: base.DefaultQueueName, - UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", map[string]interface{}{"user_id": 123}), + UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.JSON(map[string]interface{}{"user_id": 123})), } tests := []struct { @@ -157,7 +157,7 @@ func TestDequeue(t *testing.T) { t1 := &base.TaskMessage{ ID: uuid.New(), Type: "send_email", - Payload: map[string]interface{}{"subject": "hello!"}, + Payload: h.JSON(map[string]interface{}{"subject": "hello!"}), Queue: "default", Timeout: 1800, Deadline: 0, @@ -355,7 +355,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { t1 := &base.TaskMessage{ ID: uuid.New(), Type: "send_email", - Payload: map[string]interface{}{"subject": "hello!"}, + Payload: h.JSON(map[string]interface{}{"subject": "hello!"}), Queue: "default", Timeout: 1800, Deadline: 0, @@ -767,7 +767,7 @@ func TestRequeue(t *testing.T) { func TestSchedule(t *testing.T) { r := setup(t) defer r.Close() - msg := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) + msg := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"})) tests := []struct { msg *base.TaskMessage processAt time.Time @@ -808,9 +808,9 @@ func TestScheduleUnique(t *testing.T) { m1 := base.TaskMessage{ ID: uuid.New(), Type: "email", - Payload: map[string]interface{}{"user_id": 123}, + Payload: h.JSON(map[string]interface{}{"user_id": 123}), Queue: base.DefaultQueueName, - UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", map[string]interface{}{"user_id": 123}), + UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.JSON(map[string]interface{}{"user_id": 123})), } tests := []struct { @@ -866,7 +866,7 @@ func TestRetry(t *testing.T) { t1 := &base.TaskMessage{ ID: uuid.New(), Type: "send_email", - Payload: map[string]interface{}{"subject": "Hola!"}, + Payload: h.JSON(map[string]interface{}{"subject": "Hola!"}), Retried: 10, Timeout: 1800, Queue: "default", @@ -874,7 +874,7 @@ func TestRetry(t *testing.T) { t2 := &base.TaskMessage{ ID: uuid.New(), Type: "gen_thumbnail", - Payload: map[string]interface{}{"path": "some/path/to/image.jpg"}, + Payload: h.JSON(map[string]interface{}{"path": "some/path/to/image.jpg"}), Timeout: 3000, Queue: "default", } @@ -1530,8 +1530,8 @@ func TestWriteServerStateWithWorkers(t *testing.T) { pid = 4242 serverID = "server123" - msg1 = h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "123"}) - msg2 = h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/imgfile"}) + msg1 = h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"user_id": "123"})) + msg2 = h.NewTaskMessage("gen_thumbnail", h.JSON(map[string]interface{}{"path": "some/path/to/imgfile"})) ttl = 5 * time.Second ) @@ -1642,8 +1642,8 @@ func TestClearServerState(t *testing.T) { otherPID = 9876 otherServerID = "server987" - msg1 = h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "123"}) - msg2 = h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/imgfile"}) + msg1 = h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"user_id": "123"})) + msg2 = h.NewTaskMessage("gen_thumbnail", h.JSON(map[string]interface{}{"path": "some/path/to/imgfile"})) ttl = 5 * time.Second ) diff --git a/payload.go b/payload.go deleted file mode 100644 index b447ef9..0000000 --- a/payload.go +++ /dev/null @@ -1,230 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package asynq - -import ( - "encoding/json" - "fmt" - "time" - - "github.com/spf13/cast" -) - -// Payload holds arbitrary data needed for task execution. -type Payload struct { - data map[string]interface{} -} - -type errKeyNotFound struct { - key string -} - -func (e *errKeyNotFound) Error() string { - return fmt.Sprintf("key %q does not exist", e.key) -} - -// Has reports whether key exists. -func (p Payload) Has(key string) bool { - _, ok := p.data[key] - return ok -} - -func toInt(v interface{}) (int, error) { - switch v := v.(type) { - case json.Number: - val, err := v.Int64() - if err != nil { - return 0, err - } - return int(val), nil - default: - return cast.ToIntE(v) - } -} - -// String returns a string representation of payload data. -func (p Payload) String() string { - return fmt.Sprint(p.data) -} - -// MarshalJSON returns the JSON encoding of payload data. -func (p Payload) MarshalJSON() ([]byte, error) { - return json.Marshal(p.data) -} - -// GetString returns a string value if a string type is associated with -// the key, otherwise reports an error. -func (p Payload) GetString(key string) (string, error) { - v, ok := p.data[key] - if !ok { - return "", &errKeyNotFound{key} - } - return cast.ToStringE(v) -} - -// GetInt returns an int value if a numeric type is associated with -// the key, otherwise reports an error. -func (p Payload) GetInt(key string) (int, error) { - v, ok := p.data[key] - if !ok { - return 0, &errKeyNotFound{key} - } - return toInt(v) -} - -// GetFloat64 returns a float64 value if a numeric type is associated with -// the key, otherwise reports an error. -func (p Payload) GetFloat64(key string) (float64, error) { - v, ok := p.data[key] - if !ok { - return 0, &errKeyNotFound{key} - } - switch v := v.(type) { - case json.Number: - return v.Float64() - default: - return cast.ToFloat64E(v) - } -} - -// GetBool returns a boolean value if a boolean type is associated with -// the key, otherwise reports an error. -func (p Payload) GetBool(key string) (bool, error) { - v, ok := p.data[key] - if !ok { - return false, &errKeyNotFound{key} - } - return cast.ToBoolE(v) -} - -// GetStringSlice returns a slice of strings if a string slice type is associated with -// the key, otherwise reports an error. -func (p Payload) GetStringSlice(key string) ([]string, error) { - v, ok := p.data[key] - if !ok { - return nil, &errKeyNotFound{key} - } - return cast.ToStringSliceE(v) -} - -// GetIntSlice returns a slice of ints if a int slice type is associated with -// the key, otherwise reports an error. -func (p Payload) GetIntSlice(key string) ([]int, error) { - v, ok := p.data[key] - if !ok { - return nil, &errKeyNotFound{key} - } - switch v := v.(type) { - case []interface{}: - var res []int - for _, elem := range v { - val, err := toInt(elem) - if err != nil { - return nil, err - } - res = append(res, int(val)) - } - return res, nil - default: - return cast.ToIntSliceE(v) - } -} - -// GetStringMap returns a map of string to empty interface -// if a correct map type is associated with the key, -// otherwise reports an error. -func (p Payload) GetStringMap(key string) (map[string]interface{}, error) { - v, ok := p.data[key] - if !ok { - return nil, &errKeyNotFound{key} - } - return cast.ToStringMapE(v) -} - -// GetStringMapString returns a map of string to string -// if a correct map type is associated with the key, -// otherwise reports an error. -func (p Payload) GetStringMapString(key string) (map[string]string, error) { - v, ok := p.data[key] - if !ok { - return nil, &errKeyNotFound{key} - } - return cast.ToStringMapStringE(v) -} - -// GetStringMapStringSlice returns a map of string to string slice -// if a correct map type is associated with the key, -// otherwise reports an error. -func (p Payload) GetStringMapStringSlice(key string) (map[string][]string, error) { - v, ok := p.data[key] - if !ok { - return nil, &errKeyNotFound{key} - } - return cast.ToStringMapStringSliceE(v) -} - -// GetStringMapInt returns a map of string to int -// if a correct map type is associated with the key, -// otherwise reports an error. -func (p Payload) GetStringMapInt(key string) (map[string]int, error) { - v, ok := p.data[key] - if !ok { - return nil, &errKeyNotFound{key} - } - switch v := v.(type) { - case map[string]interface{}: - res := make(map[string]int) - for key, val := range v { - ival, err := toInt(val) - if err != nil { - return nil, err - } - res[key] = ival - } - return res, nil - default: - return cast.ToStringMapIntE(v) - } -} - -// GetStringMapBool returns a map of string to boolean -// if a correct map type is associated with the key, -// otherwise reports an error. -func (p Payload) GetStringMapBool(key string) (map[string]bool, error) { - v, ok := p.data[key] - if !ok { - return nil, &errKeyNotFound{key} - } - return cast.ToStringMapBoolE(v) -} - -// GetTime returns a time value if a correct map type is associated with the key, -// otherwise reports an error. -func (p Payload) GetTime(key string) (time.Time, error) { - v, ok := p.data[key] - if !ok { - return time.Time{}, &errKeyNotFound{key} - } - return cast.ToTimeE(v) -} - -// GetDuration returns a duration value if a correct map type is associated with the key, -// otherwise reports an error. -func (p Payload) GetDuration(key string) (time.Duration, error) { - v, ok := p.data[key] - if !ok { - return 0, &errKeyNotFound{key} - } - switch v := v.(type) { - case json.Number: - val, err := v.Int64() - if err != nil { - return 0, err - } - return time.Duration(val), nil - default: - return cast.ToDurationE(v) - } -} diff --git a/payload_test.go b/payload_test.go deleted file mode 100644 index 379a4ac..0000000 --- a/payload_test.go +++ /dev/null @@ -1,675 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -package asynq - -import ( - "encoding/json" - "fmt" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - h "github.com/hibiken/asynq/internal/asynqtest" - "github.com/hibiken/asynq/internal/base" -) - -type payloadTest struct { - data map[string]interface{} - key string - nonkey string -} - -func TestPayloadString(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"name": "gopher"}, - key: "name", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetString(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("Payload.GetString(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetString(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("With Marshaling: Payload.GetString(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetString(tc.nonkey) - if err == nil || got != "" { - t.Errorf("Payload.GetString(%q) = %v, %v; want '', error", - tc.key, got, err) - } - } -} - -func TestPayloadInt(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"user_id": 42}, - key: "user_id", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetInt(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("Payload.GetInt(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetInt(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("With Marshaling: Payload.GetInt(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetInt(tc.nonkey) - if err == nil || got != 0 { - t.Errorf("Payload.GetInt(%q) = %v, %v; want 0, error", - tc.key, got, err) - } - } -} - -func TestPayloadFloat64(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"pi": 3.14}, - key: "pi", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetFloat64(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("Payload.GetFloat64(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetFloat64(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("With Marshaling: Payload.GetFloat64(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetFloat64(tc.nonkey) - if err == nil || got != 0 { - t.Errorf("Payload.GetFloat64(%q) = %v, %v; want 0, error", - tc.key, got, err) - } - } -} - -func TestPayloadBool(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"enabled": true}, - key: "enabled", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetBool(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("Payload.GetBool(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetBool(tc.key) - if err != nil || got != tc.data[tc.key] { - t.Errorf("With Marshaling: Payload.GetBool(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetBool(tc.nonkey) - if err == nil || got != false { - t.Errorf("Payload.GetBool(%q) = %v, %v; want false, error", - tc.key, got, err) - } - } -} - -func TestPayloadStringSlice(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"names": []string{"luke", "rey", "anakin"}}, - key: "names", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetStringSlice(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetStringSlice(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetStringSlice(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetStringSlice(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetStringSlice(tc.nonkey) - if err == nil || got != nil { - t.Errorf("Payload.GetStringSlice(%q) = %v, %v; want nil, error", - tc.key, got, err) - } - } -} - -func TestPayloadIntSlice(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"nums": []int{9, 8, 7}}, - key: "nums", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetIntSlice(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetIntSlice(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetIntSlice(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetIntSlice(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetIntSlice(tc.nonkey) - if err == nil || got != nil { - t.Errorf("Payload.GetIntSlice(%q) = %v, %v; want nil, error", - tc.key, got, err) - } - } -} - -func TestPayloadStringMap(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"user": map[string]interface{}{"name": "Jon Doe", "score": 2.2}}, - key: "user", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetStringMap(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetStringMap(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetStringMap(tc.key) - ignoreOpt := cmpopts.IgnoreMapEntries(func(key string, val interface{}) bool { - switch val.(type) { - case json.Number: - return true - default: - return false - } - }) - diff = cmp.Diff(got, tc.data[tc.key], ignoreOpt) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetStringMap(%q) = %v, %v, want %v, nil;(-want,+got)\n%s", - tc.key, got, err, tc.data[tc.key], diff) - } - - // access non-existent key. - got, err = payload.GetStringMap(tc.nonkey) - if err == nil || got != nil { - t.Errorf("Payload.GetStringMap(%q) = %v, %v; want nil, error", - tc.key, got, err) - } - } -} - -func TestPayloadStringMapString(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"address": map[string]string{"line": "123 Main St", "city": "San Francisco", "state": "CA"}}, - key: "address", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetStringMapString(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetStringMapString(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetStringMapString(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetStringMapString(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetStringMapString(tc.nonkey) - if err == nil || got != nil { - t.Errorf("Payload.GetStringMapString(%q) = %v, %v; want nil, error", - tc.key, got, err) - } - } -} - -func TestPayloadStringMapStringSlice(t *testing.T) { - favs := map[string][]string{ - "movies": {"forrest gump", "star wars"}, - "tv_shows": {"game of thrones", "HIMYM", "breaking bad"}, - } - tests := []payloadTest{ - { - data: map[string]interface{}{"favorites": favs}, - key: "favorites", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetStringMapStringSlice(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetStringMapStringSlice(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetStringMapStringSlice(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetStringMapStringSlice(tc.nonkey) - if err == nil || got != nil { - t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v; want nil, error", - tc.key, got, err) - } - } -} - -func TestPayloadStringMapInt(t *testing.T) { - counter := map[string]int{ - "a": 1, - "b": 101, - "c": 42, - } - tests := []payloadTest{ - { - data: map[string]interface{}{"counts": counter}, - key: "counts", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetStringMapInt(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetStringMapInt(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetStringMapInt(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetStringMapInt(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetStringMapInt(tc.nonkey) - if err == nil || got != nil { - t.Errorf("Payload.GetStringMapInt(%q) = %v, %v; want nil, error", - tc.key, got, err) - } - } -} - -func TestPayloadStringMapBool(t *testing.T) { - features := map[string]bool{ - "A": false, - "B": true, - "C": true, - } - tests := []payloadTest{ - { - data: map[string]interface{}{"features": features}, - key: "features", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetStringMapBool(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetStringMapBool(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetStringMapBool(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetStringMapBool(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetStringMapBool(tc.nonkey) - if err == nil || got != nil { - t.Errorf("Payload.GetStringMapBool(%q) = %v, %v; want nil, error", - tc.key, got, err) - } - } -} - -func TestPayloadTime(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"current": time.Now()}, - key: "current", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetTime(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetTime(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetTime(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetTime(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetTime(tc.nonkey) - if err == nil || !got.IsZero() { - t.Errorf("Payload.GetTime(%q) = %v, %v; want %v, error", - tc.key, got, err, time.Time{}) - } - } -} - -func TestPayloadDuration(t *testing.T) { - tests := []payloadTest{ - { - data: map[string]interface{}{"duration": 15 * time.Minute}, - key: "duration", - nonkey: "unknown", - }, - } - - for _, tc := range tests { - payload := Payload{tc.data} - - got, err := payload.GetDuration(tc.key) - diff := cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("Payload.GetDuration(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // encode and then decode task messsage. - in := h.NewTaskMessage("testing", tc.data) - encoded, err := base.EncodeMessage(in) - if err != nil { - t.Fatal(err) - } - out, err := base.DecodeMessage(encoded) - if err != nil { - t.Fatal(err) - } - payload = Payload{out.Payload} - got, err = payload.GetDuration(tc.key) - diff = cmp.Diff(got, tc.data[tc.key]) - if err != nil || diff != "" { - t.Errorf("With Marshaling: Payload.GetDuration(%q) = %v, %v, want %v, nil", - tc.key, got, err, tc.data[tc.key]) - } - - // access non-existent key. - got, err = payload.GetDuration(tc.nonkey) - if err == nil || got != 0 { - t.Errorf("Payload.GetDuration(%q) = %v, %v; want %v, error", - tc.key, got, err, time.Duration(0)) - } - } -} - -func TestPayloadHas(t *testing.T) { - payload := Payload{map[string]interface{}{ - "user_id": 123, - }} - - if !payload.Has("user_id") { - t.Errorf("Payload.Has(%q) = false, want true", "user_id") - } - if payload.Has("name") { - t.Errorf("Payload.Has(%q) = true, want false", "name") - } -} - -func TestPayloadDebuggingStrings(t *testing.T) { - data := map[string]interface{}{ - "foo": 123, - "bar": "hello", - "baz": false, - } - payload := Payload{data: data} - - if payload.String() != fmt.Sprint(data) { - t.Errorf("Payload.String() = %q, want %q", - payload.String(), fmt.Sprint(data)) - } - - got, err := payload.MarshalJSON() - if err != nil { - t.Fatal(err) - } - want, err := json.Marshal(data) - if err != nil { - t.Fatal(err) - } - if diff := cmp.Diff(got, want); diff != "" { - t.Errorf("Payload.MarhsalJSON() = %s, want %s; (-want,+got)\n%s", - got, want, diff) - } -} diff --git a/processor_test.go b/processor_test.go index 7475889..6fd3f25 100644 --- a/processor_test.go +++ b/processor_test.go @@ -6,6 +6,7 @@ package asynq import ( "context" + "encoding/json" "fmt" "sort" "sync" @@ -13,7 +14,6 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" @@ -124,7 +124,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) { p.terminate() mu.Lock() - if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" { + if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) } mu.Unlock() @@ -216,7 +216,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) { p.terminate() mu.Lock() - if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" { + if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) } mu.Unlock() @@ -228,7 +228,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { r := setup(t) rdbClient := rdb.NewRDB(r) - m1 := h.NewTaskMessage("large_number", map[string]interface{}{"data": 111111111111111111}) + m1 := h.NewTaskMessage("large_number", h.JSON(map[string]interface{}{"data": 111111111111111111})) t1 := NewTask(m1.Type, m1.Payload) tests := []struct { @@ -250,10 +250,14 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { handler := func(ctx context.Context, task *Task) error { mu.Lock() defer mu.Unlock() - if data, err := task.Payload.GetInt("data"); err != nil { - t.Errorf("coult not get data from payload: %v", err) - } else { + var payload map[string]int + if err := json.Unmarshal(task.Payload(), &payload); err != nil { + t.Errorf("coult not decode payload: %v", err) + } + if data, ok := payload["data"]; ok { t.Logf("data == %d", data) + } else { + t.Errorf("could not get data from payload") } processed = append(processed, task) return nil @@ -289,7 +293,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { p.terminate() mu.Lock() - if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmpopts.IgnoreUnexported(Payload{})); diff != "" { + if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) } mu.Unlock() @@ -592,7 +596,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { } p.terminate() - if diff := cmp.Diff(tc.wantProcessed, processed, cmp.AllowUnexported(Payload{})); diff != "" { + if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) } @@ -611,7 +615,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { return nil }, - task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), + task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), wantErr: false, }, { @@ -619,7 +623,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { return fmt.Errorf("something went wrong") }, - task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), + task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), wantErr: true, }, { @@ -627,7 +631,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { panic("something went terribly wrong") }, - task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), + task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), wantErr: true, }, } diff --git a/scheduler.go b/scheduler.go index 8daa507..21280d6 100644 --- a/scheduler.go +++ b/scheduler.go @@ -240,8 +240,8 @@ func (s *Scheduler) beat() { e := &base.SchedulerEntry{ ID: job.id.String(), Spec: job.cronspec, - Type: job.task.Type, - Payload: job.task.Payload.data, + Type: job.task.Type(), + Payload: job.task.Payload(), Opts: stringifyOptions(job.opts), Next: entry.Next, Prev: entry.Prev, diff --git a/servemux.go b/servemux.go index 4ccd6f6..6dc670d 100644 --- a/servemux.go +++ b/servemux.go @@ -62,7 +62,7 @@ func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string) { mux.mu.RLock() defer mux.mu.RUnlock() - h, pattern = mux.match(t.Type) + h, pattern = mux.match(t.Type()) if h == nil { h, pattern = NotFoundHandler(), "" } @@ -151,7 +151,7 @@ func (mux *ServeMux) Use(mws ...MiddlewareFunc) { // NotFound returns an error indicating that the handler was not found for the given task. func NotFound(ctx context.Context, task *Task) error { - return fmt.Errorf("handler not found for task %q", task.Type) + return fmt.Errorf("handler not found for task %q", task.Type()) } // NotFoundHandler returns a simple task handler that returns a ``not found`` error. diff --git a/servemux_test.go b/servemux_test.go index 98dd52f..227c4d7 100644 --- a/servemux_test.go +++ b/servemux_test.go @@ -68,7 +68,7 @@ func TestServeMux(t *testing.T) { } if called != tc.want { - t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type, tc.want) + t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type(), tc.want) } } } @@ -124,7 +124,7 @@ func TestServeMuxNotFound(t *testing.T) { task := NewTask(tc.typename, nil) err := mux.ProcessTask(context.Background(), task) if err == nil { - t.Errorf("ProcessTask did not return error for task %q, should return 'not found' error", task.Type) + t.Errorf("ProcessTask did not return error for task %q, should return 'not found' error", task.Type()) } } } @@ -164,7 +164,7 @@ func TestServeMuxMiddlewares(t *testing.T) { } if called != tc.want { - t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type, tc.want) + t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type(), tc.want) } } } diff --git a/server_test.go b/server_test.go index 0343ada..6d29dd6 100644 --- a/server_test.go +++ b/server_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/testbroker" "go.uber.org/goleak" @@ -39,12 +40,12 @@ func TestServer(t *testing.T) { t.Fatal(err) } - _, err = c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123})) + _, err = c.Enqueue(NewTask("send_email", asynqtest.JSON(map[string]interface{}{"recipient_id": 123}))) if err != nil { t.Errorf("could not enqueue a task: %v", err) } - _, err = c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 456}), ProcessIn(1*time.Hour)) + _, err = c.Enqueue(NewTask("send_email", asynqtest.JSON(map[string]interface{}{"recipient_id": 456})), ProcessIn(1*time.Hour)) if err != nil { t.Errorf("could not enqueue a task: %v", err) } @@ -169,8 +170,8 @@ func TestServerWithFlakyBroker(t *testing.T) { h := func(ctx context.Context, task *Task) error { // force task retry. - if task.Type == "bad_task" { - return fmt.Errorf("could not process %q", task.Type) + if task.Type() == "bad_task" { + return fmt.Errorf("could not process %q", task.Type()) } time.Sleep(2 * time.Second) return nil