From 6171a6400de711f3cd847c9814ebd67aac8a38a5 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 20 Mar 2021 06:54:00 -0700 Subject: [PATCH] Fix build --- CHANGELOG.md | 1 + benchmark_test.go | 5 ++--- client_test.go | 10 +++++----- inspeq/inspector_test.go | 4 ++-- internal/asynqtest/asynqtest.go | 4 ++-- internal/rdb/inspect.go | 7 ------- internal/rdb/inspect_test.go | 14 +++++++------- internal/rdb/rdb_test.go | 30 +++++++++++++++--------------- processor_test.go | 8 ++++---- server_test.go | 4 ++-- 10 files changed, 40 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ecf4474..bc7a71c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - NewTask function now takes array of bytes as payload. +- Task `Type` and `Payload` should be accessed by a method call. - Requires redis v4.0+ for multiple field/value pair support - Renamed pending key (TODO: need migration script) diff --git a/benchmark_test.go b/benchmark_test.go index 9fc08dd..b98ea34 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -41,8 +41,7 @@ func BenchmarkEndToEndSimple(b *testing.B) { }) // Create a bunch of tasks for i := 0; i < count; i++ { - t := NewTask(fmt.Sprintf("task%d", i), h.KV(map[string]interface{}{"data": i})) - if _, err := client.Enqueue(t); err != nil { + if _, err := client.Enqueue(makeTask(i)); err != nil { b.Fatalf("could not enqueue a task: %v", err) } } @@ -223,7 +222,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) { b.Log("Starting enqueueing") enqueued := 0 for enqueued < 100000 { - t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.KV(map[string]interface{}{"data": enqueued})) + t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.JSON(map[string]interface{}{"data": enqueued})) if _, err := client.Enqueue(t); err != nil { b.Logf("could not enqueue task %d: %v", enqueued, err) continue diff --git a/client_test.go b/client_test.go index d15c8a7..cc1dacc 100644 --- a/client_test.go +++ b/client_test.go @@ -20,7 +20,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) + task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) var ( now = time.Now() @@ -137,7 +137,7 @@ func TestClientEnqueue(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) + task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) now := time.Now() tests := []struct { @@ -390,7 +390,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) + task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) now := time.Now() tests := []struct { @@ -501,7 +501,7 @@ func TestClientEnqueueError(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) + task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) tests := []struct { desc string @@ -650,7 +650,7 @@ func TestClientEnqueueUnique(t *testing.T) { ttl time.Duration }{ { - NewTask("email", h.KV(map[string]interface{}{"user_id": 123})), + NewTask("email", h.JSON(map[string]interface{}{"user_id": 123})), time.Hour, }, } diff --git a/inspeq/inspector_test.go b/inspeq/inspector_test.go index e50df2e..0fedc9f 100644 --- a/inspeq/inspector_test.go +++ b/inspeq/inspector_test.go @@ -2598,7 +2598,7 @@ func TestInspectorSchedulerEntries(t *testing.T) { { Spec: "@every 20m", Type: "bar", - Payload: h.KV(map[string]interface{}{"fiz": "baz"}), + Payload: h.JSON(map[string]interface{}{"fiz": "baz"}), Opts: []string{`Queue("bar")`, `MaxRetry(20)`}, Next: now.Add(1 * time.Minute), Prev: now.Add(-19 * time.Minute), @@ -2614,7 +2614,7 @@ func TestInspectorSchedulerEntries(t *testing.T) { }, { Spec: "@every 20m", - Task: asynq.NewTask("bar", h.KV(map[string]interface{}{"fiz": "baz"})), + Task: asynq.NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"})), Opts: []asynq.Option{asynq.Queue("bar"), asynq.MaxRetry(20)}, Next: now.Add(1 * time.Minute), Prev: now.Add(-19 * time.Minute), diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index aa3bb46..66831ba 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -112,8 +112,8 @@ func NewTaskMessageWithQueue(taskType string, payload []byte, qname string) *bas } } -// KV serializes the given key-value pairs into stream of bytes. -func KV(kv map[string]interface{}) []byte { +// JSON serializes the given key-value pairs into stream of bytes in JSON. +func JSON(kv map[string]interface{}) []byte { b, err := json.Marshal(kv) if err != nil { panic(err) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 25e8d78..b211858 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -765,17 +765,10 @@ func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error { n, ok := res.(int64) if !ok { return fmt.Errorf("command error: unexpected return value %v", res) -<<<<<<< HEAD } if n == 0 { return ErrTaskNotFound } -======= - } - if n == 0 { - return ErrTaskNotFound - } ->>>>>>> e0402fe... Refactor redis keys and store messages in protobuf return nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 3c38ab5..42cddfb 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -50,9 +50,9 @@ func TestAllQueues(t *testing.T) { func TestCurrentStats(t *testing.T) { r := setup(t) defer r.Close() - m1 := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"subject": "hello"})) + m1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"})) m2 := h.NewTaskMessage("reindex", nil) - m3 := h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/path/to/img"})) + m3 := h.NewTaskMessage("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/path/to/img"})) m4 := h.NewTaskMessage("sync", nil) m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") @@ -312,7 +312,7 @@ func TestListPending(t *testing.T) { r := setup(t) defer r.Close() - m1 := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"subject": "hello"})) + m1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"})) m2 := h.NewTaskMessage("reindex", nil) m3 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") m4 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") @@ -3282,9 +3282,9 @@ func TestListWorkers(t *testing.T) { pid = 4567 serverID = "server123" - m1 = h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"user_id": "abc123"})) - m2 = h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"path": "some/path/to/image/file"})) - m3 = h.NewTaskMessage("reindex", h.KV(map[string]interface{}{})) + m1 = h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"user_id": "abc123"})) + m2 = h.NewTaskMessage("gen_thumbnail", h.JSON(map[string]interface{}{"path": "some/path/to/image/file"})) + m3 = h.NewTaskMessage("reindex", h.JSON(map[string]interface{}{})) ) tests := []struct { @@ -3367,7 +3367,7 @@ func TestWriteListClearSchedulerEntries(t *testing.T) { { Spec: "@every 20m", Type: "bar", - Payload: h.KV(map[string]interface{}{"fiz": "baz"}), + Payload: h.JSON(map[string]interface{}{"fiz": "baz"}), Opts: nil, Next: now.Add(1 * time.Minute), Prev: now.Add(-19 * time.Minute), diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 3bf5d95..b936360 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -61,8 +61,8 @@ func setup(tb testing.TB) (r *RDB) { func TestEnqueue(t *testing.T) { r := setup(t) defer r.Close() - t1 := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})) - t2 := h.NewTaskMessageWithQueue("generate_csv", h.KV(map[string]interface{}{}), "csv") + t1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})) + t2 := h.NewTaskMessageWithQueue("generate_csv", h.JSON(map[string]interface{}{}), "csv") t3 := h.NewTaskMessageWithQueue("sync", nil, "low") tests := []struct { @@ -101,9 +101,9 @@ func TestEnqueueUnique(t *testing.T) { m1 := base.TaskMessage{ ID: uuid.New(), Type: "email", - Payload: h.KV(map[string]interface{}{"user_id": json.Number("123")}), + Payload: h.JSON(map[string]interface{}{"user_id": json.Number("123")}), Queue: base.DefaultQueueName, - UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.KV(map[string]interface{}{"user_id": 123})), + UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.JSON(map[string]interface{}{"user_id": 123})), } tests := []struct { @@ -157,7 +157,7 @@ func TestDequeue(t *testing.T) { t1 := &base.TaskMessage{ ID: uuid.New(), Type: "send_email", - Payload: h.KV(map[string]interface{}{"subject": "hello!"}), + Payload: h.JSON(map[string]interface{}{"subject": "hello!"}), Queue: "default", Timeout: 1800, Deadline: 0, @@ -355,7 +355,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { t1 := &base.TaskMessage{ ID: uuid.New(), Type: "send_email", - Payload: h.KV(map[string]interface{}{"subject": "hello!"}), + Payload: h.JSON(map[string]interface{}{"subject": "hello!"}), Queue: "default", Timeout: 1800, Deadline: 0, @@ -767,7 +767,7 @@ func TestRequeue(t *testing.T) { func TestSchedule(t *testing.T) { r := setup(t) defer r.Close() - msg := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"subject": "hello"})) + msg := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"})) tests := []struct { msg *base.TaskMessage processAt time.Time @@ -808,9 +808,9 @@ func TestScheduleUnique(t *testing.T) { m1 := base.TaskMessage{ ID: uuid.New(), Type: "email", - Payload: h.KV(map[string]interface{}{"user_id": 123}), + Payload: h.JSON(map[string]interface{}{"user_id": 123}), Queue: base.DefaultQueueName, - UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.KV(map[string]interface{}{"user_id": 123})), + UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.JSON(map[string]interface{}{"user_id": 123})), } tests := []struct { @@ -866,7 +866,7 @@ func TestRetry(t *testing.T) { t1 := &base.TaskMessage{ ID: uuid.New(), Type: "send_email", - Payload: h.KV(map[string]interface{}{"subject": "Hola!"}), + Payload: h.JSON(map[string]interface{}{"subject": "Hola!"}), Retried: 10, Timeout: 1800, Queue: "default", @@ -874,7 +874,7 @@ func TestRetry(t *testing.T) { t2 := &base.TaskMessage{ ID: uuid.New(), Type: "gen_thumbnail", - Payload: h.KV(map[string]interface{}{"path": "some/path/to/image.jpg"}), + Payload: h.JSON(map[string]interface{}{"path": "some/path/to/image.jpg"}), Timeout: 3000, Queue: "default", } @@ -1530,8 +1530,8 @@ func TestWriteServerStateWithWorkers(t *testing.T) { pid = 4242 serverID = "server123" - msg1 = h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"user_id": "123"})) - msg2 = h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"path": "some/path/to/imgfile"})) + msg1 = h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"user_id": "123"})) + msg2 = h.NewTaskMessage("gen_thumbnail", h.JSON(map[string]interface{}{"path": "some/path/to/imgfile"})) ttl = 5 * time.Second ) @@ -1642,8 +1642,8 @@ func TestClearServerState(t *testing.T) { otherPID = 9876 otherServerID = "server987" - msg1 = h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"user_id": "123"})) - msg2 = h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"path": "some/path/to/imgfile"})) + msg1 = h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"user_id": "123"})) + msg2 = h.NewTaskMessage("gen_thumbnail", h.JSON(map[string]interface{}{"path": "some/path/to/imgfile"})) ttl = 5 * time.Second ) diff --git a/processor_test.go b/processor_test.go index 95f73e2..6fd3f25 100644 --- a/processor_test.go +++ b/processor_test.go @@ -228,7 +228,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { r := setup(t) rdbClient := rdb.NewRDB(r) - m1 := h.NewTaskMessage("large_number", h.KV(map[string]interface{}{"data": 111111111111111111})) + m1 := h.NewTaskMessage("large_number", h.JSON(map[string]interface{}{"data": 111111111111111111})) t1 := NewTask(m1.Type, m1.Payload) tests := []struct { @@ -615,7 +615,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { return nil }, - task: NewTask("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/img/path"})), + task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), wantErr: false, }, { @@ -623,7 +623,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { return fmt.Errorf("something went wrong") }, - task: NewTask("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/img/path"})), + task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), wantErr: true, }, { @@ -631,7 +631,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { panic("something went terribly wrong") }, - task: NewTask("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/img/path"})), + task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), wantErr: true, }, } diff --git a/server_test.go b/server_test.go index 05a58e5..6d29dd6 100644 --- a/server_test.go +++ b/server_test.go @@ -40,12 +40,12 @@ func TestServer(t *testing.T) { t.Fatal(err) } - _, err = c.Enqueue(NewTask("send_email", asynqtest.KV(map[string]interface{}{"recipient_id": 123}))) + _, err = c.Enqueue(NewTask("send_email", asynqtest.JSON(map[string]interface{}{"recipient_id": 123}))) if err != nil { t.Errorf("could not enqueue a task: %v", err) } - _, err = c.Enqueue(NewTask("send_email", asynqtest.KV(map[string]interface{}{"recipient_id": 456})), ProcessIn(1*time.Hour)) + _, err = c.Enqueue(NewTask("send_email", asynqtest.JSON(map[string]interface{}{"recipient_id": 456})), ProcessIn(1*time.Hour)) if err != nil { t.Errorf("could not enqueue a task: %v", err) }