From 24e3d2e273f4fdb710cb7af80e8aab7586941ff2 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 23 Mar 2021 07:19:09 -0700 Subject: [PATCH] Change Client.Enqueue to return taskID --- CHANGELOG.md | 1 + README.md | 12 ++-- client.go | 58 +++--------------- client_test.go | 161 +++---------------------------------------------- doc.go | 4 +- go.sum | 20 ++++++ scheduler.go | 8 +-- 7 files changed, 48 insertions(+), 216 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index eb9691b..442f9a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Task `Type` and `Payload` should be accessed by a method call. - `Server` API has changed. Renamed `Quiet` to `Stop`. Renamed `Stop` to `Shutdown`. _Note:_ As a result of this renaming, the behavior of `Stop` has changed. Please update the exising code to call `Shutdown` where it used to call `Stop`. - `Scheduler` API has changed. Renamed `Stop` to `Shutdown`. +- `Client.Enqueue` returns task ID string instead of `Result` struct. - Requires redis v4.0+ for multiple field/value pair support - Renamed pending key (TODO: need migration script) diff --git a/README.md b/README.md index 8a653b8..805d1e8 100644 --- a/README.md +++ b/README.md @@ -177,11 +177,11 @@ func main() { if err != nil { log.Fatalf("could not create task: %v", err) } - res, err := c.Enqueue(t) + taskID, err := c.Enqueue(t) if err != nil { log.Fatalf("could not enqueue task: %v", err) } - fmt.Printf("Enqueued Result: %+v\n", res) + log.Printf("Enqueued task: %s", taskID) // ------------------------------------------------------------ @@ -189,11 +189,11 @@ func main() { // Use ProcessIn or ProcessAt option. // ------------------------------------------------------------ - res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour)) + taskID, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour)) if err != nil { log.Fatalf("could not schedule task: %v", err) } - fmt.Printf("Enqueued Result: %+v\n", res) + log.Printf("Enqueued task: %s", taskID) // ---------------------------------------------------------------------------- @@ -207,11 +207,11 @@ func main() { if err != nil { log.Fatalf("could not create task: %v", err) } - res, err = c.Enqueue(t) + taskID, err = c.Enqueue(t) if err != nil { log.Fatalf("could not enqueue task: %v", err) } - fmt.Printf("Enqueued Result: %+v\n", res) + log.Printf("Enqueued task: %s", taskID) // --------------------------------------------------------------------------- // Example 4: Pass options to tune task processing behavior at enqueue time. diff --git a/client.go b/client.go index dc1841c..52938b0 100644 --- a/client.go +++ b/client.go @@ -254,41 +254,6 @@ func (c *Client) SetDefaultOptions(taskType string, opts ...Option) { c.opts[taskType] = opts } -// A Result holds enqueued task's metadata. -type Result struct { - // ID is a unique identifier for the task. - ID string - - // EnqueuedAt is the time the task was enqueued in UTC. - EnqueuedAt time.Time - - // ProcessAt indicates when the task should be processed. - ProcessAt time.Time - - // Retry is the maximum number of retry for the task. - Retry int - - // Queue is a name of the queue the task is enqueued to. - Queue string - - // Timeout is the timeout value for the task. - // Counting for timeout starts when a worker starts processing the task. - // If task processing doesn't complete within the timeout, the task will be retried. - // The value zero means no timeout. - // - // If deadline is set, min(now+timeout, deadline) is used, where the now is the time when - // a worker starts processing the task. - Timeout time.Duration - - // Deadline is the deadline value for the task. - // If task processing doesn't complete before the deadline, the task will be retried. - // The value time.Unix(0, 0) means no deadline. - // - // If timeout is set, min(now+timeout, deadline) is used, where the now is the time when - // a worker starts processing the task. - Deadline time.Time -} - // Close closes the connection with redis. func (c *Client) Close() error { return c.rdb.Close() @@ -296,13 +261,14 @@ func (c *Client) Close() error { // Enqueue enqueues the given task to be processed asynchronously. // -// Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error. +// Enqueue returns a task ID if the task is enqueued successfully, otherwise returns a non-nil error. // // The argument opts specifies the behavior of task processing. // If there are conflicting Option values the last one overrides others. -// By deafult, max retry is set to 25 and timeout is set to 30 minutes. +// +// By deafult, task is enqueued to the "default" queue, max retry is set to 25 and timeout is set to 30 minutes. // If no ProcessAt or ProcessIn options are passed, the task will be processed immediately. -func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) { +func (c *Client) Enqueue(task *Task, opts ...Option) (taskID string, err error) { c.mu.Lock() if defaults, ok := c.opts[task.Type()]; ok { opts = append(defaults, opts...) @@ -310,7 +276,7 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) { c.mu.Unlock() opt, err := composeOptions(opts...) if err != nil { - return nil, err + return "", err } deadline := noDeadline if !opt.deadline.IsZero() { @@ -347,19 +313,11 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) { } switch { case err == rdb.ErrDuplicateTask: - return nil, fmt.Errorf("%w", ErrDuplicateTask) + return "", fmt.Errorf("%w", ErrDuplicateTask) case err != nil: - return nil, err + return "", err } - return &Result{ - ID: msg.ID.String(), - EnqueuedAt: time.Now().UTC(), - ProcessAt: opt.processAt, - Queue: msg.Queue, - Retry: msg.Retry, - Timeout: timeout, - Deadline: deadline, - }, nil + return msg.ID.String(), nil } func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error { diff --git a/client_test.go b/client_test.go index cc1dacc..678c002 100644 --- a/client_test.go +++ b/client_test.go @@ -32,7 +32,6 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { task *Task processAt time.Time // value for ProcessAt option opts []Option // other options - wantRes *Result wantPending map[string][]*base.TaskMessage wantScheduled map[string][]base.Z }{ @@ -41,14 +40,6 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { task: task, processAt: now, opts: []Option{}, - wantRes: &Result{ - EnqueuedAt: now.UTC(), - ProcessAt: now, - Queue: "default", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, - }, wantPending: map[string][]*base.TaskMessage{ "default": { { @@ -70,14 +61,6 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { task: task, processAt: oneHourLater, opts: []Option{}, - wantRes: &Result{ - EnqueuedAt: now.UTC(), - ProcessAt: oneHourLater, - Queue: "default", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, - }, wantPending: map[string][]*base.TaskMessage{ "default": {}, }, @@ -103,19 +86,11 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { h.FlushDB(t, r) // clean up db before each test case. opts := append(tc.opts, ProcessAt(tc.processAt)) - gotRes, err := client.Enqueue(tc.task, opts...) + _, err := client.Enqueue(tc.task, opts...) if err != nil { t.Error(err) continue } - cmpOptions := []cmp.Option{ - cmpopts.IgnoreFields(Result{}, "ID"), - cmpopts.EquateApproxTime(500 * time.Millisecond), - } - if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { - t.Errorf("%s;\nEnqueue(task, ProcessAt(%v)) returned %v, want %v; (-want,+got)\n%s", - tc.desc, tc.processAt, gotRes, tc.wantRes, diff) - } for qname, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r, qname) @@ -138,13 +113,11 @@ func TestClientEnqueue(t *testing.T) { defer client.Close() task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) - now := time.Now() tests := []struct { desc string task *Task opts []Option - wantRes *Result wantPending map[string][]*base.TaskMessage }{ { @@ -153,13 +126,6 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ MaxRetry(3), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: 3, - Timeout: defaultTimeout, - Deadline: noDeadline, - }, wantPending: map[string][]*base.TaskMessage{ "default": { { @@ -179,13 +145,6 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ MaxRetry(-2), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: 0, - Timeout: defaultTimeout, - Deadline: noDeadline, - }, wantPending: map[string][]*base.TaskMessage{ "default": { { @@ -206,13 +165,6 @@ func TestClientEnqueue(t *testing.T) { MaxRetry(2), MaxRetry(10), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: 10, - Timeout: defaultTimeout, - Deadline: noDeadline, - }, wantPending: map[string][]*base.TaskMessage{ "default": { { @@ -232,13 +184,6 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ Queue("custom"), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "custom", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, - }, wantPending: map[string][]*base.TaskMessage{ "custom": { { @@ -258,13 +203,6 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ Queue("HIGH"), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "high", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, - }, wantPending: map[string][]*base.TaskMessage{ "high": { { @@ -284,13 +222,6 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ Timeout(20 * time.Second), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: defaultMaxRetry, - Timeout: 20 * time.Second, - Deadline: noDeadline, - }, wantPending: map[string][]*base.TaskMessage{ "default": { { @@ -310,13 +241,6 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: defaultMaxRetry, - Timeout: noTimeout, - Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC), - }, wantPending: map[string][]*base.TaskMessage{ "default": { { @@ -337,13 +261,6 @@ func TestClientEnqueue(t *testing.T) { Timeout(20 * time.Second), Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: defaultMaxRetry, - Timeout: 20 * time.Second, - Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC), - }, wantPending: map[string][]*base.TaskMessage{ "default": { { @@ -362,19 +279,11 @@ func TestClientEnqueue(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) // clean up db before each test case. - gotRes, err := client.Enqueue(tc.task, tc.opts...) + _, err := client.Enqueue(tc.task, tc.opts...) if err != nil { t.Error(err) continue } - cmpOptions := []cmp.Option{ - cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"), - cmpopts.EquateApproxTime(500 * time.Millisecond), - } - if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { - t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s", - tc.desc, gotRes, tc.wantRes, diff) - } for qname, want := range tc.wantPending { got := h.GetPendingMessages(t, r, qname) @@ -391,14 +300,12 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { defer client.Close() task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) - now := time.Now() tests := []struct { desc string task *Task delay time.Duration // value for ProcessIn option opts []Option // other options - wantRes *Result wantPending map[string][]*base.TaskMessage wantScheduled map[string][]base.Z }{ @@ -407,13 +314,6 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { task: task, delay: 1 * time.Hour, opts: []Option{}, - wantRes: &Result{ - ProcessAt: now.Add(1 * time.Hour), - Queue: "default", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, - }, wantPending: map[string][]*base.TaskMessage{ "default": {}, }, @@ -438,13 +338,6 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { task: task, delay: 0, opts: []Option{}, - wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, - }, wantPending: map[string][]*base.TaskMessage{ "default": { { @@ -467,19 +360,11 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { h.FlushDB(t, r) // clean up db before each test case. opts := append(tc.opts, ProcessIn(tc.delay)) - gotRes, err := client.Enqueue(tc.task, opts...) + _, err := client.Enqueue(tc.task, opts...) if err != nil { t.Error(err) continue } - cmpOptions := []cmp.Option{ - cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"), - cmpopts.EquateApproxTime(500 * time.Millisecond), - } - if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { - t.Errorf("%s;\nEnqueue(task, ProcessIn(%v)) returned %v, want %v; (-want,+got)\n%s", - tc.desc, tc.delay, gotRes, tc.wantRes, diff) - } for qname, want := range tc.wantPending { gotPending := h.GetPendingMessages(t, r, qname) @@ -530,14 +415,11 @@ func TestClientEnqueueError(t *testing.T) { func TestClientDefaultOptions(t *testing.T) { r := setup(t) - now := time.Now() - tests := []struct { desc string defaultOpts []Option // options set at the client level. opts []Option // options used at enqueue time. task *Task - wantRes *Result queue string // queue that the message should go into. want *base.TaskMessage }{ @@ -546,14 +428,7 @@ func TestClientDefaultOptions(t *testing.T) { defaultOpts: []Option{Queue("feed")}, opts: []Option{}, task: NewTask("feed:import", nil), - wantRes: &Result{ - ProcessAt: now, - Queue: "feed", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, - }, - queue: "feed", + queue: "feed", want: &base.TaskMessage{ Type: "feed:import", Payload: nil, @@ -568,14 +443,7 @@ func TestClientDefaultOptions(t *testing.T) { defaultOpts: []Option{Queue("feed"), MaxRetry(5)}, opts: []Option{}, task: NewTask("feed:import", nil), - wantRes: &Result{ - ProcessAt: now, - Queue: "feed", - Retry: 5, - Timeout: defaultTimeout, - Deadline: noDeadline, - }, - queue: "feed", + queue: "feed", want: &base.TaskMessage{ Type: "feed:import", Payload: nil, @@ -590,14 +458,7 @@ func TestClientDefaultOptions(t *testing.T) { defaultOpts: []Option{Queue("feed"), MaxRetry(5)}, opts: []Option{Queue("critical")}, task: NewTask("feed:import", nil), - wantRes: &Result{ - ProcessAt: now, - Queue: "critical", - Retry: 5, - Timeout: defaultTimeout, - Deadline: noDeadline, - }, - queue: "critical", + queue: "critical", want: &base.TaskMessage{ Type: "feed:import", Payload: nil, @@ -614,18 +475,10 @@ func TestClientDefaultOptions(t *testing.T) { c := NewClient(getRedisConnOpt(t)) defer c.Close() c.SetDefaultOptions(tc.task.Type(), tc.defaultOpts...) - gotRes, err := c.Enqueue(tc.task, tc.opts...) + _, err := c.Enqueue(tc.task, tc.opts...) if err != nil { t.Fatal(err) } - cmpOptions := []cmp.Option{ - cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"), - cmpopts.EquateApproxTime(500 * time.Millisecond), - } - if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { - t.Errorf("%s;\nEnqueue(task, opts...) returned %v, want %v; (-want,+got)\n%s", - tc.desc, gotRes, tc.wantRes, diff) - } pending := h.GetPendingMessages(t, r, tc.queue) if len(pending) != 1 { t.Errorf("%s;\nexpected queue %q to have one message; got %d messages in the queue.", diff --git a/doc.go b/doc.go index 06ede25..c6d580c 100644 --- a/doc.go +++ b/doc.go @@ -29,10 +29,10 @@ The Client is used to enqueue a task. task := asynq.NewTask("example", b) // Enqueue the task to be processed immediately. - res, err := client.Enqueue(task) + taskID, err := client.Enqueue(task) // Schedule the task to be processed after one minute. - res, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute)) + taskID, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute)) The Server is used to run the task processing workers with a given handler. diff --git a/go.sum b/go.sum index dad6de7..23f0123 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,16 @@ +cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473 h1:4cmBvAEBNJaGARUEs3/suWRyfyBfhf7I60WBZq+bv2w= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -12,7 +18,9 @@ github.com/go-redis/redis/v7 v7.2.0 h1:CrCexy/jYWZjW0AyVoHlcJUeZN19VWlbepTh1Vq6d github.com/go-redis/redis/v7 v7.2.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4= github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= @@ -39,6 +47,7 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -49,6 +58,7 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= @@ -58,10 +68,13 @@ github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= go.uber.org/goleak v0.10.0 h1:G3eWbSNIskeRqtsN/1uI5B+eP73y3JUuBsv9AZjehb4= go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4 h1:c2HOrn5iMezYjSlGPncknSEr/8x5LELb/ilJbXi9DEA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -71,9 +84,11 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs= @@ -92,16 +107,20 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -125,4 +144,5 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc h1:/hemPrYIhOhy8zYrNj+069zDB68us2sMGsfkFJO0iZs= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/scheduler.go b/scheduler.go index b9bc3d4..68a047b 100644 --- a/scheduler.go +++ b/scheduler.go @@ -117,7 +117,7 @@ type enqueueJob struct { } func (j *enqueueJob) Run() { - res, err := j.client.Enqueue(j.task, j.opts...) + taskID, err := j.client.Enqueue(j.task, j.opts...) if err != nil { j.logger.Errorf("scheduler could not enqueue a task %+v: %v", j.task, err) if j.errHandler != nil { @@ -125,10 +125,10 @@ func (j *enqueueJob) Run() { } return } - j.logger.Debugf("scheduler enqueued a task: %+v", res) + j.logger.Debugf("scheduler enqueued a task: %s", taskID) event := &base.SchedulerEnqueueEvent{ - TaskID: res.ID, - EnqueuedAt: res.EnqueuedAt.In(j.location), + TaskID: taskID, + EnqueuedAt: time.Now().In(j.location), } err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event) if err != nil {