mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Add tests to simulate cases where server cannot talk to redis
This commit is contained in:
parent
5161b9368a
commit
1c1474c55c
@ -6,10 +6,13 @@ package asynq
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"syscall"
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
|
"github.com/hibiken/asynq/internal/testbroker"
|
||||||
"go.uber.org/goleak"
|
"go.uber.org/goleak"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -113,3 +116,95 @@ func TestServerErrServerRunning(t *testing.T) {
|
|||||||
}
|
}
|
||||||
srv.Stop()
|
srv.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestServerWithRedisDown(t *testing.T) {
|
||||||
|
// Make sure that server does not panic and exit if redis is down.
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
t.Errorf("panic occurred: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
r := rdb.NewRDB(setup(t))
|
||||||
|
testBroker := testbroker.NewTestBroker(r)
|
||||||
|
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{})
|
||||||
|
srv.broker = testBroker
|
||||||
|
srv.scheduler.broker = testBroker
|
||||||
|
srv.heartbeater.broker = testBroker
|
||||||
|
srv.processor.broker = testBroker
|
||||||
|
srv.subscriber.broker = testBroker
|
||||||
|
testBroker.Sleep()
|
||||||
|
|
||||||
|
// no-op handler
|
||||||
|
h := func(ctx context.Context, task *Task) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err := srv.Start(HandlerFunc(h))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
|
||||||
|
srv.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServerWithFlakyBroker(t *testing.T) {
|
||||||
|
// Make sure that server does not panic and exit if redis is down.
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
t.Errorf("panic occurred: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
r := rdb.NewRDB(setup(t))
|
||||||
|
testBroker := testbroker.NewTestBroker(r)
|
||||||
|
srv := NewServer(RedisClientOpt{Addr: redisAddr, DB: redisDB}, Config{})
|
||||||
|
srv.broker = testBroker
|
||||||
|
srv.scheduler.broker = testBroker
|
||||||
|
srv.heartbeater.broker = testBroker
|
||||||
|
srv.processor.broker = testBroker
|
||||||
|
srv.subscriber.broker = testBroker
|
||||||
|
|
||||||
|
c := NewClient(RedisClientOpt{Addr: redisAddr, DB: redisDB})
|
||||||
|
|
||||||
|
h := func(ctx context.Context, task *Task) error {
|
||||||
|
// force task retry.
|
||||||
|
if task.Type == "bad_task" {
|
||||||
|
return fmt.Errorf("could not process %q", task.Type)
|
||||||
|
}
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err := srv.Start(HandlerFunc(h))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
err := c.Enqueue(NewTask("enqueued", nil), MaxRetry(i))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = c.Enqueue(NewTask("bad_task", nil))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = c.EnqueueIn(time.Duration(i)*time.Second, NewTask("scheduled", nil))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// simulate redis going down.
|
||||||
|
testBroker.Sleep()
|
||||||
|
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
|
||||||
|
// simulate redis comes back online.
|
||||||
|
testBroker.Wakeup()
|
||||||
|
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
|
||||||
|
srv.Stop()
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user