2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 00:02:19 +08:00

Add more processor tests

This commit is contained in:
Ken Hibino 2020-08-18 21:21:05 -07:00
parent 83bdca5220
commit 96f23d88cd
2 changed files with 136 additions and 43 deletions

View File

@ -68,15 +68,7 @@ var IgnoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID")
// NewTaskMessage returns a new instance of TaskMessage given a task type and payload. // NewTaskMessage returns a new instance of TaskMessage given a task type and payload.
func NewTaskMessage(taskType string, payload map[string]interface{}) *base.TaskMessage { func NewTaskMessage(taskType string, payload map[string]interface{}) *base.TaskMessage {
return &base.TaskMessage{ return NewTaskMessageWithQueue(taskType, payload, base.DefaultQueueName)
ID: uuid.New(),
Type: taskType,
Queue: base.DefaultQueueName,
Retry: 25,
Payload: payload,
Timeout: 1800, // default timeout of 30 mins
Deadline: 0, // no deadline
}
} }
// NewTaskMessageWithQueue returns a new instance of TaskMessage given a // NewTaskMessageWithQueue returns a new instance of TaskMessage given a
@ -88,6 +80,8 @@ func NewTaskMessageWithQueue(taskType string, payload map[string]interface{}, qn
Queue: qname, Queue: qname,
Retry: 25, Retry: 25,
Payload: payload, Payload: payload,
Timeout: 1800, // default timeout of 30 mins
Deadline: 0, // no deadline
} }
} }

View File

