diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 3c35833..8e1bb2f 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -68,26 +68,20 @@ var IgnoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID") // NewTaskMessage returns a new instance of TaskMessage given a task type and payload. func NewTaskMessage(taskType string, payload map[string]interface{}) *base.TaskMessage { - return &base.TaskMessage{ - ID: uuid.New(), - Type: taskType, - Queue: base.DefaultQueueName, - Retry: 25, - Payload: payload, - Timeout: 1800, // default timeout of 30 mins - Deadline: 0, // no deadline - } + return NewTaskMessageWithQueue(taskType, payload, base.DefaultQueueName) } // NewTaskMessageWithQueue returns a new instance of TaskMessage given a // task type, payload and queue name. func NewTaskMessageWithQueue(taskType string, payload map[string]interface{}, qname string) *base.TaskMessage { return &base.TaskMessage{ - ID: uuid.New(), - Type: taskType, - Queue: qname, - Retry: 25, - Payload: payload, + ID: uuid.New(), + Type: taskType, + Queue: qname, + Retry: 25, + Payload: payload, + Timeout: 1800, // default timeout of 30 mins + Deadline: 0, // no deadline } } diff --git a/processor_test.go b/processor_test.go index 101ed23..ac2728f 100644 --- a/processor_test.go +++ b/processor_test.go @@ -42,14 +42,14 @@ func fakeSyncer(syncCh <-chan *syncRequest, done <-chan struct{}) { } } -func TestProcessorSuccess(t *testing.T) { +func TestProcessorSuccessWithSingleQueue(t *testing.T) { r := setup(t) rdbClient := rdb.NewRDB(r) - m1 := h.NewTaskMessage("send_email", nil) - m2 := h.NewTaskMessage("gen_thumbnail", nil) - m3 := h.NewTaskMessage("reindex", nil) - m4 := h.NewTaskMessage("sync", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessage("task4", nil) t1 := NewTask(m1.Type, m1.Payload) t2 := NewTask(m2.Type, m2.Payload) @@ -131,6 +131,98 @@ func TestProcessorSuccess(t *testing.T) { } } +func TestProcessorSuccessWithMultipleQueues(t *testing.T) { + var ( + r = setup(t) + rdbClient = rdb.NewRDB(r) + + m1 = h.NewTaskMessage("task1", nil) + m2 = h.NewTaskMessage("task2", nil) + m3 = h.NewTaskMessageWithQueue("task3", nil, "high") + m4 = h.NewTaskMessageWithQueue("task4", nil, "low") + + t1 = NewTask(m1.Type, m1.Payload) + t2 = NewTask(m2.Type, m2.Payload) + t3 = NewTask(m3.Type, m3.Payload) + t4 = NewTask(m4.Type, m4.Payload) + ) + + tests := []struct { + enqueued map[string][]*base.TaskMessage + queues []string // list of queues to consume the tasks from + wantProcessed []*Task // tasks to be processed at the end + }{ + { + enqueued: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "high": {m3}, + "low": {m4}, + }, + queues: []string{"default", "high", "low"}, + wantProcessed: []*Task{t1, t2, t3, t4}, + }, + } + + for _, tc := range tests { + // Set up test case. + h.FlushDB(t, r) + h.SeedAllEnqueuedQueues(t, r, tc.enqueued) + + // Instantiate a new processor. + var mu sync.Mutex + var processed []*Task + handler := func(ctx context.Context, task *Task) error { + mu.Lock() + defer mu.Unlock() + processed = append(processed, task) + return nil + } + starting := make(chan *base.TaskMessage) + finished := make(chan *base.TaskMessage) + syncCh := make(chan *syncRequest) + done := make(chan struct{}) + defer func() { close(done) }() + go fakeHeartbeater(starting, finished, done) + go fakeSyncer(syncCh, done) + p := newProcessor(processorParams{ + logger: testLogger, + broker: rdbClient, + retryDelayFunc: defaultDelayFunc, + syncCh: syncCh, + cancelations: base.NewCancelations(), + concurrency: 10, + queues: map[string]int{ + "default": 2, + "high": 3, + "low": 1, + }, + strictPriority: false, + errHandler: nil, + shutdownTimeout: defaultShutdownTimeout, + starting: starting, + finished: finished, + }) + p.handler = HandlerFunc(handler) + + p.start(&sync.WaitGroup{}) + // Wait for two second to allow all enqueued tasks to be processed. + time.Sleep(2 * time.Second) + // Make sure no messages are stuck in in-progress list. + for _, qname := range tc.queues { + if l := r.LLen(base.InProgressKey(qname)).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", base.InProgressKey(qname), l) + } + } + p.terminate() + + mu.Lock() + if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" { + t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) + } + mu.Unlock() + } +} + // https://github.com/hibiken/asynq/issues/166 func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { r := setup(t) @@ -371,27 +463,31 @@ func TestProcessorQueues(t *testing.T) { } func TestProcessorWithStrictPriority(t *testing.T) { - r := setup(t) - rdbClient := rdb.NewRDB(r) + var ( + r = setup(t) - m1 := h.NewTaskMessage("send_email", nil) - m2 := h.NewTaskMessage("send_email", nil) - m3 := h.NewTaskMessage("send_email", nil) - m4 := h.NewTaskMessage("gen_thumbnail", nil) - m5 := h.NewTaskMessage("gen_thumbnail", nil) - m6 := h.NewTaskMessage("sync", nil) - m7 := h.NewTaskMessage("sync", nil) + rdbClient = rdb.NewRDB(r) - t1 := NewTask(m1.Type, m1.Payload) - t2 := NewTask(m2.Type, m2.Payload) - t3 := NewTask(m3.Type, m3.Payload) - t4 := NewTask(m4.Type, m4.Payload) - t5 := NewTask(m5.Type, m5.Payload) - t6 := NewTask(m6.Type, m6.Payload) - t7 := NewTask(m7.Type, m7.Payload) + m1 = h.NewTaskMessageWithQueue("task1", nil, "critical") + m2 = h.NewTaskMessageWithQueue("task2", nil, "critical") + m3 = h.NewTaskMessageWithQueue("task3", nil, "critical") + m4 = h.NewTaskMessageWithQueue("task4", nil, base.DefaultQueueName) + m5 = h.NewTaskMessageWithQueue("task5", nil, base.DefaultQueueName) + m6 = h.NewTaskMessageWithQueue("task6", nil, "low") + m7 = h.NewTaskMessageWithQueue("task7", nil, "low") + + t1 = NewTask(m1.Type, m1.Payload) + t2 = NewTask(m2.Type, m2.Payload) + t3 = NewTask(m3.Type, m3.Payload) + t4 = NewTask(m4.Type, m4.Payload) + t5 = NewTask(m5.Type, m5.Payload) + t6 = NewTask(m6.Type, m6.Payload) + t7 = NewTask(m7.Type, m7.Payload) + ) tests := []struct { enqueued map[string][]*base.TaskMessage // initial queues state + queues []string // list of queues to consume tasks from wait time.Duration // wait duration between starting and stopping processor for this test case wantProcessed []*Task // tasks to be processed at the end }{ @@ -401,6 +497,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { "critical": {m1, m2, m3}, "low": {m6, m7}, }, + queues: []string{base.DefaultQueueName, "critical", "low"}, wait: time.Second, wantProcessed: []*Task{t1, t2, t3, t4, t5, t6, t7}, }, @@ -422,20 +519,22 @@ func TestProcessorWithStrictPriority(t *testing.T) { return nil } queueCfg := map[string]int{ - "critical": 3, base.DefaultQueueName: 2, + "critical": 3, "low": 1, } starting := make(chan *base.TaskMessage) finished := make(chan *base.TaskMessage) + syncCh := make(chan *syncRequest) done := make(chan struct{}) defer func() { close(done) }() go fakeHeartbeater(starting, finished, done) + go fakeSyncer(syncCh, done) p := newProcessor(processorParams{ logger: testLogger, broker: rdbClient, retryDelayFunc: defaultDelayFunc, - syncCh: nil, + syncCh: syncCh, cancelations: base.NewCancelations(), concurrency: 1, // Set concurrency to 1 to make sure tasks are processed one at a time. queues: queueCfg, @@ -447,20 +546,20 @@ func TestProcessorWithStrictPriority(t *testing.T) { }) p.handler = HandlerFunc(handler) - t.Log("Starting Processor") p.start(&sync.WaitGroup{}) time.Sleep(tc.wait) - t.Log("Terminating Processor") + // Make sure no tasks are stuck in in-progress list. + for _, qname := range tc.queues { + if l := r.LLen(base.InProgressKey(qname)).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", base.InProgressKey(qname), l) + } + } p.terminate() - t.Log("Terminated Processor") if diff := cmp.Diff(tc.wantProcessed, processed, cmp.AllowUnexported(Payload{})); diff != "" { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) } - if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l) - } } }