mirror of https://github.com/hibiken/asynq.git synced 2025-10-22 09:56:12 +08:00

chore(): set jobId to the cmdb instanceId

pacinochen
2022-03-07 15:54:36 +08:00
parent b345b237c7
commit 21d91a1313
14 changed files with 94 additions and 90 deletions

View File

@@ -22,6 +22,9 @@ type Task struct {
     // typename indicates the type of task to be performed.
     typename string
 
+    // Task ID, used by asynq to maintain its own task map.
+    taskId string
+
     // payload holds data needed to perform the task.
     payload []byte
@@ -43,9 +46,10 @@ func (t *Task) ResultWriter() *ResultWriter { return t.w }
 
 // NewTask returns a new Task given a type name and payload data.
 // Options can be passed to configure task processing behavior.
-func NewTask(typename string, payload []byte, opts ...Option) *Task {
+func NewTask(typename, taskId string, payload []byte, opts ...Option) *Task {
     return &Task{
         typename: typename,
+        taskId:   taskId,
         payload:  payload,
         opts:     opts,
     }
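Taken together with the commit message, the change above means every NewTask call site must now supply a task ID up front, the intent being to pass the cmdb instanceId rather than letting asynq generate one. A minimal caller-side sketch, not part of this diff; the type name, payload, instance ID, and Redis address are made up for illustration:

// Illustrative sketch only. Assumes the modified
// NewTask(typename, taskId string, payload []byte, ...) signature shown above.
package main

import (
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

func main() {
    // Hypothetical ID obtained from cmdb; under this change it is passed as the task's jobId.
    instanceID := "cmdb-instance-42"

    payload, err := json.Marshal(map[string]interface{}{"host": "web-01"})
    if err != nil {
        log.Fatal(err)
    }

    // taskId is now the second argument to NewTask.
    task := asynq.NewTask("host:sync", instanceID, payload)

    client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
    defer client.Close()

    if _, err := client.Enqueue(task); err != nil {
        log.Fatal(err)
    }
}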

View File

@@ -21,7 +21,7 @@ func makeTask(n int) *Task {
     if err != nil {
         panic(err)
     }
-    return NewTask(fmt.Sprintf("task%d", n), b)
+    return NewTask(fmt.Sprintf("task%d", n), "", b)
 }
 
 // Simple E2E Benchmark testing with no scheduled tasks and retries.
@@ -222,7 +222,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) {
         b.Log("Starting enqueueing")
         enqueued := 0
         for enqueued < 100000 {
-            t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.JSON(map[string]interface{}{"data": enqueued}))
+            t := NewTask(fmt.Sprintf("enqueued%d", enqueued), "", h.JSON(map[string]interface{}{"data": enqueued}))
             if _, err := client.Enqueue(t); err != nil {
                 b.Logf("could not enqueue task %d: %v", enqueued, err)
                 continue

View File

@@ -21,7 +21,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
     client := NewClient(getRedisConnOpt(t))
     defer client.Close()
 
-    task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
+    task := NewTask("send_email", "", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
 
     var (
         now = time.Now()
@@ -148,7 +148,7 @@ func TestClientEnqueue(t *testing.T) {
     client := NewClient(getRedisConnOpt(t))
     defer client.Close()
 
-    task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
+    task := NewTask("send_email", "", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
     now := time.Now()
 
     tests := []struct {
@@ -483,7 +483,7 @@ func TestClientEnqueueWithTaskIDOption(t *testing.T) {
     client := NewClient(getRedisConnOpt(t))
     defer client.Close()
 
-    task := NewTask("send_email", nil)
+    task := NewTask("send_email", "", nil)
     now := time.Now()
 
     tests := []struct {
@@ -561,7 +561,7 @@ func TestClientEnqueueWithConflictingTaskID(t *testing.T) {
     defer client.Close()
 
     const taskID = "custom_id"
-    task := NewTask("foo", nil)
+    task := NewTask("foo", "taskId", nil)
 
     if _, err := client.Enqueue(task, TaskID(taskID)); err != nil {
         t.Fatalf("First task: Enqueue failed: %v", err)
@@ -577,7 +577,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
     client := NewClient(getRedisConnOpt(t))
     defer client.Close()
 
-    task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
+    task := NewTask("send_email", "", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
     now := time.Now()
 
     tests := []struct {
@@ -700,7 +700,7 @@ func TestClientEnqueueError(t *testing.T) {
     client := NewClient(getRedisConnOpt(t))
     defer client.Close()
 
-    task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
+    task := NewTask("send_email", "", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
 
     tests := []struct {
         desc string
@@ -716,27 +716,27 @@ func TestClientEnqueueError(t *testing.T) {
         },
         {
            desc: "With empty task typename",
-           task: NewTask("", h.JSON(map[string]interface{}{})),
+           task: NewTask("", "", h.JSON(map[string]interface{}{})),
            opts: []Option{},
         },
         {
            desc: "With blank task typename",
-           task: NewTask(" ", h.JSON(map[string]interface{}{})),
+           task: NewTask(" ", "", h.JSON(map[string]interface{}{})),
            opts: []Option{},
         },
         {
            desc: "With empty task ID",
-           task: NewTask("foo", nil),
+           task: NewTask("foo", "", nil),
            opts: []Option{TaskID("")},
         },
         {
            desc: "With blank task ID",
-           task: NewTask("foo", nil),
+           task: NewTask("foo", "", nil),
            opts: []Option{TaskID(" ")},
         },
         {
            desc: "With unique option less than 1s",
-           task: NewTask("foo", nil),
+           task: NewTask("foo", "", nil),
            opts: []Option{Unique(300 * time.Millisecond)},
         },
     }
@@ -858,7 +858,7 @@ func TestClientWithDefaultOptions(t *testing.T) {
         h.FlushDB(t, r)
         c := NewClient(getRedisConnOpt(t))
         defer c.Close()
-        task := NewTask(tc.tasktype, tc.payload, tc.defaultOpts...)
+        task := NewTask(tc.tasktype, "", tc.payload, tc.defaultOpts...)
         gotInfo, err := c.Enqueue(task, tc.opts...)
         if err != nil {
             t.Fatal(err)
@@ -895,7 +895,7 @@ func TestClientEnqueueUnique(t *testing.T) {
         ttl  time.Duration
     }{
         {
-           NewTask("email", h.JSON(map[string]interface{}{"user_id": 123})),
+           NewTask("email", "", h.JSON(map[string]interface{}{"user_id": 123})),
            time.Hour,
         },
     }
@@ -939,7 +939,7 @@ func TestClientEnqueueUniqueWithProcessInOption(t *testing.T) {
         ttl  time.Duration
     }{
         {
-           NewTask("reindex", nil),
+           NewTask("reindex", "", nil),
            time.Hour,
            10 * time.Minute,
         },
@@ -985,7 +985,7 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
         ttl  time.Duration
     }{
         {
-           NewTask("reindex", nil),
+           NewTask("reindex", "", nil),
            time.Now().Add(time.Hour),
            10 * time.Minute,
         },

View File

@@ -86,10 +86,10 @@ func ExampleScheduler() {
         &asynq.SchedulerOpts{Location: time.Local},
     )
 
-    if _, err := scheduler.Register("* * * * *", asynq.NewTask("task1", nil)); err != nil {
+    if _, err := scheduler.Register("* * * * *", asynq.NewTask("task1", "", nil)); err != nil {
         log.Fatal(err)
     }
-    if _, err := scheduler.Register("@every 30s", asynq.NewTask("task2", nil)); err != nil {
+    if _, err := scheduler.Register("@every 30s", asynq.NewTask("task2", "", nil)); err != nil {
         log.Fatal(err)
     }

View File

@@ -802,7 +802,7 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
     }
     for _, e := range res {
-        task := NewTask(e.Type, e.Payload)
+        task := NewTask(e.Type, "", e.Payload)
         var opts []Option
         for _, s := range e.Opts {
             if o, err := parseOption(s); err == nil {

View File

@@ -3220,14 +3220,14 @@ func TestInspectorSchedulerEntries(t *testing.T) {
         want: []*SchedulerEntry{
             {
                 Spec: "* * * * *",
-                Task: NewTask("foo", nil),
+                Task: NewTask("foo", "", nil),
                 Opts: nil,
                 Next: now.Add(5 * time.Hour),
                 Prev: now.Add(-2 * time.Hour),
             },
             {
                 Spec: "@every 20m",
-                Task: NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"})),
+                Task: NewTask("bar", "taskId", h.JSON(map[string]interface{}{"fiz": "baz"})),
                 Opts: []Option{Queue("bar"), MaxRetry(20)},
                 Next: now.Add(1 * time.Minute),
                 Prev: now.Add(-19 * time.Minute),

View File

@@ -33,8 +33,8 @@ func (p *FakeConfigProvider) GetConfigs() ([]*PeriodicTaskConfig, error) {
 func TestNewPeriodicTaskManager(t *testing.T) {
     cfgs := []*PeriodicTaskConfig{
-        {Cronspec: "* * * * *", Task: NewTask("foo", nil)},
-        {Cronspec: "* * * * *", Task: NewTask("bar", nil)},
+        {Cronspec: "* * * * *", Task: NewTask("foo", "", nil)},
+        {Cronspec: "* * * * *", Task: NewTask("bar", "", nil)},
     }
     tests := []struct {
         desc string
@@ -78,8 +78,8 @@ func TestNewPeriodicTaskManager(t *testing.T) {
 func TestNewPeriodicTaskManagerError(t *testing.T) {
     cfgs := []*PeriodicTaskConfig{
-        {Cronspec: "* * * * *", Task: NewTask("foo", nil)},
-        {Cronspec: "* * * * *", Task: NewTask("bar", nil)},
+        {Cronspec: "* * * * *", Task: NewTask("foo", "", nil)},
+        {Cronspec: "* * * * *", Task: NewTask("bar", "", nil)},
     }
     tests := []struct {
         desc string
@@ -118,11 +118,11 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
         desc: "basic identity test",
         a: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", nil),
+            Task:     NewTask("foo", "", nil),
         },
         b: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", nil),
+            Task:     NewTask("foo", "", nil),
         },
         isSame: true,
     },
@@ -130,12 +130,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
         desc: "with a option",
         a: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", nil),
+            Task:     NewTask("foo", "", nil),
             Opts:     []Option{Queue("myqueue")},
         },
         b: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", nil),
+            Task:     NewTask("foo", "", nil),
             Opts:     []Option{Queue("myqueue")},
         },
         isSame: true,
@@ -144,12 +144,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
         desc: "with multiple options (different order)",
         a: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", nil),
+            Task:     NewTask("foo", "", nil),
             Opts:     []Option{Unique(5 * time.Minute), Queue("myqueue")},
         },
         b: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", nil),
+            Task:     NewTask("foo", "", nil),
             Opts:     []Option{Queue("myqueue"), Unique(5 * time.Minute)},
         },
         isSame: true,
@@ -158,12 +158,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
         desc: "with payload",
         a: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", []byte("hello world!")),
+            Task:     NewTask("foo", "", []byte("hello world!")),
             Opts:     []Option{Queue("myqueue")},
         },
         b: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", []byte("hello world!")),
+            Task:     NewTask("foo", "", []byte("hello world!")),
             Opts:     []Option{Queue("myqueue")},
         },
         isSame: true,
@@ -172,11 +172,11 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
         desc: "with different cronspecs",
         a: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", nil),
+            Task:     NewTask("foo", "", nil),
         },
         b: &PeriodicTaskConfig{
             Cronspec: "5 * * * *",
-            Task:     NewTask("foo", nil),
+            Task:     NewTask("foo", "", nil),
         },
         isSame: false,
     },
@@ -184,11 +184,11 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
         desc: "with different task type",
         a: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", nil),
+            Task:     NewTask("foo", "", nil),
         },
         b: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("bar", nil),
+            Task:     NewTask("bar", "", nil),
         },
         isSame: false,
     },
@@ -196,12 +196,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
         desc: "with different options",
         a: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", nil),
+            Task:     NewTask("foo", "", nil),
             Opts:     []Option{Queue("myqueue")},
         },
         b: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", nil),
+            Task:     NewTask("foo", "", nil),
             Opts:     []Option{Unique(10 * time.Minute)},
         },
         isSame: false,
@@ -210,12 +210,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
         desc: "with different options (one is subset of the other)",
         a: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", nil),
+            Task:     NewTask("foo", "", nil),
             Opts:     []Option{Queue("myqueue")},
         },
         b: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", nil),
+            Task:     NewTask("foo", "", nil),
             Opts:     []Option{Queue("myqueue"), Unique(10 * time.Minute)},
         },
         isSame: false,
@@ -224,12 +224,12 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
         desc: "with different payload",
         a: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", []byte("hello!")),
+            Task:     NewTask("foo", "", []byte("hello!")),
             Opts:     []Option{Queue("myqueue")},
         },
         b: &PeriodicTaskConfig{
             Cronspec: "* * * * *",
-            Task:     NewTask("foo", []byte("HELLO!")),
+            Task:     NewTask("foo", "", []byte("HELLO!")),
             Opts:     []Option{Queue("myqueue"), Unique(10 * time.Minute)},
         },
         isSame: false,
@@ -255,8 +255,8 @@ func TestPeriodicTaskConfigHash(t *testing.T) {
 func TestPeriodicTaskManager(t *testing.T) {
     // Note: In this test, we'll use task type as an ID for each config.
     cfgs := []*PeriodicTaskConfig{
-        {Task: NewTask("task1", nil), Cronspec: "* * * * 1"},
-        {Task: NewTask("task2", nil), Cronspec: "* * * * 2"},
+        {Task: NewTask("task1", "", nil), Cronspec: "* * * * 1"},
+        {Task: NewTask("task2", "", nil), Cronspec: "* * * * 2"},
     }
     const syncInterval = 3 * time.Second
     provider := &FakeConfigProvider{cfgs: cfgs}
@@ -287,8 +287,8 @@ func TestPeriodicTaskManager(t *testing.T) {
     // - task2 removed
     // - task3 added
     provider.SetConfigs([]*PeriodicTaskConfig{
-        {Task: NewTask("task1", nil), Cronspec: "* * * * 1"},
-        {Task: NewTask("task3", nil), Cronspec: "* * * * 3"},
+        {Task: NewTask("task1", "", nil), Cronspec: "* * * * 1"},
+        {Task: NewTask("task3", "", nil), Cronspec: "* * * * 3"},
     })
 
     // Wait for the next sync

View File

@@ -323,7 +323,7 @@ var SkipRetry = errors.New("skip retry for the task")
 
 func (p *processor) handleFailedMessage(ctx context.Context, l *base.Lease, msg *base.TaskMessage, err error) {
     if p.errHandler != nil {
-        p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
+        p.errHandler.HandleError(ctx, NewTask(msg.Type, "", msg.Payload), err)
     }
     if !p.isFailureFunc(err) {
         // retry the task without marking it as failed
@@ -344,7 +344,7 @@ func (p *processor) retry(l *base.Lease, msg *base.TaskMessage, e error, isFailu
         return
     }
     ctx, _ := context.WithDeadline(context.Background(), l.Deadline())
-    d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
+    d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, "", msg.Payload))
     retryAt := time.Now().Add(d)
     err := p.broker.Retry(ctx, msg, retryAt, e.Error(), isFailure)
     if err != nil {

View File

@@ -91,10 +91,10 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
     m3 := h.NewTaskMessage("task3", nil)
     m4 := h.NewTaskMessage("task4", nil)
 
-    t1 := NewTask(m1.Type, m1.Payload)
-    t2 := NewTask(m2.Type, m2.Payload)
-    t3 := NewTask(m3.Type, m3.Payload)
-    t4 := NewTask(m4.Type, m4.Payload)
+    t1 := NewTask(m1.Type, "", m1.Payload)
+    t2 := NewTask(m2.Type, "", m2.Payload)
+    t3 := NewTask(m3.Type, "", m3.Payload)
+    t4 := NewTask(m4.Type, "", m4.Payload)
 
     tests := []struct {
         pending []*base.TaskMessage // initial default queue state
@@ -160,10 +160,10 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
         m3 = h.NewTaskMessageWithQueue("task3", nil, "high")
         m4 = h.NewTaskMessageWithQueue("task4", nil, "low")
 
-        t1 = NewTask(m1.Type, m1.Payload)
-        t2 = NewTask(m2.Type, m2.Payload)
-        t3 = NewTask(m3.Type, m3.Payload)
-        t4 = NewTask(m4.Type, m4.Payload)
+        t1 = NewTask(m1.Type, "", m1.Payload)
+        t2 = NewTask(m2.Type, "", m2.Payload)
+        t3 = NewTask(m3.Type, "", m3.Payload)
+        t4 = NewTask(m4.Type, "", m4.Payload)
     )
     defer r.Close()
@@ -230,7 +230,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
     rdbClient := rdb.NewRDB(r)
 
     m1 := h.NewTaskMessage("large_number", h.JSON(map[string]interface{}{"data": 111111111111111111}))
-    t1 := NewTask(m1.Type, m1.Payload)
+    t1 := NewTask(m1.Type, "", m1.Payload)
 
     tests := []struct {
         pending []*base.TaskMessage // initial default queue state
@@ -636,13 +636,13 @@ func TestProcessorWithStrictPriority(t *testing.T) {
         m6 = h.NewTaskMessageWithQueue("task6", nil, "low")
         m7 = h.NewTaskMessageWithQueue("task7", nil, "low")
 
-        t1 = NewTask(m1.Type, m1.Payload)
-        t2 = NewTask(m2.Type, m2.Payload)
-        t3 = NewTask(m3.Type, m3.Payload)
-        t4 = NewTask(m4.Type, m4.Payload)
-        t5 = NewTask(m5.Type, m5.Payload)
-        t6 = NewTask(m6.Type, m6.Payload)
-        t7 = NewTask(m7.Type, m7.Payload)
+        t1 = NewTask(m1.Type, "", m1.Payload)
+        t2 = NewTask(m2.Type, "", m2.Payload)
+        t3 = NewTask(m3.Type, "", m3.Payload)
+        t4 = NewTask(m4.Type, "", m4.Payload)
+        t5 = NewTask(m5.Type, "", m5.Payload)
+        t6 = NewTask(m6.Type, "", m6.Payload)
+        t7 = NewTask(m7.Type, "", m7.Payload)
     )
     defer r.Close()
@@ -738,7 +738,7 @@ func TestProcessorPerform(t *testing.T) {
         handler: func(ctx context.Context, t *Task) error {
             return nil
         },
-        task:    NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})),
+        task:    NewTask("gen_thumbnail", "", h.JSON(map[string]interface{}{"src": "some/img/path"})),
         wantErr: false,
     },
     {
@@ -746,7 +746,7 @@ func TestProcessorPerform(t *testing.T) {
         handler: func(ctx context.Context, t *Task) error {
             return fmt.Errorf("something went wrong")
         },
-        task:    NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})),
+        task:    NewTask("gen_thumbnail", "", h.JSON(map[string]interface{}{"src": "some/img/path"})),
         wantErr: true,
     },
     {
@@ -754,7 +754,7 @@ func TestProcessorPerform(t *testing.T) {
         handler: func(ctx context.Context, t *Task) error {
             panic("something went terribly wrong")
         },
-        task:    NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})),
+        task:    NewTask("gen_thumbnail", "", h.JSON(map[string]interface{}{"src": "some/img/path"})),
         wantErr: true,
     },
 }

View File

@@ -99,7 +99,7 @@ func (r *recoverer) recover() {
 }
 
 func (r *recoverer) retry(msg *base.TaskMessage, err error) {
-    delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, msg.Payload))
+    delay := r.retryDelayFunc(msg.Retried, err, NewTask(msg.Type, "", msg.Payload))
     retryAt := time.Now().Add(delay)
     if err := r.broker.Retry(context.Background(), msg, retryAt, err.Error(), r.isFailureFunc(err)); err != nil {
         r.logger.Warnf("recoverer: could not retry lease expired task: %v", err)

View File

@@ -112,7 +112,7 @@ type SchedulerOpts struct {
 
 // enqueueJob encapsulates the job of enqueing a task and recording the event.
 type enqueueJob struct {
-    id       uuid.UUID
+    id       string
     cronspec string
     task     *Task
     opts     []Option
@@ -137,7 +137,7 @@ func (j *enqueueJob) Run() {
         TaskID:     info.ID,
         EnqueuedAt: time.Now().In(j.location),
     }
-    err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event)
+    err = j.rdb.RecordSchedulerEnqueueEvent(j.id, event)
     if err != nil {
         j.logger.Errorf("scheduler could not record enqueue event of enqueued task %+v: %v", j.task, err)
     }
@@ -147,7 +147,7 @@ func (j *enqueueJob) Run() {
 // It returns an ID of the newly registered entry.
 func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) {
     job := &enqueueJob{
-        id:       uuid.New(),
+        id:       task.taskId,
         cronspec: cronspec,
         task:     task,
         opts:     opts,
@@ -162,9 +162,9 @@ func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entry
         return "", err
     }
     s.mu.Lock()
-    s.idmap[job.id.String()] = cronID
+    s.idmap[job.id] = cronID
     s.mu.Unlock()
-    return job.id.String(), nil
+    return job.id, nil
 }
 
 // Unregister removes a registered entry by entry ID.
@@ -265,7 +265,7 @@ func (s *Scheduler) beat() {
     for _, entry := range s.cron.Entries() {
         job := entry.Job.(*enqueueJob)
         e := &base.SchedulerEntry{
-            ID:      job.id.String(),
+            ID:      job.id,
             Spec:    job.cronspec,
             Type:    job.task.Type(),
             Payload: job.task.Payload(),
@@ -292,8 +292,8 @@ func stringifyOptions(opts []Option) []string {
 func (s *Scheduler) clearHistory() {
     for _, entry := range s.cron.Entries() {
         job := entry.Job.(*enqueueJob)
-        if err := s.rdb.ClearSchedulerHistory(job.id.String()); err != nil {
+        if err := s.rdb.ClearSchedulerHistory(job.id); err != nil {
-            s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id.String(), err)
+            s.logger.Warnf("Could not clear scheduler history for entry %q: %v", job.id, err)
         }
     }
 }
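With enqueueJob.id now a plain string taken from task.taskId, Register returns the caller-supplied task ID as the scheduler entry ID instead of a freshly generated UUID, and that same ID keys the idmap, the enqueue-event history, and the entries reported by beat. A rough usage sketch, not part of this diff; the cronspec, type name, instance ID, and Redis address are made up:

// Illustrative sketch only. Assumes the modified Scheduler above, where the
// entry ID is the task's taskId.
package main

import (
    "log"
    "time"

    "github.com/hibiken/asynq"
)

func main() {
    scheduler := asynq.NewScheduler(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        &asynq.SchedulerOpts{Location: time.Local},
    )

    // The task ID (here a hypothetical cmdb instanceId) becomes the entry ID,
    // so the same ID can later be passed to Unregister.
    task := asynq.NewTask("host:sync", "cmdb-instance-42", nil)

    entryID, err := scheduler.Register("@every 1m", task)
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("registered entry %s", entryID) // "cmdb-instance-42" under this change

    if err := scheduler.Run(); err != nil {
        log.Fatal(err)
    }
}

A consequence of this design is that entry IDs are only as unique as the taskIds callers supply; two registrations with the same taskId would share one slot in s.idmap.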

View File

@@ -25,7 +25,7 @@ func TestSchedulerRegister(t *testing.T) {
     }{
         {
             cronspec: "@every 3s",
-            task:     NewTask("task1", nil),
+            task:     NewTask("task1", "", nil),
             opts:     []Option{MaxRetry(10)},
             wait:     10 * time.Second,
             queue:    "default",
@@ -93,7 +93,7 @@ func TestSchedulerWhenRedisDown(t *testing.T) {
         &SchedulerOpts{EnqueueErrorHandler: errorHandler},
     )
 
-    task := NewTask("test", nil)
+    task := NewTask("test", "", nil)
 
     if _, err := scheduler.Register("@every 3s", task); err != nil {
         t.Fatal(err)
@@ -123,7 +123,7 @@ func TestSchedulerUnregister(t *testing.T) {
     }{
         {
             cronspec: "@every 3s",
-            task:     NewTask("task1", nil),
+            task:     NewTask("task1", "", nil),
             opts:     []Option{MaxRetry(10)},
             wait:     10 * time.Second,
             queue:    "default",

View File

@@ -62,7 +62,7 @@ func TestServeMux(t *testing.T) {
 
     for _, tc := range serveMuxTests {
         called = "" // reset to zero value
-        task := NewTask(tc.typename, nil)
+        task := NewTask(tc.typename, "", nil)
         if err := mux.ProcessTask(context.Background(), task); err != nil {
             t.Fatal(err)
         }
@@ -121,7 +121,7 @@ func TestServeMuxNotFound(t *testing.T) {
     }
 
     for _, tc := range notFoundTests {
-        task := NewTask(tc.typename, nil)
+        task := NewTask(tc.typename, "", nil)
         err := mux.ProcessTask(context.Background(), task)
         if err == nil {
             t.Errorf("ProcessTask did not return error for task %q, should return 'not found' error", task.Type())
@@ -154,7 +154,7 @@ func TestServeMuxMiddlewares(t *testing.T) {
         invoked = []string{} // reset to empty slice
         called = "" // reset to zero value
 
-        task := NewTask(tc.typename, nil)
+        task := NewTask(tc.typename, "", nil)
         if err := mux.ProcessTask(context.Background(), task); err != nil {
             t.Fatal(err)
         }

View File

@@ -40,12 +40,12 @@ func TestServer(t *testing.T) {
         t.Fatal(err)
     }
 
-    _, err = c.Enqueue(NewTask("send_email", asynqtest.JSON(map[string]interface{}{"recipient_id": 123})))
+    _, err = c.Enqueue(NewTask("send_email", "", asynqtest.JSON(map[string]interface{}{"recipient_id": 123})))
     if err != nil {
         t.Errorf("could not enqueue a task: %v", err)
     }
 
-    _, err = c.Enqueue(NewTask("send_email", asynqtest.JSON(map[string]interface{}{"recipient_id": 456})), ProcessIn(1*time.Hour))
+    _, err = c.Enqueue(NewTask("send_email", "", asynqtest.JSON(map[string]interface{}{"recipient_id": 456})), ProcessIn(1*time.Hour))
     if err != nil {
         t.Errorf("could not enqueue a task: %v", err)
     }
@@ -183,15 +183,15 @@ func TestServerWithFlakyBroker(t *testing.T) {
     }
 
     for i := 0; i < 10; i++ {
-        _, err := c.Enqueue(NewTask("enqueued", nil), MaxRetry(i))
+        _, err := c.Enqueue(NewTask("enqueued", "", nil), MaxRetry(i))
         if err != nil {
             t.Fatal(err)
         }
-        _, err = c.Enqueue(NewTask("bad_task", nil))
+        _, err = c.Enqueue(NewTask("bad_task", "", nil))
         if err != nil {
             t.Fatal(err)
         }
-        _, err = c.Enqueue(NewTask("scheduled", nil), ProcessIn(time.Duration(i)*time.Second))
+        _, err = c.Enqueue(NewTask("scheduled", "", nil), ProcessIn(time.Duration(i)*time.Second))
         if err != nil {
             t.Fatal(err)
         }