@ -42,14 +42,14 @@ func fakeSyncer(syncCh <-chan *syncRequest, done <-chan struct{}) {
} }
} }
func TestProcessorSuccess(t *testing.T) { func TestProcessorSuccessWithSingleQueue(t *testing.T) {
r := setup(t) r := setup(t)
rdbClient := rdb.NewRDB(r) rdbClient := rdb.NewRDB(r)
m1 := h.NewTaskMessage("send_email", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("gen_thumbnail", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessage("reindex", nil) m3 := h.NewTaskMessage("task3", nil)
m4 := h.NewTaskMessage("sync", nil) m4 := h.NewTaskMessage("task4", nil)
t1 := NewTask(m1.Type, m1.Payload) t1 := NewTask(m1.Type, m1.Payload)
t2 := NewTask(m2.Type, m2.Payload) t2 := NewTask(m2.Type, m2.Payload)
@ -131,6 +131,98 @@ func TestProcessorSuccess(t *testing.T) {
} }
} }
func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
var (
r = setup(t)
rdbClient = rdb.NewRDB(r)
m1 = h.NewTaskMessage("task1", nil)
m2 = h.NewTaskMessage("task2", nil)
m3 = h.NewTaskMessageWithQueue("task3", nil, "high")
m4 = h.NewTaskMessageWithQueue("task4", nil, "low")
t1 = NewTask(m1.Type, m1.Payload)
t2 = NewTask(m2.Type, m2.Payload)
t3 = NewTask(m3.Type, m3.Payload)
t4 = NewTask(m4.Type, m4.Payload)
)
tests := []struct {
enqueued map[string][]*base.TaskMessage
queues []string // list of queues to consume the tasks from
wantProcessed []*Task // tasks to be processed at the end
}{
{
enqueued: map[string][]*base.TaskMessage{
"default": {m1, m2},
"high": {m3},
"low": {m4},
},
queues: []string{"default", "high", "low"},
wantProcessed: []*Task{t1, t2, t3, t4},
},
}
for _, tc := range tests {
// Set up test case.
h.FlushDB(t, r)
h.SeedAllEnqueuedQueues(t, r, tc.enqueued)
// Instantiate a new processor.
var mu sync.Mutex
var processed []*Task
handler := func(ctx context.Context, task *Task) error {
mu.Lock()
defer mu.Unlock()
processed = append(processed, task)
return nil
}
starting := make(chan *base.TaskMessage)
finished := make(chan *base.TaskMessage)
syncCh := make(chan *syncRequest)
done := make(chan struct{})
defer func() { close(done) }()
go fakeHeartbeater(starting, finished, done)
go fakeSyncer(syncCh, done)
p := newProcessor(processorParams{
logger: testLogger,
broker: rdbClient,
retryDelayFunc: defaultDelayFunc,
syncCh: syncCh,
cancelations: base.NewCancelations(),
concurrency: 10,
queues: map[string]int{
"default": 2,
"high": 3,
"low": 1,
},
strictPriority: false,
errHandler: nil,
shutdownTimeout: defaultShutdownTimeout,
starting: starting,
finished: finished,
})
p.handler = HandlerFunc(handler)
p.start(&sync.WaitGroup{})
// Wait for two second to allow all enqueued tasks to be processed.
time.Sleep(2 * time.Second)
// Make sure no messages are stuck in in-progress list.
for _, qname := range tc.queues {
if l := r.LLen(base.InProgressKey(qname)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.InProgressKey(qname), l)
}
}
p.terminate()
mu.Lock()
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" {
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
}
mu.Unlock()
}
}
// https://github.com/hibiken/asynq/issues/166 // https://github.com/hibiken/asynq/issues/166
func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
r := setup(t) r := setup(t)
@ -371,27 +463,31 @@ func TestProcessorQueues(t *testing.T) {
} }
func TestProcessorWithStrictPriority(t *testing.T) { func TestProcessorWithStrictPriority(t *testing.T) {
r := setup(t) var (
rdbClient := rdb.NewRDB(r) r = setup(t)
m1 := h.NewTaskMessage("send_email", nil) rdbClient = rdb.NewRDB(r)
m2 := h.NewTaskMessage("send_email", nil)
m3 := h.NewTaskMessage("send_email", nil)
m4 := h.NewTaskMessage("gen_thumbnail", nil)
m5 := h.NewTaskMessage("gen_thumbnail", nil)
m6 := h.NewTaskMessage("sync", nil)
m7 := h.NewTaskMessage("sync", nil)
t1 := NewTask(m1.Type, m1.Payload) m1 = h.NewTaskMessageWithQueue("task1", nil, "critical")
t2 := NewTask(m2.Type, m2.Payload) m2 = h.NewTaskMessageWithQueue("task2", nil, "critical")
t3 := NewTask(m3.Type, m3.Payload) m3 = h.NewTaskMessageWithQueue("task3", nil, "critical")
t4 := NewTask(m4.Type, m4.Payload) m4 = h.NewTaskMessageWithQueue("task4", nil, base.DefaultQueueName)
t5 := NewTask(m5.Type, m5.Payload) m5 = h.NewTaskMessageWithQueue("task5", nil, base.DefaultQueueName)
t6 := NewTask(m6.Type, m6.Payload) m6 = h.NewTaskMessageWithQueue("task6", nil, "low")
t7 := NewTask(m7.Type, m7.Payload) m7 = h.NewTaskMessageWithQueue("task7", nil, "low")
t1 = NewTask(m1.Type, m1.Payload)
t2 = NewTask(m2.Type, m2.Payload)
t3 = NewTask(m3.Type, m3.Payload)
t4 = NewTask(m4.Type, m4.Payload)
t5 = NewTask(m5.Type, m5.Payload)
t6 = NewTask(m6.Type, m6.Payload)
t7 = NewTask(m7.Type, m7.Payload)
)
tests := []struct { tests := []struct {
enqueued map[string][]*base.TaskMessage // initial queues state enqueued map[string][]*base.TaskMessage // initial queues state
queues []string // list of queues to consume tasks from
wait time.Duration // wait duration between starting and stopping processor for this test case wait time.Duration // wait duration between starting and stopping processor for this test case
wantProcessed []*Task // tasks to be processed at the end wantProcessed []*Task // tasks to be processed at the end
}{ }{
@ -401,6 +497,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
"critical": {m1, m2, m3}, "critical": {m1, m2, m3},
"low": {m6, m7}, "low": {m6, m7},
}, },
queues: []string{base.DefaultQueueName, "critical", "low"},
wait: time.Second, wait: time.Second,
wantProcessed: []*Task{t1, t2, t3, t4, t5, t6, t7}, wantProcessed: []*Task{t1, t2, t3, t4, t5, t6, t7},
}, },
@ -422,20 +519,22 @@ func TestProcessorWithStrictPriority(t *testing.T) {
return nil return nil
} }
queueCfg := map[string]int{ queueCfg := map[string]int{
"critical": 3,
base.DefaultQueueName: 2, base.DefaultQueueName: 2,
"critical": 3,
"low": 1, "low": 1,
} }
starting := make(chan *base.TaskMessage) starting := make(chan *base.TaskMessage)
finished := make(chan *base.TaskMessage) finished := make(chan *base.TaskMessage)
syncCh := make(chan *syncRequest)
done := make(chan struct{}) done := make(chan struct{})
defer func() { close(done) }() defer func() { close(done) }()
go fakeHeartbeater(starting, finished, done) go fakeHeartbeater(starting, finished, done)
go fakeSyncer(syncCh, done)
p := newProcessor(processorParams{ p := newProcessor(processorParams{
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: defaultDelayFunc,
syncCh: nil, syncCh: syncCh,
cancelations: base.NewCancelations(), cancelations: base.NewCancelations(),
concurrency: 1, // Set concurrency to 1 to make sure tasks are processed one at a time. concurrency: 1, // Set concurrency to 1 to make sure tasks are processed one at a time.
queues: queueCfg, queues: queueCfg,
@ -447,20 +546,20 @@ func TestProcessorWithStrictPriority(t *testing.T) {
}) })
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)
t.Log("Starting Processor")
p.start(&sync.WaitGroup{}) p.start(&sync.WaitGroup{})
time.Sleep(tc.wait) time.Sleep(tc.wait)
t.Log("Terminating Processor") // Make sure no tasks are stuck in in-progress list.
for _, qname := range tc.queues {
if l := r.LLen(base.InProgressKey(qname)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.InProgressKey(qname), l)
}
}
p.terminate() p.terminate()
t.Log("Terminated Processor")
if diff := cmp.Diff(tc.wantProcessed, processed, cmp.AllowUnexported(Payload{})); diff != "" { if diff := cmp.Diff(tc.wantProcessed, processed, cmp.AllowUnexported(Payload{})); diff != "" {
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
} }
if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l)
}
} }
} }