diff --git a/syncer_test.go b/syncer_test.go index 24793df..37ce92b 100644 --- a/syncer_test.go +++ b/syncer_test.go @@ -5,11 +5,11 @@ package asynq import ( + "fmt" "sync" "testing" "time" - "github.com/go-redis/redis/v7" h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" @@ -50,54 +50,43 @@ func TestSyncer(t *testing.T) { } func TestSyncerRetry(t *testing.T) { - inProgress := []*base.TaskMessage{ - h.NewTaskMessage("send_email", nil), - h.NewTaskMessage("reindex", nil), - h.NewTaskMessage("gen_thumbnail", nil), - } - goodClient := setup(t) - h.SeedInProgressQueue(t, goodClient, inProgress) - - // Simulate the situation where redis server is down - // by connecting to a wrong port. - badClient := redis.NewClient(&redis.Options{ - Addr: "localhost:6390", - }) - rdbClient := rdb.NewRDB(badClient) - const interval = time.Second syncRequestCh := make(chan *syncRequest) syncer := newSyncer(syncRequestCh, interval) + var wg sync.WaitGroup syncer.start(&wg) defer syncer.terminate() - for _, msg := range inProgress { - m := msg - syncRequestCh <- &syncRequest{ - fn: func() error { - return rdbClient.Done(m) - }, + var ( + mu sync.Mutex + counter int + ) + + // Increment the counter for each call. + // Initial call will fail and second call will succeed. + requestFunc := func() error { + mu.Lock() + defer mu.Unlock() + if counter == 0 { + counter++ + return fmt.Errorf("zero") } + counter++ + return nil } - time.Sleep(2 * interval) // ensure that syncer runs at least once - - // Sanity check to ensure that message was not successfully deleted - // from in-progress list. - gotInProgress := h.GetInProgressMessages(t, goodClient) - if l := len(gotInProgress); l != len(inProgress) { - t.Errorf("%q has length %d; want %d", base.InProgressQueue, l, len(inProgress)) + syncRequestCh <- &syncRequest{ + fn: requestFunc, + errMsg: "error", } - // FIXME: This assignment introduces data race and running the test with -race will fail. - // simualate failover. - rdbClient = rdb.NewRDB(goodClient) + // allow syncer to retry + time.Sleep(3 * interval) - time.Sleep(2 * interval) // ensure that syncer runs at least once - - gotInProgress = h.GetInProgressMessages(t, goodClient) - if l := len(gotInProgress); l != 0 { - t.Errorf("%q has length %d; want 0", base.InProgressQueue, l) + mu.Lock() + if counter != 2 { + t.Errorf("counter = %d, want 2", counter) } + mu.Unlock() }