From b835090ad8b0cc3fbe8fd3933dcfcac1b5030dfd Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 15 May 2021 06:43:18 -0700 Subject: [PATCH] Update Client.Enqueue to return TaskInfo --- CHANGELOG.md | 2 + README.md | 16 +-- client.go | 55 ++------- client_test.go | 300 +++++++++++++++++++++++++++++++------------------ doc.go | 4 +- scheduler.go | 8 +- 6 files changed, 212 insertions(+), 173 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d3617a..412bedd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Server` API has changed. Renamed `Quiet` to `Stop`. Renamed `Stop` to `Shutdown`. _Note:_ As a result of this renaming, the behavior of `Stop` has changed. Please update the exising code to call `Shutdown` where it used to call `Stop`. - `Scheduler` API has changed. Renamed `Stop` to `Shutdown`. - Requires redis v4.0+ for multiple field/value pair support +- Renamed pending key (TODO: need migration script) +- `Client.Enqueue` now returns `TaskInfo` - Renamed pending key (TODO: need migration script ## [0.17.2] - 2021-06-06 diff --git a/README.md b/README.md index 1f7c064..e2ecfda 100644 --- a/README.md +++ b/README.md @@ -177,11 +177,11 @@ func main() { if err != nil { log.Fatalf("could not create task: %v", err) } - res, err := c.Enqueue(t) + info, err := c.Enqueue(t) if err != nil { log.Fatalf("could not enqueue task: %v", err) } - fmt.Printf("Enqueued Result: %+v\n", res) + fmt.Printf("enqueued task: id=%s queue=%s\n", info.ID(), info.Queue()) // ------------------------------------------------------------ @@ -189,11 +189,11 @@ func main() { // Use ProcessIn or ProcessAt option. // ------------------------------------------------------------ - res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour)) + info, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour)) if err != nil { log.Fatalf("could not schedule task: %v", err) } - fmt.Printf("Enqueued Result: %+v\n", res) + fmt.Printf("enqueued task: id=%s queue=%s\n", info.ID(), info.Queue()) // ---------------------------------------------------------------------------- @@ -207,22 +207,22 @@ func main() { if err != nil { log.Fatalf("could not create task: %v", err) } - res, err = c.Enqueue(t) + info, err = c.Enqueue(t) if err != nil { log.Fatalf("could not enqueue task: %v", err) } - fmt.Printf("Enqueued Result: %+v\n", res) + fmt.Printf("enqueued task: id=%s queue=%s\n", info.ID(), info.Queue()) // --------------------------------------------------------------------------- // Example 4: Pass options to tune task processing behavior at enqueue time. // Options passed at enqueue time override default ones. // --------------------------------------------------------------------------- - res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second)) + info, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second)) if err != nil { log.Fatal("could not enqueue task: %v", err) } - fmt.Printf("Enqueued Result: %+v\n", res) + fmt.Printf("enqueued task: id=%s queue=%s\n", info.ID(), info.Queue()) } ``` diff --git a/client.go b/client.go index 81ec5a5..10720bc 100644 --- a/client.go +++ b/client.go @@ -254,41 +254,6 @@ func (c *Client) SetDefaultOptions(taskType string, opts ...Option) { c.opts[taskType] = opts } -// A Result holds enqueued task's metadata. -type Result struct { - // ID is a unique identifier for the task. - ID string - - // EnqueuedAt is the time the task was enqueued in UTC. - EnqueuedAt time.Time - - // ProcessAt indicates when the task should be processed. - ProcessAt time.Time - - // Retry is the maximum number of retry for the task. - Retry int - - // Queue is a name of the queue the task is enqueued to. - Queue string - - // Timeout is the timeout value for the task. - // Counting for timeout starts when a worker starts processing the task. - // If task processing doesn't complete within the timeout, the task will be retried. - // The value zero means no timeout. - // - // If deadline is set, min(now+timeout, deadline) is used, where the now is the time when - // a worker starts processing the task. - Timeout time.Duration - - // Deadline is the deadline value for the task. - // If task processing doesn't complete before the deadline, the task will be retried. - // The value time.Unix(0, 0) means no deadline. - // - // If timeout is set, min(now+timeout, deadline) is used, where the now is the time when - // a worker starts processing the task. - Deadline time.Time -} - // Close closes the connection with redis. func (c *Client) Close() error { return c.rdb.Close() @@ -296,13 +261,14 @@ func (c *Client) Close() error { // Enqueue enqueues the given task to be processed asynchronously. // -// Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error. +// Enqueue returns TaskInfo and nil error if the task is enqueued successfully, otherwise returns a non-nil error. // // The argument opts specifies the behavior of task processing. // If there are conflicting Option values the last one overrides others. // By deafult, max retry is set to 25 and timeout is set to 30 minutes. -// If no ProcessAt or ProcessIn options are passed, the task will be processed immediately. -func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) { +// +// If no ProcessAt or ProcessIn options are provided, the task will be pending immediately. +func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) { c.mu.Lock() if defaults, ok := c.opts[task.Type()]; ok { opts = append(defaults, opts...) @@ -339,11 +305,14 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) { UniqueKey: uniqueKey, } now := time.Now() + var state base.TaskState if opt.processAt.Before(now) || opt.processAt.Equal(now) { opt.processAt = now err = c.enqueue(msg, opt.uniqueTTL) + state = base.TaskStatePending } else { err = c.schedule(msg, opt.processAt, opt.uniqueTTL) + state = base.TaskStateScheduled } switch { case errors.Is(err, errors.ErrDuplicateTask): @@ -351,15 +320,7 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) { case err != nil: return nil, err } - return &Result{ - ID: msg.ID.String(), - EnqueuedAt: time.Now().UTC(), - ProcessAt: opt.processAt, - Queue: msg.Queue, - Retry: msg.Retry, - Timeout: timeout, - Deadline: deadline, - }, nil + return &TaskInfo{msg, state, opt.processAt}, nil } func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error { diff --git a/client_test.go b/client_test.go index cc1dacc..a186ade 100644 --- a/client_test.go +++ b/client_test.go @@ -32,7 +32,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { task *Task processAt time.Time // value for ProcessAt option opts []Option // other options - wantRes *Result + wantInfo *TaskInfo wantPending map[string][]*base.TaskMessage wantScheduled map[string][]base.Z }{ @@ -41,13 +41,16 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { task: task, processAt: now, opts: []Option{}, - wantRes: &Result{ - EnqueuedAt: now.UTC(), - ProcessAt: now, - Queue: "default", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: task.Type(), + Payload: task.Payload(), + Queue: "default", + Retry: defaultMaxRetry, + Timeout: int64(defaultTimeout.Seconds()), + }, + state: base.TaskStatePending, + nextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -70,13 +73,17 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { task: task, processAt: oneHourLater, opts: []Option{}, - wantRes: &Result{ - EnqueuedAt: now.UTC(), - ProcessAt: oneHourLater, - Queue: "default", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: task.Type(), + Payload: task.Payload(), + Retry: defaultMaxRetry, + Queue: "default", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + state: base.TaskStateScheduled, + nextProcessAt: oneHourLater, }, wantPending: map[string][]*base.TaskMessage{ "default": {}, @@ -103,18 +110,19 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { h.FlushDB(t, r) // clean up db before each test case. opts := append(tc.opts, ProcessAt(tc.processAt)) - gotRes, err := client.Enqueue(tc.task, opts...) + gotInfo, err := client.Enqueue(tc.task, opts...) if err != nil { t.Error(err) continue } cmpOptions := []cmp.Option{ - cmpopts.IgnoreFields(Result{}, "ID"), + cmp.AllowUnexported(TaskInfo{}), + cmpopts.IgnoreFields(base.TaskMessage{}, "ID"), cmpopts.EquateApproxTime(500 * time.Millisecond), } - if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { + if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { t.Errorf("%s;\nEnqueue(task, ProcessAt(%v)) returned %v, want %v; (-want,+got)\n%s", - tc.desc, tc.processAt, gotRes, tc.wantRes, diff) + tc.desc, tc.processAt, gotInfo, tc.wantInfo, diff) } for qname, want := range tc.wantPending { @@ -144,7 +152,7 @@ func TestClientEnqueue(t *testing.T) { desc string task *Task opts []Option - wantRes *Result + wantInfo *TaskInfo wantPending map[string][]*base.TaskMessage }{ { @@ -153,12 +161,17 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ MaxRetry(3), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: 3, - Timeout: defaultTimeout, - Deadline: noDeadline, + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: task.Type(), + Payload: task.Payload(), + Retry: 3, + Queue: "default", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + state: base.TaskStatePending, + nextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -179,12 +192,17 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ MaxRetry(-2), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: 0, - Timeout: defaultTimeout, - Deadline: noDeadline, + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: task.Type(), + Payload: task.Payload(), + Retry: 0, // Retry count should be set to zero + Queue: "default", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + state: base.TaskStatePending, + nextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -206,12 +224,17 @@ func TestClientEnqueue(t *testing.T) { MaxRetry(2), MaxRetry(10), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: 10, - Timeout: defaultTimeout, - Deadline: noDeadline, + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: task.Type(), + Payload: task.Payload(), + Retry: 10, // Last option takes precedence + Queue: "default", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + state: base.TaskStatePending, + nextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -232,12 +255,17 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ Queue("custom"), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "custom", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: task.Type(), + Payload: task.Payload(), + Retry: defaultMaxRetry, + Queue: "custom", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + state: base.TaskStatePending, + nextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "custom": { @@ -258,12 +286,17 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ Queue("HIGH"), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "high", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: task.Type(), + Payload: task.Payload(), + Retry: defaultMaxRetry, + Queue: "high", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + state: base.TaskStatePending, + nextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "high": { @@ -284,12 +317,17 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ Timeout(20 * time.Second), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: defaultMaxRetry, - Timeout: 20 * time.Second, - Deadline: noDeadline, + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: task.Type(), + Payload: task.Payload(), + Retry: defaultMaxRetry, + Queue: "default", + Timeout: 20, + Deadline: noDeadline.Unix(), + }, + state: base.TaskStatePending, + nextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -310,12 +348,17 @@ func TestClientEnqueue(t *testing.T) { opts: []Option{ Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: defaultMaxRetry, - Timeout: noTimeout, - Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC), + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: task.Type(), + Payload: task.Payload(), + Retry: defaultMaxRetry, + Queue: "default", + Timeout: int64(noTimeout.Seconds()), + Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix(), + }, + state: base.TaskStatePending, + nextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -337,12 +380,17 @@ func TestClientEnqueue(t *testing.T) { Timeout(20 * time.Second), Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)), }, - wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: defaultMaxRetry, - Timeout: 20 * time.Second, - Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC), + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: task.Type(), + Payload: task.Payload(), + Retry: defaultMaxRetry, + Queue: "default", + Timeout: 20, + Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix(), + }, + state: base.TaskStatePending, + nextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -362,18 +410,19 @@ func TestClientEnqueue(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) // clean up db before each test case. - gotRes, err := client.Enqueue(tc.task, tc.opts...) + gotInfo, err := client.Enqueue(tc.task, tc.opts...) if err != nil { t.Error(err) continue } cmpOptions := []cmp.Option{ - cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"), + cmp.AllowUnexported(TaskInfo{}), + cmpopts.IgnoreFields(base.TaskMessage{}, "ID"), cmpopts.EquateApproxTime(500 * time.Millisecond), } - if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { + if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s", - tc.desc, gotRes, tc.wantRes, diff) + tc.desc, gotInfo, tc.wantInfo, diff) } for qname, want := range tc.wantPending { @@ -398,7 +447,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { task *Task delay time.Duration // value for ProcessIn option opts []Option // other options - wantRes *Result + wantInfo *TaskInfo wantPending map[string][]*base.TaskMessage wantScheduled map[string][]base.Z }{ @@ -407,12 +456,17 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { task: task, delay: 1 * time.Hour, opts: []Option{}, - wantRes: &Result{ - ProcessAt: now.Add(1 * time.Hour), - Queue: "default", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: task.Type(), + Payload: task.Payload(), + Retry: defaultMaxRetry, + Queue: "default", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + state: base.TaskStateScheduled, + nextProcessAt: time.Now().Add(1 * time.Hour), }, wantPending: map[string][]*base.TaskMessage{ "default": {}, @@ -438,12 +492,17 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { task: task, delay: 0, opts: []Option{}, - wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: task.Type(), + Payload: task.Payload(), + Retry: defaultMaxRetry, + Queue: "default", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + state: base.TaskStatePending, + nextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -467,18 +526,19 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { h.FlushDB(t, r) // clean up db before each test case. opts := append(tc.opts, ProcessIn(tc.delay)) - gotRes, err := client.Enqueue(tc.task, opts...) + gotInfo, err := client.Enqueue(tc.task, opts...) if err != nil { t.Error(err) continue } cmpOptions := []cmp.Option{ - cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"), + cmp.AllowUnexported(TaskInfo{}), + cmpopts.IgnoreFields(base.TaskMessage{}, "ID"), cmpopts.EquateApproxTime(500 * time.Millisecond), } - if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { + if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { t.Errorf("%s;\nEnqueue(task, ProcessIn(%v)) returned %v, want %v; (-want,+got)\n%s", - tc.desc, tc.delay, gotRes, tc.wantRes, diff) + tc.desc, tc.delay, gotInfo, tc.wantInfo, diff) } for qname, want := range tc.wantPending { @@ -537,7 +597,7 @@ func TestClientDefaultOptions(t *testing.T) { defaultOpts []Option // options set at the client level. opts []Option // options used at enqueue time. task *Task - wantRes *Result + wantInfo *TaskInfo queue string // queue that the message should go into. want *base.TaskMessage }{ @@ -546,12 +606,17 @@ func TestClientDefaultOptions(t *testing.T) { defaultOpts: []Option{Queue("feed")}, opts: []Option{}, task: NewTask("feed:import", nil), - wantRes: &Result{ - ProcessAt: now, - Queue: "feed", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: "feed:import", + Payload: nil, + Retry: defaultMaxRetry, + Queue: "feed", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + state: base.TaskStatePending, + nextProcessAt: now, }, queue: "feed", want: &base.TaskMessage{ @@ -568,12 +633,17 @@ func TestClientDefaultOptions(t *testing.T) { defaultOpts: []Option{Queue("feed"), MaxRetry(5)}, opts: []Option{}, task: NewTask("feed:import", nil), - wantRes: &Result{ - ProcessAt: now, - Queue: "feed", - Retry: 5, - Timeout: defaultTimeout, - Deadline: noDeadline, + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: "feed:import", + Payload: nil, + Retry: 5, + Queue: "feed", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + state: base.TaskStatePending, + nextProcessAt: now, }, queue: "feed", want: &base.TaskMessage{ @@ -590,12 +660,17 @@ func TestClientDefaultOptions(t *testing.T) { defaultOpts: []Option{Queue("feed"), MaxRetry(5)}, opts: []Option{Queue("critical")}, task: NewTask("feed:import", nil), - wantRes: &Result{ - ProcessAt: now, - Queue: "critical", - Retry: 5, - Timeout: defaultTimeout, - Deadline: noDeadline, + wantInfo: &TaskInfo{ + msg: &base.TaskMessage{ + Type: "feed:import", + Payload: nil, + Retry: 5, + Queue: "critical", + Timeout: int64(defaultTimeout.Seconds()), + Deadline: noDeadline.Unix(), + }, + state: base.TaskStatePending, + nextProcessAt: now, }, queue: "critical", want: &base.TaskMessage{ @@ -614,17 +689,18 @@ func TestClientDefaultOptions(t *testing.T) { c := NewClient(getRedisConnOpt(t)) defer c.Close() c.SetDefaultOptions(tc.task.Type(), tc.defaultOpts...) - gotRes, err := c.Enqueue(tc.task, tc.opts...) + gotInfo, err := c.Enqueue(tc.task, tc.opts...) if err != nil { t.Fatal(err) } cmpOptions := []cmp.Option{ - cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"), + cmp.AllowUnexported(TaskInfo{}), + cmpopts.IgnoreFields(base.TaskMessage{}, "ID"), cmpopts.EquateApproxTime(500 * time.Millisecond), } - if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { + if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { t.Errorf("%s;\nEnqueue(task, opts...) returned %v, want %v; (-want,+got)\n%s", - tc.desc, gotRes, tc.wantRes, diff) + tc.desc, gotInfo, tc.wantInfo, diff) } pending := h.GetPendingMessages(t, r, tc.queue) if len(pending) != 1 { diff --git a/doc.go b/doc.go index 06ede25..76d1b4b 100644 --- a/doc.go +++ b/doc.go @@ -29,10 +29,10 @@ The Client is used to enqueue a task. task := asynq.NewTask("example", b) // Enqueue the task to be processed immediately. - res, err := client.Enqueue(task) + info, err := client.Enqueue(task) // Schedule the task to be processed after one minute. - res, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute)) + info, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute)) The Server is used to run the task processing workers with a given handler. diff --git a/scheduler.go b/scheduler.go index b9bc3d4..c394b6a 100644 --- a/scheduler.go +++ b/scheduler.go @@ -117,7 +117,7 @@ type enqueueJob struct { } func (j *enqueueJob) Run() { - res, err := j.client.Enqueue(j.task, j.opts...) + info, err := j.client.Enqueue(j.task, j.opts...) if err != nil { j.logger.Errorf("scheduler could not enqueue a task %+v: %v", j.task, err) if j.errHandler != nil { @@ -125,10 +125,10 @@ func (j *enqueueJob) Run() { } return } - j.logger.Debugf("scheduler enqueued a task: %+v", res) + j.logger.Debugf("scheduler enqueued a task: %+v", info) event := &base.SchedulerEnqueueEvent{ - TaskID: res.ID, - EnqueuedAt: res.EnqueuedAt.In(j.location), + TaskID: info.ID(), + EnqueuedAt: time.Now().In(j.location), } err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event) if err != nil {