mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-24 23:02:18 +08:00
Fix benchmark tests
This commit is contained in:
parent
c29200b1fc
commit
207a6d2d1a
@ -16,11 +16,17 @@ import (
|
|||||||
// This file defines test helper functions used by
|
// This file defines test helper functions used by
|
||||||
// other test files.
|
// other test files.
|
||||||
|
|
||||||
|
// redis used for package testing.
|
||||||
|
const (
|
||||||
|
redisAddr = "localhost:6379"
|
||||||
|
redisDB = 14
|
||||||
|
)
|
||||||
|
|
||||||
func setup(tb testing.TB) *redis.Client {
|
func setup(tb testing.TB) *redis.Client {
|
||||||
tb.Helper()
|
tb.Helper()
|
||||||
r := redis.NewClient(&redis.Options{
|
r := redis.NewClient(&redis.Options{
|
||||||
Addr: "localhost:6379",
|
Addr: redisAddr,
|
||||||
DB: 14,
|
DB: redisDB,
|
||||||
})
|
})
|
||||||
// Start each test with a clean slate.
|
// Start each test with a clean slate.
|
||||||
h.FlushDB(tb, r)
|
h.FlushDB(tb, r)
|
||||||
|
@ -111,9 +111,7 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
|
|||||||
qcfg := normalizeQueueCfg(queues)
|
qcfg := normalizeQueueCfg(queues)
|
||||||
|
|
||||||
syncRequestCh := make(chan *syncRequest)
|
syncRequestCh := make(chan *syncRequest)
|
||||||
|
|
||||||
syncer := newSyncer(syncRequestCh, 5*time.Second)
|
syncer := newSyncer(syncRequestCh, 5*time.Second)
|
||||||
|
|
||||||
rdb := rdb.NewRDB(createRedisClient(r))
|
rdb := rdb.NewRDB(createRedisClient(r))
|
||||||
scheduler := newScheduler(rdb, 5*time.Second, qcfg)
|
scheduler := newScheduler(rdb, 5*time.Second, qcfg)
|
||||||
processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc, syncRequestCh)
|
processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc, syncRequestCh)
|
||||||
@ -196,6 +194,8 @@ func (bg *Background) stop() {
|
|||||||
|
|
||||||
bg.scheduler.terminate()
|
bg.scheduler.terminate()
|
||||||
bg.processor.terminate()
|
bg.processor.terminate()
|
||||||
|
// Note: processor and all worker goroutines need to be exited
|
||||||
|
// before shutting down syncer to avoid goroutine leak.
|
||||||
bg.syncer.terminate()
|
bg.syncer.terminate()
|
||||||
|
|
||||||
bg.rdb.Close()
|
bg.rdb.Close()
|
||||||
|
@ -18,9 +18,13 @@ func BenchmarkEndToEndSimple(b *testing.B) {
|
|||||||
const count = 100000
|
const count = 100000
|
||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
b.StopTimer() // begin setup
|
b.StopTimer() // begin setup
|
||||||
r := setup(b)
|
setup(b)
|
||||||
client := NewClient(r)
|
redis := &RedisClientOpt{
|
||||||
bg := NewBackground(r, &Config{
|
Addr: redisAddr,
|
||||||
|
DB: redisDB,
|
||||||
|
}
|
||||||
|
client := NewClient(redis)
|
||||||
|
bg := NewBackground(redis, &Config{
|
||||||
Concurrency: 10,
|
Concurrency: 10,
|
||||||
RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
|
RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
|
||||||
return time.Second
|
return time.Second
|
||||||
@ -55,9 +59,13 @@ func BenchmarkEndToEnd(b *testing.B) {
|
|||||||
for n := 0; n < b.N; n++ {
|
for n := 0; n < b.N; n++ {
|
||||||
b.StopTimer() // begin setup
|
b.StopTimer() // begin setup
|
||||||
rand.Seed(time.Now().UnixNano())
|
rand.Seed(time.Now().UnixNano())
|
||||||
r := setup(b)
|
setup(b)
|
||||||
client := NewClient(r)
|
redis := &RedisClientOpt{
|
||||||
bg := NewBackground(r, &Config{
|
Addr: redisAddr,
|
||||||
|
DB: redisDB,
|
||||||
|
}
|
||||||
|
client := NewClient(redis)
|
||||||
|
bg := NewBackground(redis, &Config{
|
||||||
Concurrency: 10,
|
Concurrency: 10,
|
||||||
RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
|
RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
|
||||||
return time.Second
|
return time.Second
|
||||||
|
@ -30,6 +30,7 @@ func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
|
|||||||
return &syncer{
|
return &syncer{
|
||||||
requestsCh: requestsCh,
|
requestsCh: requestsCh,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
|
interval: interval,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ func TestSyncer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(interval) // ensure that syncer runs at least once
|
time.Sleep(2 * interval) // ensure that syncer runs at least once
|
||||||
|
|
||||||
gotInProgress := h.GetInProgressMessages(t, r)
|
gotInProgress := h.GetInProgressMessages(t, r)
|
||||||
if l := len(gotInProgress); l != 0 {
|
if l := len(gotInProgress); l != 0 {
|
||||||
@ -78,7 +78,7 @@ func TestSyncerRetry(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(interval) // ensure that syncer runs at least once
|
time.Sleep(2 * interval) // ensure that syncer runs at least once
|
||||||
|
|
||||||
// Sanity check to ensure that message was not successfully deleted
|
// Sanity check to ensure that message was not successfully deleted
|
||||||
// from in-progress list.
|
// from in-progress list.
|
||||||
@ -90,7 +90,7 @@ func TestSyncerRetry(t *testing.T) {
|
|||||||
// simualate failover.
|
// simualate failover.
|
||||||
rdbClient = rdb.NewRDB(goodClient)
|
rdbClient = rdb.NewRDB(goodClient)
|
||||||
|
|
||||||
time.Sleep(interval) // ensure that syncer runs at least once
|
time.Sleep(2 * interval) // ensure that syncer runs at least once
|
||||||
|
|
||||||
gotInProgress = h.GetInProgressMessages(t, goodClient)
|
gotInProgress = h.GetInProgressMessages(t, goodClient)
|
||||||
if l := len(gotInProgress); l != 0 {
|
if l := len(gotInProgress); l != 0 {
|
||||||
|
Loading…
Reference in New Issue
Block a user