2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00

Move Broker interface to base package

This commit is contained in:
Ken Hibino 2020-04-18 07:55:10 -07:00
parent 46ab4417dd
commit 7c7f8e5f30
8 changed files with 176 additions and 38 deletions

View File

@ -7,10 +7,8 @@ package asynq
import ( import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"time"
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/base"
) )
// Task represents a unit of work to be performed. // Task represents a unit of work to be performed.
@ -32,28 +30,6 @@ func NewTask(typename string, payload map[string]interface{}) *Task {
} }
} }
// broker is a message broker that supports operations to manage task queues.
//
// See rdb.RDB as a reference implementation.
type broker interface {
Enqueue(msg *base.TaskMessage) error
EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error
Dequeue(qnames ...string) (*base.TaskMessage, error)
Done(msg *base.TaskMessage) error
Requeue(msg *base.TaskMessage) error
Schedule(msg *base.TaskMessage, processAt time.Time) error
ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error
Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error
Kill(msg *base.TaskMessage, errMsg string) error
RequeueAll() (int64, error)
CheckAndEnqueue(qnames ...string) error
WriteServerState(ss *base.ServerState, ttl time.Duration) error
ClearServerState(ss *base.ServerState) error
CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
PublishCancelation(id string) error
Close() error
}
// RedisConnOpt is a discriminated union of types that represent Redis connection configuration option. // RedisConnOpt is a discriminated union of types that represent Redis connection configuration option.
// //
// RedisConnOpt represents a sum of following types: // RedisConnOpt represents a sum of following types:

View File

@ -15,7 +15,7 @@ import (
// indicate that the background worker process is up. // indicate that the background worker process is up.
type heartbeater struct { type heartbeater struct {
logger Logger logger Logger
broker broker broker base.Broker
ss *base.ServerState ss *base.ServerState
@ -26,7 +26,7 @@ type heartbeater struct {
interval time.Duration interval time.Duration
} }
func newHeartbeater(l Logger, b broker, ss *base.ServerState, interval time.Duration) *heartbeater { func newHeartbeater(l Logger, b base.Broker, ss *base.ServerState, interval time.Duration) *heartbeater {
return &heartbeater{ return &heartbeater{
logger: l, logger: l,
broker: b, broker: b,

View File

@ -12,6 +12,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/go-redis/redis/v7"
"github.com/rs/xid" "github.com/rs/xid"
) )
@ -328,3 +329,25 @@ func (c *Cancelations) GetAll() []context.CancelFunc {
} }
return res return res
} }
// Broker is a message broker that supports operations to manage task queues.
//
// See rdb.RDB as a reference implementation.
type Broker interface {
Enqueue(msg *TaskMessage) error
EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
Dequeue(qnames ...string) (*TaskMessage, error)
Done(msg *TaskMessage) error
Requeue(msg *TaskMessage) error
Schedule(msg *TaskMessage, processAt time.Time) error
ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error
Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
Kill(msg *TaskMessage, errMsg string) error
RequeueAll() (int64, error)
CheckAndEnqueue(qnames ...string) error
WriteServerState(ss *ServerState, ttl time.Duration) error
ClearServerState(ss *ServerState) error
CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
PublishCancelation(id string) error
Close() error
}

View File

@ -8,9 +8,10 @@ package testbroker
import ( import (
"errors" "errors"
"sync" "sync"
"time"
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/base"
) )
var errRedisDown = errors.New("asynqtest: redis is down") var errRedisDown = errors.New("asynqtest: redis is down")
@ -21,11 +22,12 @@ type TestBroker struct {
mu sync.Mutex mu sync.Mutex
sleeping bool sleeping bool
*rdb.RDB // real broker
real base.Broker
} }
func NewTestBroker(r *rdb.RDB) *TestBroker { func NewTestBroker(b base.Broker) *TestBroker {
return &TestBroker{RDB: r} return &TestBroker{real: b}
} }
func (tb *TestBroker) Sleep() { func (tb *TestBroker) Sleep() {
@ -40,11 +42,146 @@ func (tb *TestBroker) Wakeup() {
tb.sleeping = false tb.sleeping = false
} }
func (tb *TestBroker) Enqueue(msg *base.TaskMessage) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.Enqueue(msg)
}
func (tb *TestBroker) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.EnqueueUnique(msg, ttl)
}
func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, error) {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return nil, errRedisDown
}
return tb.real.Dequeue(qnames...)
}
func (tb *TestBroker) Done(msg *base.TaskMessage) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.Done(msg)
}
func (tb *TestBroker) Requeue(msg *base.TaskMessage) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.Requeue(msg)
}
func (tb *TestBroker) Schedule(msg *base.TaskMessage, processAt time.Time) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.Schedule(msg, processAt)
}
func (tb *TestBroker) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.ScheduleUnique(msg, processAt, ttl)
}
func (tb *TestBroker) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.Retry(msg, processAt, errMsg)
}
func (tb *TestBroker) Kill(msg *base.TaskMessage, errMsg string) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.Kill(msg, errMsg)
}
func (tb *TestBroker) RequeueAll() (int64, error) {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return 0, errRedisDown
}
return tb.real.RequeueAll()
}
func (tb *TestBroker) CheckAndEnqueue(qnames ...string) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.CheckAndEnqueue()
}
func (tb *TestBroker) WriteServerState(ss *base.ServerState, ttl time.Duration) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.WriteServerState(ss, ttl)
}
func (tb *TestBroker) ClearServerState(ss *base.ServerState) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.ClearServerState(ss)
}
func (tb *TestBroker) CancelationPubSub() (*redis.PubSub, error) { func (tb *TestBroker) CancelationPubSub() (*redis.PubSub, error) {
tb.mu.Lock() tb.mu.Lock()
defer tb.mu.Unlock() defer tb.mu.Unlock()
if tb.sleeping { if tb.sleeping {
return nil, errRedisDown return nil, errRedisDown
} }
return tb.RDB.CancelationPubSub() return tb.real.CancelationPubSub()
}
func (tb *TestBroker) PublishCancelation(id string) error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.PublishCancelation(id)
}
func (tb *TestBroker) Close() error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.Close()
} }

