2020-04-17 22:52:12 +08:00
|
|
|
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
|
|
|
// Use of this source code is governed by a MIT license
|
|
|
|
// that can be found in the LICENSE file.
|
|
|
|
|
|
|
|
// Package testbroker exports a broker implementation that should be used in package testing.
|
|
|
|
package testbroker
|
|
|
|
|
|
|
|
import (
|
2021-11-16 08:34:26 +08:00
|
|
|
"context"
|
2020-04-17 22:52:12 +08:00
|
|
|
"errors"
|
|
|
|
"sync"
|
2020-04-18 22:55:10 +08:00
|
|
|
"time"
|
2020-04-17 22:52:12 +08:00
|
|
|
|
2023-01-11 22:41:38 +08:00
|
|
|
"github.com/redis/go-redis/v9"
|
2020-04-18 22:55:10 +08:00
|
|
|
"github.com/hibiken/asynq/internal/base"
|
2020-04-17 22:52:12 +08:00
|
|
|
)
|
|
|
|
|
2022-03-19 22:16:55 +08:00
|
|
|
// errRedisDown is the sentinel error returned by TestBroker methods while the
// broker is in the sleeping state. Note that the message prefix is "testutil",
// not the name of this package.
var errRedisDown = errors.New("testutil: redis is down")
|
2020-04-17 22:52:12 +08:00
|
|
|
|
|
|
|
// TestBroker is a broker implementation which enables
|
|
|
|
// to simulate Redis failure in tests.
|
|
|
|
type TestBroker struct {
|
|
|
|
mu sync.Mutex
|
|
|
|
sleeping bool
|
|
|
|
|
2020-04-18 22:55:10 +08:00
|
|
|
// real broker
|
|
|
|
real base.Broker
|
2020-04-17 22:52:12 +08:00
|
|
|
}
|
|
|
|
|
2020-06-23 21:34:59 +08:00
|
|
|
// Make sure TestBroker implements Broker interface at compile time.
|
|
|
|
var _ base.Broker = (*TestBroker)(nil)
|
|
|
|
|
2020-04-18 22:55:10 +08:00
|
|
|
func NewTestBroker(b base.Broker) *TestBroker {
|
|
|
|
return &TestBroker{real: b}
|
2020-04-17 22:52:12 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (tb *TestBroker) Sleep() {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
tb.sleeping = true
|
|
|
|
}
|
|
|
|
|
|
|
|
func (tb *TestBroker) Wakeup() {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
tb.sleeping = false
|
|
|
|
}
|
|
|
|
|
2021-11-16 08:34:26 +08:00
|
|
|
func (tb *TestBroker) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
|
2020-04-18 22:55:10 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
2021-11-16 08:34:26 +08:00
|
|
|
return tb.real.Enqueue(ctx, msg)
|
2020-04-18 22:55:10 +08:00
|
|
|
}
|
|
|
|
|
2021-11-16 08:34:26 +08:00
|
|
|
func (tb *TestBroker) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time.Duration) error {
|
2020-04-18 22:55:10 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
2021-11-16 08:34:26 +08:00
|
|
|
return tb.real.EnqueueUnique(ctx, msg, ttl)
|
2020-04-18 22:55:10 +08:00
|
|
|
}
|
|
|
|
|
2022-02-14 23:17:51 +08:00
|
|
|
func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, time.Time, error) {
|
2020-04-18 22:55:10 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
2022-02-14 23:17:51 +08:00
|
|
|
return nil, time.Time{}, errRedisDown
|
2020-04-18 22:55:10 +08:00
|
|
|
}
|
|
|
|
return tb.real.Dequeue(qnames...)
|
|
|
|
}
|
|
|
|
|
2022-02-14 23:17:51 +08:00
|
|
|
func (tb *TestBroker) Done(ctx context.Context, msg *base.TaskMessage) error {
|
2020-04-18 22:55:10 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
2022-02-14 23:17:51 +08:00
|
|
|
return tb.real.Done(ctx, msg)
|
2020-04-18 22:55:10 +08:00
|
|
|
}
|
|
|
|
|
2022-02-14 23:17:51 +08:00
|
|
|
func (tb *TestBroker) MarkAsComplete(ctx context.Context, msg *base.TaskMessage) error {
|
2021-11-06 07:52:54 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
2022-02-14 23:17:51 +08:00
|
|
|
return tb.real.MarkAsComplete(ctx, msg)
|
2021-11-06 07:52:54 +08:00
|
|
|
}
|
|
|
|
|
2022-02-14 23:17:51 +08:00
|
|
|
func (tb *TestBroker) Requeue(ctx context.Context, msg *base.TaskMessage) error {
|
2020-04-18 22:55:10 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
2022-02-14 23:17:51 +08:00
|
|
|
return tb.real.Requeue(ctx, msg)
|
2020-04-18 22:55:10 +08:00
|
|
|
}
|
|
|
|
|
2021-11-16 08:34:26 +08:00
|
|
|
func (tb *TestBroker) Schedule(ctx context.Context, msg *base.TaskMessage, processAt time.Time) error {
|
2020-04-18 22:55:10 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
2021-11-16 08:34:26 +08:00
|
|
|
return tb.real.Schedule(ctx, msg, processAt)
|
2020-04-18 22:55:10 +08:00
|
|
|
}
|
|
|
|
|
2021-11-16 08:34:26 +08:00
|
|
|
func (tb *TestBroker) ScheduleUnique(ctx context.Context, msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error {
|
2020-04-18 22:55:10 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
2021-11-16 08:34:26 +08:00
|
|
|
return tb.real.ScheduleUnique(ctx, msg, processAt, ttl)
|
2020-04-18 22:55:10 +08:00
|
|
|
}
|
|
|
|
|
2022-02-14 23:17:51 +08:00
|
|
|
func (tb *TestBroker) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.Time, errMsg string, isFailure bool) error {
|
2020-04-18 22:55:10 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
2022-02-14 23:17:51 +08:00
|
|
|
return tb.real.Retry(ctx, msg, processAt, errMsg, isFailure)
|
2020-04-18 22:55:10 +08:00
|
|
|
}
|
|
|
|
|
2022-02-14 23:17:51 +08:00
|
|
|
func (tb *TestBroker) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) error {
|
2020-04-18 22:55:10 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
2022-02-14 23:17:51 +08:00
|
|
|
return tb.real.Archive(ctx, msg, errMsg)
|
2020-04-18 22:55:10 +08:00
|
|
|
}
|
|
|
|
|
2021-03-13 08:23:08 +08:00
|
|
|
func (tb *TestBroker) ForwardIfReady(qnames ...string) error {
|
2020-04-18 22:55:10 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
2021-03-13 08:23:08 +08:00
|
|
|
return tb.real.ForwardIfReady(qnames...)
|
2020-04-18 22:55:10 +08:00
|
|
|
}
|
|
|
|
|
2021-11-06 07:52:54 +08:00
|
|
|
func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string) error {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
|
|
|
return tb.real.DeleteExpiredCompletedTasks(qname)
|
|
|
|
}
|
|
|
|
|
2022-02-11 22:18:27 +08:00
|
|
|
func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) {
|
2020-06-21 22:05:57 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return nil, errRedisDown
|
|
|
|
}
|
2022-02-11 22:18:27 +08:00
|
|
|
return tb.real.ListLeaseExpired(cutoff, qnames...)
|
2020-06-21 22:05:57 +08:00
|
|
|
}
|
|
|
|
|
2022-02-14 23:17:51 +08:00
|
|
|
func (tb *TestBroker) ExtendLease(qname string, ids ...string) (time.Time, error) {
|
2022-02-13 01:48:07 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
2022-02-14 23:17:51 +08:00
|
|
|
return time.Time{}, errRedisDown
|
2022-02-13 01:48:07 +08:00
|
|
|
}
|
|
|
|
return tb.real.ExtendLease(qname, ids...)
|
|
|
|
}
|
|
|
|
|
2020-05-19 11:47:35 +08:00
|
|
|
func (tb *TestBroker) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error {
|
2020-04-18 22:55:10 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
2020-05-19 11:47:35 +08:00
|
|
|
return tb.real.WriteServerState(info, workers, ttl)
|
2020-04-18 22:55:10 +08:00
|
|
|
}
|
|
|
|
|
2020-05-19 11:47:35 +08:00
|
|
|
func (tb *TestBroker) ClearServerState(host string, pid int, serverID string) error {
|
2020-04-18 22:55:10 +08:00
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
2020-05-19 11:47:35 +08:00
|
|
|
return tb.real.ClearServerState(host, pid, serverID)
|
2020-04-18 22:55:10 +08:00
|
|
|
}
|
|
|
|
|
2020-04-17 22:52:12 +08:00
|
|
|
func (tb *TestBroker) CancelationPubSub() (*redis.PubSub, error) {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return nil, errRedisDown
|
|
|
|
}
|
2020-04-18 22:55:10 +08:00
|
|
|
return tb.real.CancelationPubSub()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (tb *TestBroker) PublishCancelation(id string) error {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
|
|
|
return tb.real.PublishCancelation(id)
|
|
|
|
}
|
|
|
|
|
2021-11-06 07:52:54 +08:00
|
|
|
func (tb *TestBroker) WriteResult(qname, id string, data []byte) (int, error) {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return 0, errRedisDown
|
|
|
|
}
|
|
|
|
return tb.real.WriteResult(qname, id, data)
|
|
|
|
}
|
|
|
|
|
2020-07-26 09:49:27 +08:00
|
|
|
func (tb *TestBroker) Ping() error {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
|
|
|
return tb.real.Ping()
|
|
|
|
}
|
|
|
|
|
2020-04-18 22:55:10 +08:00
|
|
|
func (tb *TestBroker) Close() error {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
|
|
|
return tb.real.Close()
|
2020-04-17 22:52:12 +08:00
|
|
|
}
|
2022-03-11 07:59:58 +08:00
|
|
|
|
|
|
|
func (tb *TestBroker) AddToGroup(ctx context.Context, msg *base.TaskMessage, gname string) error {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
|
|
|
return tb.real.AddToGroup(ctx, msg, gname)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (tb *TestBroker) AddToGroupUnique(ctx context.Context, msg *base.TaskMessage, gname string, ttl time.Duration) error {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
|
|
|
return tb.real.AddToGroupUnique(ctx, msg, gname, ttl)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (tb *TestBroker) ListGroups(qname string) ([]string, error) {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return nil, errRedisDown
|
|
|
|
}
|
|
|
|
return tb.real.ListGroups(qname)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (tb *TestBroker) AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (aggregationSetID string, err error) {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return "", errRedisDown
|
|
|
|
}
|
|
|
|
return tb.real.AggregationCheck(qname, gname, t, gracePeriod, maxDelay, maxSize)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (tb *TestBroker) ReadAggregationSet(qname, gname, aggregationSetID string) ([]*base.TaskMessage, time.Time, error) {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return nil, time.Time{}, errRedisDown
|
|
|
|
}
|
|
|
|
return tb.real.ReadAggregationSet(qname, gname, aggregationSetID)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (tb *TestBroker) DeleteAggregationSet(ctx context.Context, qname, gname, aggregationSetID string) error {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
|
|
|
return tb.real.DeleteAggregationSet(ctx, qname, gname, aggregationSetID)
|
|
|
|
}
|
2022-03-12 02:44:12 +08:00
|
|
|
|
|
|
|
func (tb *TestBroker) ReclaimStaleAggregationSets(qname string) error {
|
|
|
|
tb.mu.Lock()
|
|
|
|
defer tb.mu.Unlock()
|
|
|
|
if tb.sleeping {
|
|
|
|
return errRedisDown
|
|
|
|
}
|
|
|
|
return tb.real.ReclaimStaleAggregationSets(qname)
|
|
|
|
}
|