2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 00:02:19 +08:00
asynq/server_test.go

265 lines
6.3 KiB
Go
Raw Normal View History

2020-01-03 10:13:16 +08:00
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"context"
"fmt"
2024-05-01 10:34:16 +08:00
"os"
"runtime"
2020-04-27 22:19:14 +08:00
"syscall"
"testing"
"time"
"github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/testbroker"
2022-03-19 22:16:55 +08:00
"github.com/hibiken/asynq/internal/testutil"
"go.uber.org/goleak"
)
2020-04-12 23:16:42 +08:00
func TestServer(t *testing.T) {
// https://github.com/go-redis/redis/issues/1029
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
defer goleak.VerifyNone(t, ignoreOpt)
redisConnOpt := getRedisConnOpt(t)
c := NewClient(redisConnOpt)
2020-09-08 21:51:01 +08:00
defer c.Close()
srv := NewServer(redisConnOpt, Config{
Concurrency: 10,
LogLevel: testLogLevel,
})
// no-op handler
h := func(ctx context.Context, task *Task) error {
return nil
}
err := srv.Start(HandlerFunc(h))
if err != nil {
t.Fatal(err)
}
2022-03-19 22:16:55 +08:00
_, err = c.Enqueue(NewTask("send_email", testutil.JSON(map[string]interface{}{"recipient_id": 123})))
if err != nil {
t.Errorf("could not enqueue a task: %v", err)
}
2022-03-19 22:16:55 +08:00
_, err = c.Enqueue(NewTask("send_email", testutil.JSON(map[string]interface{}{"recipient_id": 456})), ProcessIn(1*time.Hour))
if err != nil {
t.Errorf("could not enqueue a task: %v", err)
}
srv.Shutdown()
}
2020-04-27 22:19:14 +08:00
func TestServerRun(t *testing.T) {
// https://github.com/go-redis/redis/issues/1029
ignoreOpt := goleak.IgnoreTopFunction("github.com/redis/go-redis/v9/internal/pool.(*ConnPool).reaper")
defer goleak.VerifyNone(t, ignoreOpt)
2024-05-01 10:34:16 +08:00
signalWindows := make(chan struct{})
2020-04-27 22:19:14 +08:00
done := make(chan struct{})
go func() {
2024-05-01 10:34:16 +08:00
defer close(done)
mux := NewServeMux()
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
if err := srv.Start(mux); err != nil {
t.Error("start server:", err)
}
if runtime.GOOS == "windows" {
<-signalWindows
} else {
srv.waitForSignals()
}
srv.Shutdown()
2020-04-27 22:19:14 +08:00
}()
2024-05-01 10:34:16 +08:00
time.Sleep(1 * time.Second)
2020-04-27 22:19:14 +08:00
2024-05-01 10:34:16 +08:00
// Make sure server exits when receiving TERM signal.
2020-04-27 22:19:14 +08:00
go func() {
2024-05-01 10:34:16 +08:00
if runtime.GOOS == "windows" {
// It is not implemented to signal Interrupt on Windows
signalWindows <- struct{}{}
return
}
p, err := os.FindProcess(os.Getpid())
if err != nil {
t.Error("find process:", err)
return
}
err = p.Signal(syscall.SIGTERM)
if err != nil {
t.Error("signal:", err)
return
2020-04-27 22:19:14 +08:00
}
}()
2024-05-01 10:34:16 +08:00
select {
case <-time.After(10 * time.Second):
t.Error("stop server: server did not stop after receiving TERM signal")
case <-done:
2020-04-27 22:19:14 +08:00
}
}
func TestServerErrServerClosed(t *testing.T) {
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
2020-04-15 00:01:22 +08:00
handler := NewServeMux()
if err := srv.Start(handler); err != nil {
t.Fatal(err)
}
srv.Shutdown()
2020-04-15 00:01:22 +08:00
err := srv.Start(handler)
if err != ErrServerClosed {
t.Errorf("Restarting server: (*Server).Start(handler) = %v, want ErrServerClosed error", err)
2020-04-15 00:01:22 +08:00
}
}
func TestServerErrNilHandler(t *testing.T) {
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
2020-04-15 00:01:22 +08:00
err := srv.Start(nil)
if err == nil {
t.Error("Starting server with nil handler: (*Server).Start(nil) did not return error")
srv.Shutdown()
2020-04-15 00:01:22 +08:00
}
}
func TestServerErrServerRunning(t *testing.T) {
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
2020-04-15 00:01:22 +08:00
handler := NewServeMux()
if err := srv.Start(handler); err != nil {
t.Fatal(err)
}
err := srv.Start(handler)
if err == nil {
t.Error("Calling (*Server).Start(handler) on already running server did not return error")
}
srv.Shutdown()
2020-04-15 00:01:22 +08:00
}
func TestServerWithRedisDown(t *testing.T) {
// Make sure that server does not panic and exit if redis is down.
defer func() {
if r := recover(); r != nil {
t.Errorf("panic occurred: %v", r)
}
}()
r := rdb.NewRDB(setup(t))
testBroker := testbroker.NewTestBroker(r)
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
srv.broker = testBroker
srv.forwarder.broker = testBroker
srv.heartbeater.broker = testBroker
srv.processor.broker = testBroker
srv.subscriber.broker = testBroker
testBroker.Sleep()
// no-op handler
h := func(ctx context.Context, task *Task) error {
return nil
}
err := srv.Start(HandlerFunc(h))
if err != nil {
t.Fatal(err)
}
time.Sleep(3 * time.Second)
srv.Shutdown()
}
func TestServerWithFlakyBroker(t *testing.T) {
// Make sure that server does not panic and exit if redis is down.
defer func() {
if r := recover(); r != nil {
t.Errorf("panic occurred: %v", r)
}
}()
r := rdb.NewRDB(setup(t))
testBroker := testbroker.NewTestBroker(r)
redisConnOpt := getRedisConnOpt(t)
srv := NewServer(redisConnOpt, Config{LogLevel: testLogLevel})
srv.broker = testBroker
srv.forwarder.broker = testBroker
srv.heartbeater.broker = testBroker
srv.processor.broker = testBroker
srv.subscriber.broker = testBroker
c := NewClient(redisConnOpt)
h := func(ctx context.Context, task *Task) error {
// force task retry.
2021-03-21 04:42:13 +08:00
if task.Type() == "bad_task" {
return fmt.Errorf("could not process %q", task.Type())
}
time.Sleep(2 * time.Second)
return nil
}
err := srv.Start(HandlerFunc(h))
if err != nil {
t.Fatal(err)
}
for i := 0; i < 10; i++ {
_, err := c.Enqueue(NewTask("enqueued", nil), MaxRetry(i))
if err != nil {
t.Fatal(err)
}
_, err = c.Enqueue(NewTask("bad_task", nil))
if err != nil {
t.Fatal(err)
}
_, err = c.Enqueue(NewTask("scheduled", nil), ProcessIn(time.Duration(i)*time.Second))
if err != nil {
t.Fatal(err)
}
}
// simulate redis going down.
testBroker.Sleep()
time.Sleep(3 * time.Second)
// simulate redis comes back online.
testBroker.Wakeup()
time.Sleep(3 * time.Second)
srv.Shutdown()
}
func TestLogLevel(t *testing.T) {
tests := []struct {
flagVal string
want LogLevel
wantStr string
}{
{"debug", DebugLevel, "debug"},
{"Info", InfoLevel, "info"},
{"WARN", WarnLevel, "warn"},
{"warning", WarnLevel, "warn"},
{"Error", ErrorLevel, "error"},
{"fatal", FatalLevel, "fatal"},
}
for _, tc := range tests {
level := new(LogLevel)
if err := level.Set(tc.flagVal); err != nil {
t.Fatal(err)
}
if *level != tc.want {
t.Errorf("Set(%q): got %v, want %v", tc.flagVal, level, &tc.want)
continue
}
if got := level.String(); got != tc.wantStr {
t.Errorf("String() returned %q, want %q", got, tc.wantStr)
}
}
}