View File

@ -19,7 +19,7 @@ import (
type processor struct { type processor struct {
logger Logger logger Logger
broker broker broker base.Broker
ss *base.ServerState ss *base.ServerState
@ -65,7 +65,7 @@ type retryDelayFunc func(n int, err error, task *Task) time.Duration
type newProcessorParams struct { type newProcessorParams struct {
logger Logger logger Logger
broker broker broker base.Broker
ss *base.ServerState ss *base.ServerState
retryDelayFunc retryDelayFunc retryDelayFunc retryDelayFunc
syncCh chan<- *syncRequest syncCh chan<- *syncRequest

View File

@ -7,11 +7,13 @@ package asynq
import ( import (
"sync" "sync"
"time" "time"
"github.com/hibiken/asynq/internal/base"
) )
type scheduler struct { type scheduler struct {
logger Logger logger Logger
broker broker broker base.Broker
// channel to communicate back to the long running "scheduler" goroutine. // channel to communicate back to the long running "scheduler" goroutine.
done chan struct{} done chan struct{}
@ -23,7 +25,7 @@ type scheduler struct {
qnames []string qnames []string
} }
func newScheduler(l Logger, b broker, avgInterval time.Duration, qcfg map[string]int) *scheduler { func newScheduler(l Logger, b base.Broker, avgInterval time.Duration, qcfg map[string]int) *scheduler {
var qnames []string var qnames []string
for q := range qcfg { for q := range qcfg {
qnames = append(qnames, q) qnames = append(qnames, q)

View File

@ -36,7 +36,7 @@ type Server struct {
logger Logger logger Logger
broker broker broker base.Broker
// wait group to wait for all goroutines to finish. // wait group to wait for all goroutines to finish.
wg sync.WaitGroup wg sync.WaitGroup

View File

@ -14,7 +14,7 @@ import (
type subscriber struct { type subscriber struct {
logger Logger logger Logger
broker broker broker base.Broker
// channel to communicate back to the long running "subscriber" goroutine. // channel to communicate back to the long running "subscriber" goroutine.
done chan struct{} done chan struct{}
@ -26,7 +26,7 @@ type subscriber struct {
retryTimeout time.Duration retryTimeout time.Duration
} }
func newSubscriber(l Logger, b broker, cancelations *base.Cancelations) *subscriber { func newSubscriber(l Logger, b base.Broker, cancelations *base.Cancelations) *subscriber {
return &subscriber{ return &subscriber{
logger: l, logger: l,
broker: b, broker: b,