From 21d91a1313899d7be1b3d966bd2fe20e4d4ac2b6 Mon Sep 17 00:00:00 2001 From: pacinochen Date: Mon, 7 Mar 2022 15:54:36 +0800 Subject: [PATCH] =?UTF-8?q?chore():=20=E8=AE=BE=E7=BD=AEjobId=20=E4=B8=BAc?= =?UTF-8?q?mdb=E7=9A=84instanceId?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- asynq.go | 6 +++- benchmark_test.go | 4 +-- client_test.go | 30 ++++++++++---------- example_test.go | 4 +-- inspector.go | 2 +- inspector_test.go | 4 +-- periodic_task_manager_test.go | 52 +++++++++++++++++------------------ processor.go | 4 +-- processor_test.go | 38 ++++++++++++------------- recoverer.go | 2 +- scheduler.go | 16 +++++------ scheduler_test.go | 6 ++-- servemux_test.go | 6 ++-- server_test.go | 10 +++---- 14 files changed, 94 insertions(+), 90 deletions(-) diff --git a/asynq.go b/asynq.go index 234d4b9..a0eb67b 100644 --- a/asynq.go +++ b/asynq.go @@ -22,6 +22,9 @@ type Task struct { // typename indicates the type of task to be performed. typename string + // 任务ID,用于asynq维护自己的任务map + taskId string + // payload holds data needed to perform the task. payload []byte @@ -43,9 +46,10 @@ func (t *Task) ResultWriter() *ResultWriter { return t.w } // NewTask returns a new Task given a type name and payload data. // Options can be passed to configure task processing behavior. -func NewTask(typename string, payload []byte, opts ...Option) *Task { +func NewTask(typename, taskId string, payload []byte, opts ...Option) *Task { return &Task{ typename: typename, + taskId: taskId, payload: payload, opts: opts, } diff --git a/benchmark_test.go b/benchmark_test.go index b98ea34..e35e613 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -21,7 +21,7 @@ func makeTask(n int) *Task { if err != nil { panic(err) } - return NewTask(fmt.Sprintf("task%d", n), b) + return NewTask(fmt.Sprintf("task%d", n), "", b) } // Simple E2E Benchmark testing with no scheduled tasks and retries. @@ -222,7 +222,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) { b.Log("Starting enqueueing") enqueued := 0 for enqueued < 100000 { - t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.JSON(map[string]interface{}{"data": enqueued})) + t := NewTask(fmt.Sprintf("enqueued%d", enqueued), "", h.JSON(map[string]interface{}{"data": enqueued})) if _, err := client.Enqueue(t); err != nil { b.Logf("could not enqueue task %d: %v", enqueued, err) continue diff --git a/client_test.go b/client_test.go index 867de91..c82eba4 100644 --- a/client_test.go +++ b/client_test.go @@ -21,7 +21,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) + task := NewTask("send_email", "", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) var ( now = time.Now() @@ -148,7 +148,7 @@ func TestClientEnqueue(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) + task := NewTask("send_email", "", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) now := time.Now() tests := []struct { @@ -483,7 +483,7 @@ func TestClientEnqueueWithTaskIDOption(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", nil) + task := NewTask("send_email", "", nil) now := time.Now() tests := []struct { @@ -561,7 +561,7 @@ func TestClientEnqueueWithConflictingTaskID(t *testing.T) { defer client.Close() const taskID = "custom_id" - task := NewTask("foo", nil) + task := NewTask("foo", "taskId", nil) if _, err := client.Enqueue(task, TaskID(taskID)); err != nil { t.Fatalf("First task: Enqueue failed: %v", err) @@ -577,7 +577,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) + task := NewTask("send_email", "", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) now := time.Now() tests := []struct { @@ -700,7 +700,7 @@ func TestClientEnqueueError(t *testing.T) { client := NewClient(getRedisConnOpt(t)) defer client.Close() - task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) + task := NewTask("send_email", "", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) tests := []struct { desc string @@ -716,27 +716,27 @@ func TestClientEnqueueError(t *testing.T) { }, { desc: "With empty task typename", - task: NewTask("", h.JSON(map[string]interface{}{})), + task: NewTask("", "", h.JSON(map[string]interface{}{})), opts: []Option{}, }, { desc: "With blank task typename", - task: NewTask(" ", h.JSON(map[string]interface{}{})), + task: NewTask(" ", "", h.JSON(map[string]interface{}{})), opts: []Option{}, }, { desc: "With empty task ID", - task: NewTask("foo", nil), + task: NewTask("foo", "", nil), opts: []Option{TaskID("")}, }, { desc: "With blank task ID", - task: NewTask("foo", nil), + task: NewTask("foo", "", nil), opts: []Option{TaskID(" ")}, }, { desc: "With unique option less than 1s", - task: NewTask("foo", nil), + task: NewTask("foo", "", nil), opts: []Option{Unique(300 * time.Millisecond)}, }, } @@ -858,7 +858,7 @@ func TestClientWithDefaultOptions(t *testing.T) { h.FlushDB(t, r) c := NewClient(getRedisConnOpt(t)) defer c.Close() - task := NewTask(tc.tasktype, tc.payload, tc.defaultOpts...) + task := NewTask(tc.tasktype, "", tc.payload, tc.defaultOpts...) gotInfo, err := c.Enqueue(task, tc.opts...) if err != nil { t.Fatal(err) @@ -895,7 +895,7 @@ func TestClientEnqueueUnique(t *testing.T) { ttl time.Duration }{ { - NewTask("email", h.JSON(map[string]interface{}{"user_id": 123})), + NewTask("email", "", h.JSON(map[string]interface{}{"user_id": 123})), time.Hour, }, } @@ -939,7 +939,7 @@ func TestClientEnqueueUniqueWithProcessInOption(t *testing.T) { ttl time.Duration }{ { - NewTask("reindex", nil), + NewTask("reindex", "", nil), time.Hour, 10 * time.Minute, }, @@ -985,7 +985,7 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { ttl time.Duration }{ { - NewTask("reindex", nil), + NewTask("reindex", "", nil), time.Now().Add(time.Hour), 10 * time.Minute, }, diff --git a/example_test.go b/example_test.go index 333236d..952a63b 100644 --- a/example_test.go +++ b/example_test.go @@ -86,10 +86,10 @@ func ExampleScheduler() { &asynq.SchedulerOpts{Location: time.Local}, ) - if _, err := scheduler.Register("* * * * *", asynq.NewTask("task1", nil)); err != nil { + if _, err := scheduler.Register("* * * * *", asynq.NewTask("task1", "", nil)); err != nil { log.Fatal(err) } - if _, err := scheduler.Register("@every 30s", asynq.NewTask("task2", nil)); err != nil { + if _, err := scheduler.Register("@every 30s", asynq.NewTask("task2", "", nil)); err != nil { log.Fatal(err) } diff --git a/inspector.go b/inspector.go index 2c8498c..2f2d781 100644 --- a/inspector.go +++ b/inspector.go @@ -802,7 +802,7 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) { } for _, e := range res { - task := NewTask(e.Type, e.Payload) + task := NewTask(e.Type, "", e.Payload) var opts []Option for _, s := range e.Opts { if o, err := parseOption(s); err == nil { diff --git a/inspector_test.go b/inspector_test.go index 4af0941..dfda640 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -3220,14 +3220,14 @@ func TestInspectorSchedulerEntries(t *testing.T) { want: []*SchedulerEntry{ { Spec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", "", nil), Opts: nil, Next: now.Add(5 * time.Hour), Prev: now.Add(-2 * time.Hour), }, { Spec: "@every 20m", - Task: NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"})), + Task: NewTask("bar", "taskId", h.JSON(map[string]interface{}{"fiz": "baz"})), Opts: []Option{Queue("bar"), MaxRetry(20)}, Next: now.Add(1 * time.Minute), Prev: now.Add(-19 * time.Minute), diff --git a/periodic_task_manager_test.go b/periodic_task_manager_test.go index 24d5e5b..d4b9486 100644 --- a/periodic_task_manager_test.go +++ b/periodic_task_manager_test.go @@ -33,8 +33,8 @@ func (p *FakeConfigProvider) GetConfigs() ([]*PeriodicTaskConfig, error) { func TestNewPeriodicTaskManager(t *testing.T) { cfgs := []*PeriodicTaskConfig{ - {Cronspec: "* * * * *", Task: NewTask("foo", nil)}, - {Cronspec: "* * * * *", Task: NewTask("bar", nil)}, + {Cronspec: "* * * * *", Task: NewTask("foo", "", nil)}, + {Cronspec: "* * * * *", Task: NewTask("bar", "", nil)}, } tests := []struct { desc string @@ -78,8 +78,8 @@ func TestNewPeriodicTaskManager(t *testing.T) { func TestNewPeriodicTaskManagerError(t *testing.T) { cfgs := []*PeriodicTaskConfig{ - {Cronspec: "* * * * *", Task: NewTask("foo", nil)}, - {Cronspec: "* * * * *", Task: NewTask("bar", nil)}, + {Cronspec: "* * * * *", Task: NewTask("foo", "", nil)}, + {Cronspec: "* * * * *", Task: NewTask("bar", "", nil)}, } tests := []struct { desc string @@ -118,11 +118,11 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "basic identity test", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", "", nil), }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", "", nil), }, isSame: true, }, @@ -130,12 +130,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with a option", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", "", nil), Opts: []Option{Queue("myqueue")}, }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", "", nil), Opts: []Option{Queue("myqueue")}, }, isSame: true, @@ -144,12 +144,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with multiple options (different order)", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", "", nil), Opts: []Option{Unique(5 * time.Minute), Queue("myqueue")}, }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", "", nil), Opts: []Option{Queue("myqueue"), Unique(5 * time.Minute)}, }, isSame: true, @@ -158,12 +158,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with payload", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", []byte("hello world!")), + Task: NewTask("foo", "", []byte("hello world!")), Opts: []Option{Queue("myqueue")}, }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", []byte("hello world!")), + Task: NewTask("foo", "", []byte("hello world!")), Opts: []Option{Queue("myqueue")}, }, isSame: true, @@ -172,11 +172,11 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with different cronspecs", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", "", nil), }, b: &PeriodicTaskConfig{ Cronspec: "5 * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", "", nil), }, isSame: false, }, @@ -184,11 +184,11 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with different task type", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", "", nil), }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("bar", nil), + Task: NewTask("bar", "", nil), }, isSame: false, }, @@ -196,12 +196,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with different options", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", "", nil), Opts: []Option{Queue("myqueue")}, }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", "", nil), Opts: []Option{Unique(10 * time.Minute)}, }, isSame: false, @@ -210,12 +210,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with different options (one is subset of the other)", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", "", nil), Opts: []Option{Queue("myqueue")}, }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", nil), + Task: NewTask("foo", "", nil), Opts: []Option{Queue("myqueue"), Unique(10 * time.Minute)}, }, isSame: false, @@ -224,12 +224,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) { desc: "with different payload", a: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", []byte("hello!")), + Task: NewTask("foo", "", []byte("hello!")), Opts: []Option{Queue("myqueue")}, }, b: &PeriodicTaskConfig{ Cronspec: "* * * * *", - Task: NewTask("foo", []byte("HELLO!")), + Task: NewTask("foo", "", []byte("HELLO!")), Opts: []Option{Queue("myqueue"), Unique(10 * time.Minute)}, }, isSame: false, @@ -255,8 +255,8 @@ func TestPeriodicTaskConfigHash(t *testing.T) { func TestPeriodicTaskManager(t *testing.T) { // Note: In this test, we'll use task type as an ID for each config. cfgs := []*PeriodicTaskConfig{ - {Task: NewTask("task1", nil), Cronspec: "* * * * 1"}, - {Task: NewTask("task2", nil), Cronspec: "* * * * 2"}, + {Task: NewTask("task1", "", nil), Cronspec: "* * * * 1"}, + {Task: NewTask("task2", "", nil), Cronspec: "* * * * 2"}, } const syncInterval = 3 * time.Second provider := &FakeConfigProvider{cfgs: cfgs} @@ -287,8 +287,8 @@ func TestPeriodicTaskManager(t *testing.T) { // - task2 removed // - task3 added provider.SetConfigs([]*PeriodicTaskConfig{ - {Task: NewTask("task1", nil), Cronspec: "* * * * 1"}, - {Task: NewTask("task3", nil), Cronspec: "* * * * 3"}, + {Task: NewTask("task1", "", nil), Cronspec: "* * * * 1"}, + {Task: NewTask("task3", "", nil), Cronspec: "* * * * 3"}, }) // Wait for the next sync diff --git a/processor.go b/processor.go index 925b9c9..d914e50 100644 --- a/processor.go +++ b/processor.go @@ -323,7 +323,7 @@ var SkipRetry = errors.New("skip retry for the task") func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) { if p.errHandler != nil { - p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err) + p.errHandler.HandleError(ctx, NewTask(msg.Type, "", msg.Payload), err) } if !p.isFailureFunc(err) { // retry the task without marking it as failed @@ -344,7 +344,7 @@ func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailu return } ctx, _ := context.WithDeadline(context.Background(), l.Deadline()) - d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload)) + d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, "", msg.Payload)) retryAt := time.Now().Add(d) err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure) if err != nil { diff --git a/processor_test.go b/processor_test.go index 00f7ad6..a318813 100644 --- a/processor_test.go +++ b/processor_test.go @@ -91,10 +91,10 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) { m3 := h.NewTaskMessage("task3", nil) m4 := h.NewTaskMessage("task4", nil) - t1 := NewTask(m1.Type, m1.Payload) - t2 := NewTask(m2.Type, m2.Payload) - t3 := NewTask(m3.Type, m3.Payload) - t4 := NewTask(m4.Type, m4.Payload) + t1 := NewTask(m1.Type, "", m1.Payload) + t2 := NewTask(m2.Type, "", m2.Payload) + t3 := NewTask(m3.Type, "", m3.Payload) + t4 := NewTask(m4.Type, "", m4.Payload) tests := []struct { pending []*base.TaskMessage // initial default queue state @@ -160,10 +160,10 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) { m3 = h.NewTaskMessageWithQueue("task3", nil, "high") m4 = h.NewTaskMessageWithQueue("task4", nil, "low") - t1 = NewTask(m1.Type, m1.Payload) - t2 = NewTask(m2.Type, m2.Payload) - t3 = NewTask(m3.Type, m3.Payload) - t4 = NewTask(m4.Type, m4.Payload) + t1 = NewTask(m1.Type, "", m1.Payload) + t2 = NewTask(m2.Type, "", m2.Payload) + t3 = NewTask(m3.Type, "", m3.Payload) + t4 = NewTask(m4.Type, "", m4.Payload) ) defer r.Close() @@ -230,7 +230,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { rdbClient := rdb.NewRDB(r) m1 := h.NewTaskMessage("large_number", h.JSON(map[string]interface{}{"data": 111111111111111111})) - t1 := NewTask(m1.Type, m1.Payload) + t1 := NewTask(m1.Type, "", m1.Payload) tests := []struct { pending []*base.TaskMessage // initial default queue state @@ -636,13 +636,13 @@ func TestProcessorWithStrictPriority(t *testing.T) { m6 = h.NewTaskMessageWithQueue("task6", nil, "low") m7 = h.NewTaskMessageWithQueue("task7", nil, "low") - t1 = NewTask(m1.Type, m1.Payload) - t2 = NewTask(m2.Type, m2.Payload) - t3 = NewTask(m3.Type, m3.Payload) - t4 = NewTask(m4.Type, m4.Payload) - t5 = NewTask(m5.Type, m5.Payload) - t6 = NewTask(m6.Type, m6.Payload) - t7 = NewTask(m7.Type, m7.Payload) + t1 = NewTask(m1.Type, "", m1.Payload) + t2 = NewTask(m2.Type, "", m2.Payload) + t3 = NewTask(m3.Type, "", m3.Payload) + t4 = NewTask(m4.Type, "", m4.Payload) + t5 = NewTask(m5.Type, "", m5.Payload) + t6 = NewTask(m6.Type, "", m6.Payload) + t7 = NewTask(m7.Type, "", m7.Payload) ) defer r.Close() @@ -738,7 +738,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { return nil }, - task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), + task: NewTask("gen_thumbnail", "", h.JSON(map[string]interface{}{"src": "some/img/path"})), wantErr: false, }, { @@ -746,7 +746,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { return fmt.Errorf("something went wrong") }, - task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), + task: NewTask("gen_thumbnail", "", h.JSON(map[string]interface{}{"src": "some/img/path"})), wantErr: true, }, { @@ -754,7 +754,7 @@ func TestProcessorPerform(t *testing.T) { handler: func(ctx context.Context, t *Task) error { panic("something went terribly wrong") }, - task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})), + task: NewTask("gen_thumbnail", "", h.JSON(map[string]interface{}{"src": "some/img/path"})), wantErr: true, }, } diff --git a/recoverer.go b/recoverer.go index a0107b4..cde9621 100644 --- a/recoverer.go +++ b/recoverer.go @@ -99,7 +99,7 @@ func (r *recoverer) recover() { } func (r *recoverer) retry(msg *base.TaskMessage, err error) { - delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload)) + delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, "", msg.Payload)) retryAt := time.Now().Add(delay) if err := r.broker.Retry(context.Background(), msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil { r.logger.Warnf("recoverer: could not retry lease expired task: %v", err) diff --git a/scheduler.go b/scheduler.go index 6a87e66..be35d0c 100644 --- a/scheduler.go +++ b/scheduler.go @@ -112,7 +112,7 @@ type SchedulerOpts struct { // enqueueJob encapsulates the job of enqueing a task and recording the event. type enqueueJob struct { - id uuid.UUID + id string cronspec string task *Task opts []Option @@ -137,7 +137,7 @@ func (j *enqueueJob) Run() { TaskID: info.ID, EnqueuedAt: time.Now().In(j.location), } - err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event) + err = j.rdb.RecordSchedulerEnqueueEvent(j.id, event) if err != nil { j.logger.Errorf("scheduler could not record enqueue event of enqueued task %+v: %v", j.task, err) } @@ -147,7 +147,7 @@ func (j *enqueueJob) Run() { // It returns an ID of the newly registered entry. func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) { job := &enqueueJob{ - id: uuid.New(), + id: task.taskId, cronspec: cronspec, task: task, opts: opts, @@ -162,9 +162,9 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry return "", err } s.mu.Lock() - s.idmap[job.id.String()] = cronID + s.idmap[job.id] = cronID s.mu.Unlock() - return job.id.String(), nil + return job.id, nil } // Unregister removes a registered entry by entry ID. @@ -265,7 +265,7 @@ func (s *Scheduler) beat() { for _, entry := range s.cron.Entries() { job := entry.Job.(*enqueueJob) e := &base.SchedulerEntry{ - ID: job.id.String(), + ID: job.id, Spec: job.cronspec, Type: job.task.Type(), Payload: job.task.Payload(), @@ -292,8 +292,8 @@ func stringifyOptions(opts []Option) []string { func (s *Scheduler) clearHistory() { for _, entry := range s.cron.Entries() { job := entry.Job.(*enqueueJob) - if err := s.rdb.ClearSchedulerHistory(job.id.String()); err != nil { - s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err) + if err := s.rdb.ClearSchedulerHistory(job.id); err != nil { + s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id, err) } } } diff --git a/scheduler_test.go b/scheduler_test.go index 9ee809f..61fd9da 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -25,7 +25,7 @@ func TestSchedulerRegister(t *testing.T) { }{ { cronspec: "@every 3s", - task: NewTask("task1", nil), + task: NewTask("task1", "", nil), opts: []Option{MaxRetry(10)}, wait: 10 * time.Second, queue: "default", @@ -93,7 +93,7 @@ func TestSchedulerWhenRedisDown(t *testing.T) { &SchedulerOpts{EnqueueErrorHandler: errorHandler}, ) - task := NewTask("test", nil) + task := NewTask("test", "", nil) if _, err := scheduler.Register("@every 3s", task); err != nil { t.Fatal(err) @@ -123,7 +123,7 @@ func TestSchedulerUnregister(t *testing.T) { }{ { cronspec: "@every 3s", - task: NewTask("task1", nil), + task: NewTask("task1", "", nil), opts: []Option{MaxRetry(10)}, wait: 10 * time.Second, queue: "default", diff --git a/servemux_test.go b/servemux_test.go index 227c4d7..441a14d 100644 --- a/servemux_test.go +++ b/servemux_test.go @@ -62,7 +62,7 @@ func TestServeMux(t *testing.T) { for _, tc := range serveMuxTests { called = "" // reset to zero value - task := NewTask(tc.typename, nil) + task := NewTask(tc.typename, "", nil) if err := mux.ProcessTask(context.Background(), task); err != nil { t.Fatal(err) } @@ -121,7 +121,7 @@ func TestServeMuxNotFound(t *testing.T) { } for _, tc := range notFoundTests { - task := NewTask(tc.typename, nil) + task := NewTask(tc.typename, "", nil) err := mux.ProcessTask(context.Background(), task) if err == nil { t.Errorf("ProcessTask did not return error for task %q, should return 'not found' error", task.Type()) @@ -154,7 +154,7 @@ func TestServeMuxMiddlewares(t *testing.T) { invoked = []string{} // reset to empty slice called = "" // reset to zero value - task := NewTask(tc.typename, nil) + task := NewTask(tc.typename, "", nil) if err := mux.ProcessTask(context.Background(), task); err != nil { t.Fatal(err) } diff --git a/server_test.go b/server_test.go index f4f190a..87405ce 100644 --- a/server_test.go +++ b/server_test.go @@ -40,12 +40,12 @@ func TestServer(t *testing.T) { t.Fatal(err) } - _, err = c.Enqueue(NewTask("send_email", asynqtest.JSON(map[string]interface{}{"recipient_id": 123}))) + _, err = c.Enqueue(NewTask("send_email", "", asynqtest.JSON(map[string]interface{}{"recipient_id": 123}))) if err != nil { t.Errorf("could not enqueue a task: %v", err) } - _, err = c.Enqueue(NewTask("send_email", asynqtest.JSON(map[string]interface{}{"recipient_id": 456})), ProcessIn(1*time.Hour)) + _, err = c.Enqueue(NewTask("send_email", "", asynqtest.JSON(map[string]interface{}{"recipient_id": 456})), ProcessIn(1*time.Hour)) if err != nil { t.Errorf("could not enqueue a task: %v", err) } @@ -183,15 +183,15 @@ func TestServerWithFlakyBroker(t *testing.T) { } for i := 0; i < 10; i++ { - _, err := c.Enqueue(NewTask("enqueued", nil), MaxRetry(i)) + _, err := c.Enqueue(NewTask("enqueued", "", nil), MaxRetry(i)) if err != nil { t.Fatal(err) } - _, err = c.Enqueue(NewTask("bad_task", nil)) + _, err = c.Enqueue(NewTask("bad_task", "", nil)) if err != nil { t.Fatal(err) } - _, err = c.Enqueue(NewTask("scheduled", nil), ProcessIn(time.Duration(i)*time.Second)) + _, err = c.Enqueue(NewTask("scheduled", "", nil), ProcessIn(time.Duration(i)*time.Second)) if err != nil { t.Fatal(err) }