From b3b50d26a2618943847e3ba36413cc7c56098659 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 17 Mar 2021 09:07:43 -0700 Subject: [PATCH] Update test to match the new API --- asynq_test.go | 2 +- benchmark_test.go | 48 +++++++++++++---------- client_test.go | 67 ++++++++++++++++----------------- inspeq/inspector_test.go | 18 ++++----- internal/asynqtest/asynqtest.go | 10 +++++ internal/base/base_test.go | 48 +++++++++++++++-------- internal/rdb/inspect_test.go | 14 +++---- internal/rdb/rdb_test.go | 30 +++++++-------- processor_test.go | 28 ++++++++------ servemux.go | 2 +- servemux_test.go | 6 +-- server_test.go | 9 +++-- 12 files changed, 161 insertions(+), 121 deletions(-) diff --git a/asynq_test.go b/asynq_test.go index 2ca46d4..c763a05 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -85,7 +85,7 @@ func getRedisConnOpt(tb testing.TB) RedisConnOpt { var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task { out := append([]*Task(nil), in...) // Copy input to avoid mutating it sort.Slice(out, func(i, j int) bool { - return out[i].Type < out[j].Type + return out[i].Type() < out[j].Type() }) return out }) diff --git a/benchmark_test.go b/benchmark_test.go index a4d7db2..9fc08dd 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -6,12 +6,24 @@ package asynq import ( "context" + "encoding/json" "fmt" "sync" "testing" "time" + + h "github.com/hibiken/asynq/internal/asynqtest" ) +// Creates a new task of type "task" with payload {"data": n}. +func makeTask(n int) *Task { + b, err := json.Marshal(map[string]int{"data": n}) + if err != nil { + panic(err) + } + return NewTask(fmt.Sprintf("task%d", n), b) +} + // Simple E2E Benchmark testing with no scheduled tasks and retries. func BenchmarkEndToEndSimple(b *testing.B) { const count = 100000 @@ -29,7 +41,7 @@ func BenchmarkEndToEndSimple(b *testing.B) { }) // Create a bunch of tasks for i := 0; i < count; i++ { - t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) + t := NewTask(fmt.Sprintf("task%d", i), h.KV(map[string]interface{}{"data": i})) if _, err := client.Enqueue(t); err != nil { b.Fatalf("could not enqueue a task: %v", err) } @@ -70,14 +82,12 @@ func BenchmarkEndToEnd(b *testing.B) { }) // Create a bunch of tasks for i := 0; i < count; i++ { - t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t); err != nil { + if _, err := client.Enqueue(makeTask(i)); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } for i := 0; i < count; i++ { - t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t, ProcessIn(1*time.Second)); err != nil { + if _, err := client.Enqueue(makeTask(i), ProcessIn(1*time.Second)); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } @@ -86,13 +96,18 @@ func BenchmarkEndToEnd(b *testing.B) { var wg sync.WaitGroup wg.Add(count * 2) handler := func(ctx context.Context, t *Task) error { - n, err := t.Payload.GetInt("data") - if err != nil { + var p map[string]int + if err := json.Unmarshal(t.Payload(), &p); err != nil { b.Logf("internal error: %v", err) } + n, ok := p["data"] + if !ok { + n = 1 + b.Logf("internal error: could not get data from payload") + } retried, ok := GetRetryCount(ctx) if !ok { - b.Logf("internal error: %v", err) + b.Logf("internal error: could not get retry count from context") } // Fail 1% of tasks for the first attempt. if retried == 0 && n%100 == 0 { @@ -136,20 +151,17 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { }) // Create a bunch of tasks for i := 0; i < highCount; i++ { - t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t, Queue("high")); err != nil { + if _, err := client.Enqueue(makeTask(i), Queue("high")); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } for i := 0; i < defaultCount; i++ { - t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t); err != nil { + if _, err := client.Enqueue(makeTask(i)); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } for i := 0; i < lowCount; i++ { - t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t, Queue("low")); err != nil { + if _, err := client.Enqueue(makeTask(i), Queue("low")); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } @@ -190,15 +202,13 @@ func BenchmarkClientWhileServerRunning(b *testing.B) { }) // Enqueue 10,000 tasks. for i := 0; i < count; i++ { - t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t); err != nil { + if _, err := client.Enqueue(makeTask(i)); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } // Schedule 10,000 tasks. for i := 0; i < count; i++ { - t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i}) - if _, err := client.Enqueue(t, ProcessIn(1*time.Second)); err != nil { + if _, err := client.Enqueue(makeTask(i), ProcessIn(1*time.Second)); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } @@ -213,7 +223,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) { b.Log("Starting enqueueing") enqueued := 0 for enqueued < 100000 { - t := NewTask(fmt.Sprintf("enqueued%d", enqueued), map[string]interface{}{"data": enqueued}) + t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.KV(map[string]interface{}{"data": enqueued})) if _, err := client.Enqueue(t); err != nil { b.Logf("could not enqueue task %d: %v", enqueued, err) continue diff --git a/client_test.go b/client_test.go index c3207ef..d15c8a7 100644 --- a/client_test.go +++ b/client_test.go @@ -20,7 +20,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) var ( now = time.Now() @@ -52,8 +52,8 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "default", Timeout: int64(defaultTimeout.Seconds()), @@ -85,8 +85,8 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { "default": { { Message: &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "default", Timeout: int64(defaultTimeout.Seconds()), @@ -137,7 +137,7 @@ func TestClientEnqueue(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) now := time.Now() tests := []struct { @@ -163,8 +163,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: 3, Queue: "default", Timeout: int64(defaultTimeout.Seconds()), @@ -189,8 +189,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: 0, // Retry count should be set to zero Queue: "default", Timeout: int64(defaultTimeout.Seconds()), @@ -216,8 +216,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: 10, // Last option takes precedence Queue: "default", Timeout: int64(defaultTimeout.Seconds()), @@ -242,8 +242,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "custom": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "custom", Timeout: int64(defaultTimeout.Seconds()), @@ -268,8 +268,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "high": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "high", Timeout: int64(defaultTimeout.Seconds()), @@ -294,8 +294,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "default", Timeout: 20, @@ -320,8 +320,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "default", Timeout: int64(noTimeout.Seconds()), @@ -347,8 +347,8 @@ func TestClientEnqueue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "default", Timeout: 20, @@ -390,7 +390,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) now := time.Now() tests := []struct { @@ -421,8 +421,8 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { "default": { { Message: &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "default", Timeout: int64(defaultTimeout.Seconds()), @@ -448,8 +448,8 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": { { - Type: task.Type, - Payload: task.Payload.data, + Type: task.Type(), + Payload: task.Payload(), Retry: defaultMaxRetry, Queue: "default", Timeout: int64(defaultTimeout.Seconds()), @@ -501,7 +501,7 @@ func TestClientEnqueueError(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) + task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) tests := []struct { desc string @@ -613,7 +613,7 @@ func TestClientDefaultOptions(t *testing.T) { h.FlushDB(t, r) c := NewClient(getRedisConnOpt(t)) defer c.Close() - c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...) + c.SetDefaultOptions(tc.task.Type(), tc.defaultOpts...) gotRes, err := c.Enqueue(tc.task, tc.opts...) if err != nil { t.Fatal(err) @@ -650,7 +650,7 @@ func TestClientEnqueueUnique(t *testing.T) { ttl time.Duration }{ { - NewTask("email", map[string]interface{}{"user_id": 123}), + NewTask("email", h.KV(map[string]interface{}{"user_id": 123})), time.Hour, }, } @@ -664,7 +664,7 @@ func TestClientEnqueueUnique(t *testing.T) { t.Fatal(err) } - gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val() + gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type(), tc.task.Payload())).Val() if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl) continue @@ -709,7 +709,7 @@ func TestClientEnqueueUniqueWithProcessInOption(t *testing.T) { t.Fatal(err) } - gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val() + gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type(), tc.task.Payload())).Val() wantTTL := time.Duration(tc.ttl.Seconds()+tc.d.Seconds()) * time.Second if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { t.Errorf("TTL = %v, want %v", gotTTL, wantTTL) @@ -755,7 +755,7 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { t.Fatal(err) } - gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val() + gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type(), tc.task.Payload())).Val() wantTTL := tc.at.Add(tc.ttl).Sub(time.Now()) if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { t.Errorf("TTL = %v, want %v", gotTTL, wantTTL) @@ -774,4 +774,3 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { } } } - diff --git a/inspeq/inspector_test.go b/inspeq/inspector_test.go index 6120ddf..e50df2e 100644 --- a/inspeq/inspector_test.go +++ b/inspeq/inspector_test.go @@ -574,7 +574,7 @@ func TestInspectorListPendingTasks(t *testing.T) { tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListPendingTasks(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -632,7 +632,7 @@ func TestInspectorListActiveTasks(t *testing.T) { t.Errorf("%s; ListActiveTasks(%q) returned error: %v", tc.qname, tc.desc, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListActiveTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -708,7 +708,7 @@ func TestInspectorListScheduledTasks(t *testing.T) { t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}, ScheduledTask{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, ScheduledTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListScheduledTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -785,7 +785,7 @@ func TestInspectorListRetryTasks(t *testing.T) { t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}, RetryTask{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, RetryTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListRetryTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -861,7 +861,7 @@ func TestInspectorListArchivedTasks(t *testing.T) { t.Errorf("%s; ListArchivedTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}, ArchivedTask{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, ArchivedTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListArchivedTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -922,7 +922,7 @@ func TestInspectorListPagination(t *testing.T) { t.Errorf("ListPendingTask('default') returned error: %v", err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("ListPendingTask('default') = %v, want %v; (-want,+got)\n%s", got, tc.want, diff) @@ -2598,7 +2598,7 @@ func TestInspectorSchedulerEntries(t *testing.T) { { Spec: "@every 20m", Type: "bar", - Payload: map[string]interface{}{"fiz": "baz"}, + Payload: h.KV(map[string]interface{}{"fiz": "baz"}), Opts: []string{`Queue("bar")`, `MaxRetry(20)`}, Next: now.Add(1 * time.Minute), Prev: now.Add(-19 * time.Minute), @@ -2614,7 +2614,7 @@ func TestInspectorSchedulerEntries(t *testing.T) { }, { Spec: "@every 20m", - Task: asynq.NewTask("bar", map[string]interface{}{"fiz": "baz"}), + Task: asynq.NewTask("bar", h.KV(map[string]interface{}{"fiz": "baz"})), Opts: []asynq.Option{asynq.Queue("bar"), asynq.MaxRetry(20)}, Next: now.Add(1 * time.Minute), Prev: now.Add(-19 * time.Minute), @@ -2634,7 +2634,7 @@ func TestInspectorSchedulerEntries(t *testing.T) { t.Errorf("SchedulerEntries() returned error: %v", err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) if diff := cmp.Diff(tc.want, got, sortSchedulerEntry, ignoreOpt); diff != "" { t.Errorf("SchedulerEntries() = %v, want %v; (-want,+got)\n%s", got, tc.want, diff) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 2a54f60..aa3bb46 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -6,6 +6,7 @@ package asynqtest import ( + "encoding/json" "math" "sort" "testing" @@ -111,6 +112,15 @@ func NewTaskMessageWithQueue(taskType string, payload []byte, qname string) *bas } } +// KV serializes the given key-value pairs into stream of bytes. +func KV(kv map[string]interface{}) []byte { + b, err := json.Marshal(kv) + if err != nil { + panic(err) + } + return b +} + // TaskMessageAfterRetry returns an updated copy of t after retry. // It increments retry count and sets the error message. func TaskMessageAfterRetry(t base.TaskMessage, errMsg string) *base.TaskMessage { diff --git a/internal/base/base_test.go b/internal/base/base_test.go index bfebef5..cd8baea 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -267,52 +267,68 @@ func TestSchedulerHistoryKey(t *testing.T) { } } +func toBytes(m map[string]interface{}) []byte { + b, err := json.Marshal(m) + if err != nil { + panic(err) + } + return b +} + func TestUniqueKey(t *testing.T) { tests := []struct { desc string qname string tasktype string - payload map[string]interface{} + payload []byte want string }{ { "with primitive types", "default", "email:send", - map[string]interface{}{"a": 123, "b": "hello", "c": true}, - "asynq:{default}:unique:email:send:a=123,b=hello,c=true", + toBytes(map[string]interface{}{"a": 123, "b": "hello", "c": true}), + fmt.Sprintf("asynq:{default}:unique:email:send:%s", + string(toBytes(map[string]interface{}{"a": 123, "b": "hello", "c": true}))), }, { "with unsorted keys", "default", "email:send", - map[string]interface{}{"b": "hello", "c": true, "a": 123}, - "asynq:{default}:unique:email:send:a=123,b=hello,c=true", + toBytes(map[string]interface{}{"b": "hello", "c": true, "a": 123}), + fmt.Sprintf("asynq:{default}:unique:email:send:%s", + string(toBytes(map[string]interface{}{"b": "hello", "c": true, "a": 123}))), }, { "with composite types", "default", "email:send", - map[string]interface{}{ + toBytes(map[string]interface{}{ "address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"}, - "names": []string{"bob", "mike", "rob"}}, - "asynq:{default}:unique:email:send:address=map[city:Boston line:123 Main St state:MA],names=[bob mike rob]", + "names": []string{"bob", "mike", "rob"}}), + fmt.Sprintf("asynq:{default}:unique:email:send:%s", + string(toBytes(map[string]interface{}{ + "address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"}, + "names": []string{"bob", "mike", "rob"}}))), }, { "with complex types", "default", "email:send", - map[string]interface{}{ + toBytes(map[string]interface{}{ "time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC), - "duration": time.Hour}, - "asynq:{default}:unique:email:send:duration=1h0m0s,time=2020-07-28 00:00:00 +0000 UTC", + "duration": time.Hour}), + fmt.Sprintf("asynq:{default}:unique:email:send:%s", + string(toBytes(map[string]interface{}{ + "time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC), + "duration": time.Hour}))), }, { "with nil payload", "default", "reindex", nil, - "asynq:{default}:unique:reindex:nil", + "asynq:{default}:unique:reindex:", }, } @@ -333,7 +349,7 @@ func TestMessageEncoding(t *testing.T) { { in: &TaskMessage{ Type: "task1", - Payload: map[string]interface{}{"a": 1, "b": "hello!", "c": true}, + Payload: toBytes(map[string]interface{}{"a": 1, "b": "hello!", "c": true}), ID: id, Queue: "default", Retry: 10, @@ -343,7 +359,7 @@ func TestMessageEncoding(t *testing.T) { }, out: &TaskMessage{ Type: "task1", - Payload: map[string]interface{}{"a": json.Number("1"), "b": "hello!", "c": true}, + Payload: toBytes(map[string]interface{}{"a": json.Number("1"), "b": "hello!", "c": true}), ID: id, Queue: "default", Retry: 10, @@ -420,7 +436,7 @@ func TestWorkerInfoEncoding(t *testing.T) { ServerID: "abc123", ID: uuid.NewString(), Type: "taskA", - Payload: map[string]interface{}{"foo": "bar"}, + Payload: toBytes(map[string]interface{}{"foo": "bar"}), Queue: "default", Started: time.Now().Add(-3 * time.Hour), Deadline: time.Now().Add(30 * time.Second), @@ -455,7 +471,7 @@ func TestSchedulerEntryEncoding(t *testing.T) { ID: uuid.NewString(), Spec: "* * * * *", Type: "task_A", - Payload: map[string]interface{}{"foo": "bar"}, + Payload: toBytes(map[string]interface{}{"foo": "bar"}), Opts: []string{"Queue('email')"}, Next: time.Now().Add(30 * time.Second).UTC(), Prev: time.Now().Add(-2 * time.Minute).UTC(), diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index bc2c806..3c38ab5 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -50,9 +50,9 @@ func TestAllQueues(t *testing.T) { func TestCurrentStats(t *testing.T) { r := setup(t) defer r.Close() - m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) + m1 := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"subject": "hello"})) m2 := h.NewTaskMessage("reindex", nil) - m3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"}) + m3 := h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/path/to/img"})) m4 := h.NewTaskMessage("sync", nil) m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") @@ -312,7 +312,7 @@ func TestListPending(t *testing.T) { r := setup(t) defer r.Close() - m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) + m1 := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"subject": "hello"})) m2 := h.NewTaskMessage("reindex", nil) m3 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") m4 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") @@ -3282,9 +3282,9 @@ func TestListWorkers(t *testing.T) { pid = 4567 serverID = "server123" - m1 = h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "abc123"}) - m2 = h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/image/file"}) - m3 = h.NewTaskMessage("reindex", map[string]interface{}{}) + m1 = h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"user_id": "abc123"})) + m2 = h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"path": "some/path/to/image/file"})) + m3 = h.NewTaskMessage("reindex", h.KV(map[string]interface{}{})) ) tests := []struct { @@ -3367,7 +3367,7 @@ func TestWriteListClearSchedulerEntries(t *testing.T) { { Spec: "@every 20m", Type: "bar", - Payload: map[string]interface{}{"fiz": "baz"}, + Payload: h.KV(map[string]interface{}{"fiz": "baz"}), Opts: nil, Next: now.Add(1 * time.Minute), Prev: now.Add(-19 * time.Minute), diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 23c32ea..3bf5d95 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -61,8 +61,8 @@ func setup(tb testing.TB) (r *RDB) { func TestEnqueue(t *testing.T) { r := setup(t) defer r.Close() - t1 := h.NewTaskMessage("send_email", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"}) - t2 := h.NewTaskMessageWithQueue("generate_csv", map[string]interface{}{}, "csv") + t1 := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})) + t2 := h.NewTaskMessageWithQueue("generate_csv", h.KV(map[string]interface{}{}), "csv") t3 := h.NewTaskMessageWithQueue("sync", nil, "low") tests := []struct { @@ -101,9 +101,9 @@ func TestEnqueueUnique(t *testing.T) { m1 := base.TaskMessage{ ID: uuid.New(), Type: "email", - Payload: map[string]interface{}{"user_id": json.Number("123")}, + Payload: h.KV(map[string]interface{}{"user_id": json.Number("123")}), Queue: base.DefaultQueueName, - UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", map[string]interface{}{"user_id": 123}), + UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.KV(map[string]interface{}{"user_id": 123})), } tests := []struct { @@ -157,7 +157,7 @@ func TestDequeue(t *testing.T) { t1 := &base.TaskMessage{ ID: uuid.New(), Type: "send_email", - Payload: map[string]interface{}{"subject": "hello!"}, + Payload: h.KV(map[string]interface{}{"subject": "hello!"}), Queue: "default", Timeout: 1800, Deadline: 0, @@ -355,7 +355,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { t1 := &base.TaskMessage{ ID: uuid.New(), Type: "send_email", - Payload: map[string]interface{}{"subject": "hello!"}, + Payload: h.KV(map[string]interface{}{"subject": "hello!"}), Queue: "default", Timeout: 1800, Deadline: 0, @@ -767,7 +767,7 @@ func TestRequeue(t *testing.T) { func TestSchedule(t *testing.T) { r := setup(t) defer r.Close() - msg := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) + msg := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"subject": "hello"})) tests := []struct { msg *base.TaskMessage processAt time.Time @@ -808,9 +808,9 @@ func TestScheduleUnique(t *testing.T) { m1 := base.TaskMessage{ ID: uuid.New(), Type: "email", - Payload: map[string]interface{}{"user_id": 123}, + Payload: h.KV(map[string]interface{}{"user_id": 123}), Queue: base.DefaultQueueName, - UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", map[string]interface{}{"user_id": 123}), + UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.KV(map[string]interface{}{"user_id": 123})), } tests := []struct { @@ -866,7 +866,7 @@ func TestRetry(t *testing.T) { t1 := &base.TaskMessage{ ID: uuid.New(), Type: "send_email", - Payload: map[string]interface{}{"subject": "Hola!"}, + Payload: h.KV(map[string]interface{}{"subject": "Hola!"}), Retried: 10, Timeout: 1800, Queue: "default", @@ -874,7 +874,7 @@ func TestRetry(t *testing.T) { t2 := &base.TaskMessage{ ID: uuid.New(), Type: "gen_thumbnail", - Payload: map[string]interface{}{"path": "some/path/to/image.jpg"}, + Payload: h.KV(map[string]interface{}{"path": "some/path/to/image.jpg"}), Timeout: 3000, Queue: "default", } @@ -1530,8 +1530,8 @@ func TestWriteServerStateWithWorkers(t *testing.T) { pid = 4242 serverID = "server123" - msg1 = h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "123"}) - msg2 = h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/imgfile"}) + msg1 = h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"user_id": "123"})) + msg2 = h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"path": "some/path/to/imgfile"})) ttl = 5 * time.Second ) @@ -1642,8 +1642,8 @@ func TestClearServerState(t *testing.T) { otherPID = 9876 otherServerID = "server987" - msg1 = h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "123"}) - msg2 = h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/imgfile"}) + msg1 = h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"user_id": "123"})) + msg2 = h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"path": "some/path/to/imgfile"})) ttl = 5 * time.Second ) diff --git a/processor_test.go b/processor_test.go index 7475889..95f73e2 100644 --- a/processor_test.go +++ b/processor_test.go @@ -6,6 +6,7 @@ package asynq import ( "context" + "encoding/json" "fmt" "sort" "sync" @@ -13,7 +14,6 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" @@ -124,7 +124,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) { p.terminate() mu.Lock() - if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" { + if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) } mu.Unlock() @@ -216,7 +216,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) { p.terminate() mu.Lock() - if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" { + if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) } mu.Unlock() @@ -228,7 +228,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { r := setup(t) rdbClient := rdb.NewRDB(r) - m1 := h.NewTaskMessage("large_number", map[string]interface{}{"data": 111111111111111111}) + m1 := h.NewTaskMessage("large_number", h.KV(map[string]interface{}{"data": 111111111111111111})) t1 := NewTask(m1.Type, m1.Payload) tests := []struct { @@ -250,10 +250,14 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { handler := func(ctx context.Context, task *Task) error { mu.Lock() defer mu.Unlock() - if data, err := task.Payload.GetInt("data"); err != nil { - t.Errorf("coult not get data from payload: %v", err) - } else { + var payload map[string]int + if err := json.Unmarshal(task.Payload(), &payload); err != nil { + t.Errorf("coult not decode payload: %v", err) + } + if data, ok := payload["data"]; ok { t.Logf("data == %d", data) + } else { + t.Errorf("could not get data from payload") } processed = append(processed, task) return nil @@ -289,7 +293,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { p.terminate() mu.Lock() - if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmpopts.IgnoreUnexported(Payload{})); diff != "" { + if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) } mu.Unlock() @@ -592,7 +596,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { } p.terminate() - if diff := cmp.Diff(tc.wantProcessed, processed, cmp.AllowUnexported(Payload{})); diff != "" { + if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) } @@ -611,7 +615,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { return nil }, - task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), + task: NewTask("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/img/path"})), wantErr: false, }, { @@ -619,7 +623,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { return fmt.Errorf("something went wrong") }, - task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), + task: NewTask("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/img/path"})), wantErr: true, }, { @@ -627,7 +631,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { panic("something went terribly wrong") }, - task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), + task: NewTask("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/img/path"})), wantErr: true, }, } diff --git a/servemux.go b/servemux.go index 90e9972..6dc670d 100644 --- a/servemux.go +++ b/servemux.go @@ -151,7 +151,7 @@ func (mux *ServeMux) Use(mws ...MiddlewareFunc) { // NotFound returns an error indicating that the handler was not found for the given task. func NotFound(ctx context.Context, task *Task) error { - return fmt.Errorf("handler not found for task %q", task.Type) + return fmt.Errorf("handler not found for task %q", task.Type()) } // NotFoundHandler returns a simple task handler that returns a ``not found`` error. diff --git a/servemux_test.go b/servemux_test.go index 98dd52f..227c4d7 100644 --- a/servemux_test.go +++ b/servemux_test.go @@ -68,7 +68,7 @@ func TestServeMux(t *testing.T) { } if called != tc.want { - t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type, tc.want) + t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type(), tc.want) } } } @@ -124,7 +124,7 @@ func TestServeMuxNotFound(t *testing.T) { task := NewTask(tc.typename, nil) err := mux.ProcessTask(context.Background(), task) if err == nil { - t.Errorf("ProcessTask did not return error for task %q, should return 'not found' error", task.Type) + t.Errorf("ProcessTask did not return error for task %q, should return 'not found' error", task.Type()) } } } @@ -164,7 +164,7 @@ func TestServeMuxMiddlewares(t *testing.T) { } if called != tc.want { - t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type, tc.want) + t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type(), tc.want) } } } diff --git a/server_test.go b/server_test.go index 0343ada..05a58e5 100644 --- a/server_test.go +++ b/server_test.go @@ -11,6 +11,7 @@ import ( "testing" "time" + "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/testbroker" "go.uber.org/goleak" @@ -39,12 +40,12 @@ func TestServer(t *testing.T) { t.Fatal(err) } - _, err = c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123})) + _, err = c.Enqueue(NewTask("send_email", asynqtest.KV(map[string]interface{}{"recipient_id": 123}))) if err != nil { t.Errorf("could not enqueue a task: %v", err) } - _, err = c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 456}), ProcessIn(1*time.Hour)) + _, err = c.Enqueue(NewTask("send_email", asynqtest.KV(map[string]interface{}{"recipient_id": 456})), ProcessIn(1*time.Hour)) if err != nil { t.Errorf("could not enqueue a task: %v", err) } @@ -169,8 +170,8 @@ func TestServerWithFlakyBroker(t *testing.T) { h := func(ctx context.Context, task *Task) error { // force task retry. - if task.Type == "bad_task" { - return fmt.Errorf("could not process %q", task.Type) + if task.Type() == "bad_task" { + return fmt.Errorf("could not process %q", task.Type()) } time.Sleep(2 * time.Second) return nil