2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Update ProcessorRetry test

This commit is contained in:
Ken Hibino 2021-06-03 06:33:21 -07:00
parent b0321fb465
commit f9d7af3def

View File

@ -44,6 +44,7 @@ func fakeSyncer(syncCh <-chan *syncRequest, done <-chan struct{}) {
func TestProcessorSuccessWithSingleQueue(t *testing.T) {
r := setup(t)
defer r.Close()
rdbClient := rdb.NewRDB(r)
m1 := h.NewTaskMessage("task1", nil)
@ -146,6 +147,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
t3 = NewTask(m3.Type, m3.Payload)
t4 = NewTask(m4.Type, m4.Payload)
)
defer r.Close()
tests := []struct {
pending map[string][]*base.TaskMessage
@ -226,6 +228,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
// https://github.com/hibiken/asynq/issues/166
func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
r := setup(t)
defer r.Close()
rdbClient := rdb.NewRDB(r)
m1 := h.NewTaskMessage("large_number", h.JSON(map[string]interface{}{"data": 111111111111111111}))
@ -302,6 +305,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
func TestProcessorRetry(t *testing.T) {
r := setup(t)
defer r.Close()
rdbClient := rdb.NewRDB(r)
m1 := h.NewTaskMessage("send_email", nil)
@ -312,66 +316,55 @@ func TestProcessorRetry(t *testing.T) {
errMsg := "something went wrong"
wrappedSkipRetry := fmt.Errorf("%s:%w", errMsg, SkipRetry)
now := time.Now()
tests := []struct {
desc string // test description
pending []*base.TaskMessage // initial default queue state
incoming []*base.TaskMessage // tasks to be enqueued during run
delay time.Duration // retry delay duration
handler Handler // task handler
wait time.Duration // wait duration between starting and stopping processor for this test case
wantRetry []base.Z // tasks in retry queue at the end
wantErrMsg string // error message the task should record
wantRetry []*base.TaskMessage // tasks in retry queue at the end
wantArchived []*base.TaskMessage // tasks in archived queue at the end
wantErrCount int // number of times error handler should be called
}{
{
desc: "Should automatically retry errored tasks",
pending: []*base.TaskMessage{m1, m2},
incoming: []*base.TaskMessage{m3, m4},
pending: []*base.TaskMessage{m1, m2, m3, m4},
delay: time.Minute,
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
return fmt.Errorf(errMsg)
}),
wait: 2 * time.Second,
wantRetry: []base.Z{
{Message: h.TaskMessageAfterRetry(*m2, errMsg), Score: now.Add(time.Minute).Unix()},
{Message: h.TaskMessageAfterRetry(*m3, errMsg), Score: now.Add(time.Minute).Unix()},
{Message: h.TaskMessageAfterRetry(*m4, errMsg), Score: now.Add(time.Minute).Unix()},
},
wantArchived: []*base.TaskMessage{h.TaskMessageWithError(*m1, errMsg)},
wantErrMsg: errMsg,
wantRetry: []*base.TaskMessage{m2, m3, m4},
wantArchived: []*base.TaskMessage{m1},
wantErrCount: 4,
},
{
desc: "Should skip retry errored tasks",
pending: []*base.TaskMessage{m1, m2},
incoming: []*base.TaskMessage{},
delay: time.Minute,
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
return SkipRetry // return SkipRetry without wrapping
}),
wait: 2 * time.Second,
wantRetry: []base.Z{},
wantArchived: []*base.TaskMessage{
h.TaskMessageWithError(*m1, SkipRetry.Error()),
h.TaskMessageWithError(*m2, SkipRetry.Error()),
},
wantErrMsg: SkipRetry.Error(),
wantRetry: []*base.TaskMessage{},
wantArchived: []*base.TaskMessage{m1, m2},
wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error
},
{
desc: "Should skip retry errored tasks (with error wrapping)",
pending: []*base.TaskMessage{m1, m2},
incoming: []*base.TaskMessage{},
delay: time.Minute,
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
return wrappedSkipRetry
}),
wait: 2 * time.Second,
wantRetry: []base.Z{},
wantArchived: []*base.TaskMessage{
h.TaskMessageWithError(*m1, wrappedSkipRetry.Error()),
h.TaskMessageWithError(*m2, wrappedSkipRetry.Error()),
},
wantErrMsg: wrappedSkipRetry.Error(),
wantRetry: []*base.TaskMessage{},
wantArchived: []*base.TaskMessage{m1, m2},
wantErrCount: 2, // ErrorHandler should still be called with SkipRetry error
},
}
@ -415,24 +408,34 @@ func TestProcessorRetry(t *testing.T) {
p.handler = tc.handler
p.start(&sync.WaitGroup{})
for _, msg := range tc.incoming {
err := rdbClient.Enqueue(msg)
if err != nil {
p.shutdown()
t.Fatal(err)
}
}
now := time.Now() // time when processor is running
time.Sleep(tc.wait) // FIXME: This makes test flaky.
p.shutdown()
cmpOpt := h.EquateInt64Approx(1) // allow up to a second difference in zset score
cmpOpt := h.EquateInt64Approx(int64(tc.wait.Seconds())) // allow up to a wait-second difference in zset score
gotRetry := h.GetRetryEntries(t, r, base.DefaultQueueName)
if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" {
var wantRetry []base.Z // Note: construct wantRetry here since `LastFailedAt` and ZSCORE is relative to each test run.
for _, msg := range tc.wantRetry {
wantRetry = append(wantRetry,
base.Z{
Message: h.TaskMessageAfterRetry(*msg, tc.wantErrMsg),
Score: now.Add(tc.delay).Unix(),
})
}
if diff := cmp.Diff(wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" {
t.Errorf("%s: mismatch found in %q after running processor; (-want, +got)\n%s", tc.desc, base.RetryKey(base.DefaultQueueName), diff)
}
gotDead := h.GetArchivedMessages(t, r, base.DefaultQueueName)
if diff := cmp.Diff(tc.wantArchived, gotDead, h.SortMsgOpt); diff != "" {
gotArchived := h.GetArchivedEntries(t, r, base.DefaultQueueName)
var wantArchived []base.Z // Note: construct wantArchived here since `LastFailedAt` and ZSCORE is relative to each test run.
for _, msg := range tc.wantArchived {
wantArchived = append(wantArchived,
base.Z{
Message: h.TaskMessageWithError(*msg, tc.wantErrMsg),
Score: now.Unix(),
})
}
if diff := cmp.Diff(wantArchived, gotArchived, h.SortZSetEntryOpt, cmpOpt); diff != "" {
t.Errorf("%s: mismatch found in %q after running processor; (-want, +got)\n%s", tc.desc, base.ArchivedKey(base.DefaultQueueName), diff)
}