diff --git a/server_test.go b/server_test.go index 5cb88b1..31cccd9 100644 --- a/server_test.go +++ b/server_test.go @@ -6,10 +6,13 @@ package asynq import ( "context" + "fmt" "syscall" "testing" "time" + "github.com/hibiken/asynq/internal/rdb" + "github.com/hibiken/asynq/internal/testbroker" "go.uber.org/goleak" ) @@ -113,3 +116,95 @@ func TestServerErrServerRunning(t *testing.T) { } srv.Stop() } + +func TestServerWithRedisDown(t *testing.T) { + // Make sure that server does not panic and exit if redis is down. + defer func() { + if r := recover(); r != nil { + t.Errorf("panic occurred: %v", r) + } + }() + r := rdb.NewRDB(setup(t)) + testBroker := testbroker.NewTestBroker(r) + srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{}) + srv.broker = testBroker + srv.scheduler.broker = testBroker + srv.heartbeater.broker = testBroker + srv.processor.broker = testBroker + srv.subscriber.broker = testBroker + testBroker.Sleep() + + // no-op handler + h := func(ctx context.Context, task *Task) error { + return nil + } + + err := srv.Start(HandlerFunc(h)) + if err != nil { + t.Fatal(err) + } + + time.Sleep(3 * time.Second) + + srv.Stop() +} + +func TestServerWithFlakyBroker(t *testing.T) { + // Make sure that server does not panic and exit if redis is down. + defer func() { + if r := recover(); r != nil { + t.Errorf("panic occurred: %v", r) + } + }() + r := rdb.NewRDB(setup(t)) + testBroker := testbroker.NewTestBroker(r) + srv := NewServer(RedisClientOpt{Addr: redisAddr, DB: redisDB}, Config{}) + srv.broker = testBroker + srv.scheduler.broker = testBroker + srv.heartbeater.broker = testBroker + srv.processor.broker = testBroker + srv.subscriber.broker = testBroker + + c := NewClient(RedisClientOpt{Addr: redisAddr, DB: redisDB}) + + h := func(ctx context.Context, task *Task) error { + // force task retry. + if task.Type == "bad_task" { + return fmt.Errorf("could not process %q", task.Type) + } + time.Sleep(2 * time.Second) + return nil + } + + err := srv.Start(HandlerFunc(h)) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + err := c.Enqueue(NewTask("enqueued", nil), MaxRetry(i)) + if err != nil { + t.Fatal(err) + } + err = c.Enqueue(NewTask("bad_task", nil)) + if err != nil { + t.Fatal(err) + } + err = c.EnqueueIn(time.Duration(i)*time.Second, NewTask("scheduled", nil)) + if err != nil { + t.Fatal(err) + } + } + + // simulate redis going down. + testBroker.Sleep() + + time.Sleep(3 * time.Second) + + // simulate redis comes back online. + testBroker.Wakeup() + + time.Sleep(3 * time.Second) + + srv.Stop() +}