diff --git a/asynq.go b/asynq.go index 90c6335..7d1b211 100644 --- a/asynq.go +++ b/asynq.go @@ -7,10 +7,8 @@ package asynq import ( "crypto/tls" "fmt" - "time" "github.com/go-redis/redis/v7" - "github.com/hibiken/asynq/internal/base" ) // Task represents a unit of work to be performed. @@ -32,28 +30,6 @@ func NewTask(typename string, payload map[string]interface{}) *Task { } } -// broker is a message broker that supports operations to manage task queues. -// -// See rdb.RDB as a reference implementation. -type broker interface { - Enqueue(msg *base.TaskMessage) error - EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error - Dequeue(qnames ...string) (*base.TaskMessage, error) - Done(msg *base.TaskMessage) error - Requeue(msg *base.TaskMessage) error - Schedule(msg *base.TaskMessage, processAt time.Time) error - ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error - Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error - Kill(msg *base.TaskMessage, errMsg string) error - RequeueAll() (int64, error) - CheckAndEnqueue(qnames ...string) error - WriteServerState(ss *base.ServerState, ttl time.Duration) error - ClearServerState(ss *base.ServerState) error - CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers - PublishCancelation(id string) error - Close() error -} - // RedisConnOpt is a discriminated union of types that represent Redis connection configuration option. // // RedisConnOpt represents a sum of following types: diff --git a/heartbeat.go b/heartbeat.go index 06e0c02..b89845c 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -15,7 +15,7 @@ import ( // indicate that the background worker process is up. type heartbeater struct { logger Logger - broker broker + broker base.Broker ss *base.ServerState @@ -26,7 +26,7 @@ type heartbeater struct { interval time.Duration } -func newHeartbeater(l Logger, b broker, ss *base.ServerState, interval time.Duration) *heartbeater { +func newHeartbeater(l Logger, b base.Broker, ss *base.ServerState, interval time.Duration) *heartbeater { return &heartbeater{ logger: l, broker: b, diff --git a/internal/base/base.go b/internal/base/base.go index 460d96a..ca71232 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/go-redis/redis/v7" "github.com/rs/xid" ) @@ -328,3 +329,25 @@ func (c *Cancelations) GetAll() []context.CancelFunc { } return res } + +// Broker is a message broker that supports operations to manage task queues. +// +// See rdb.RDB as a reference implementation. +type Broker interface { + Enqueue(msg *TaskMessage) error + EnqueueUnique(msg *TaskMessage, ttl time.Duration) error + Dequeue(qnames ...string) (*TaskMessage, error) + Done(msg *TaskMessage) error + Requeue(msg *TaskMessage) error + Schedule(msg *TaskMessage, processAt time.Time) error + ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error + Retry(msg *TaskMessage, processAt time.Time, errMsg string) error + Kill(msg *TaskMessage, errMsg string) error + RequeueAll() (int64, error) + CheckAndEnqueue(qnames ...string) error + WriteServerState(ss *ServerState, ttl time.Duration) error + ClearServerState(ss *ServerState) error + CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers + PublishCancelation(id string) error + Close() error +} diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go index d06cd6a..0acab70 100644 --- a/internal/testbroker/testbroker.go +++ b/internal/testbroker/testbroker.go @@ -8,9 +8,10 @@ package testbroker import ( "errors" "sync" + "time" "github.com/go-redis/redis/v7" - "github.com/hibiken/asynq/internal/rdb" + "github.com/hibiken/asynq/internal/base" ) var errRedisDown = errors.New("asynqtest: redis is down") @@ -21,11 +22,12 @@ type TestBroker struct { mu sync.Mutex sleeping bool - *rdb.RDB + // real broker + real base.Broker } -func NewTestBroker(r *rdb.RDB) *TestBroker { - return &TestBroker{RDB: r} +func NewTestBroker(b base.Broker) *TestBroker { + return &TestBroker{real: b} } func (tb *TestBroker) Sleep() { @@ -40,11 +42,146 @@ func (tb *TestBroker) Wakeup() { tb.sleeping = false } +func (tb *TestBroker) Enqueue(msg *base.TaskMessage) error { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return errRedisDown + } + return tb.real.Enqueue(msg) +} + +func (tb *TestBroker) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return errRedisDown + } + return tb.real.EnqueueUnique(msg, ttl) +} + +func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, error) { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return nil, errRedisDown + } + return tb.real.Dequeue(qnames...) +} + +func (tb *TestBroker) Done(msg *base.TaskMessage) error { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return errRedisDown + } + return tb.real.Done(msg) +} + +func (tb *TestBroker) Requeue(msg *base.TaskMessage) error { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return errRedisDown + } + return tb.real.Requeue(msg) +} + +func (tb *TestBroker) Schedule(msg *base.TaskMessage, processAt time.Time) error { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return errRedisDown + } + return tb.real.Schedule(msg, processAt) +} + +func (tb *TestBroker) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return errRedisDown + } + return tb.real.ScheduleUnique(msg, processAt, ttl) +} + +func (tb *TestBroker) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return errRedisDown + } + return tb.real.Retry(msg, processAt, errMsg) +} + +func (tb *TestBroker) Kill(msg *base.TaskMessage, errMsg string) error { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return errRedisDown + } + return tb.real.Kill(msg, errMsg) +} + +func (tb *TestBroker) RequeueAll() (int64, error) { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return 0, errRedisDown + } + return tb.real.RequeueAll() +} + +func (tb *TestBroker) CheckAndEnqueue(qnames ...string) error { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return errRedisDown + } + return tb.real.CheckAndEnqueue() +} + +func (tb *TestBroker) WriteServerState(ss *base.ServerState, ttl time.Duration) error { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return errRedisDown + } + return tb.real.WriteServerState(ss, ttl) +} + +func (tb *TestBroker) ClearServerState(ss *base.ServerState) error { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return errRedisDown + } + return tb.real.ClearServerState(ss) +} + func (tb *TestBroker) CancelationPubSub() (*redis.PubSub, error) { tb.mu.Lock() defer tb.mu.Unlock() if tb.sleeping { return nil, errRedisDown } - return tb.RDB.CancelationPubSub() + return tb.real.CancelationPubSub() +} + +func (tb *TestBroker) PublishCancelation(id string) error { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return errRedisDown + } + return tb.real.PublishCancelation(id) +} + +func (tb *TestBroker) Close() error { + tb.mu.Lock() + defer tb.mu.Unlock() + if tb.sleeping { + return errRedisDown + } + return tb.real.Close() } diff --git a/processor.go b/processor.go index baac355..adc7921 100644 --- a/processor.go +++ b/processor.go @@ -19,7 +19,7 @@ import ( type processor struct { logger Logger - broker broker + broker base.Broker ss *base.ServerState @@ -65,7 +65,7 @@ type retryDelayFunc func(n int, err error, task *Task) time.Duration type newProcessorParams struct { logger Logger - broker broker + broker base.Broker ss *base.ServerState retryDelayFunc retryDelayFunc syncCh chan<- *syncRequest diff --git a/scheduler.go b/scheduler.go index 6092d4a..8f4f860 100644 --- a/scheduler.go +++ b/scheduler.go @@ -7,11 +7,13 @@ package asynq import ( "sync" "time" + + "github.com/hibiken/asynq/internal/base" ) type scheduler struct { logger Logger - broker broker + broker base.Broker // channel to communicate back to the long running "scheduler" goroutine. done chan struct{} @@ -23,7 +25,7 @@ type scheduler struct { qnames []string } -func newScheduler(l Logger, b broker, avgInterval time.Duration, qcfg map[string]int) *scheduler { +func newScheduler(l Logger, b base.Broker, avgInterval time.Duration, qcfg map[string]int) *scheduler { var qnames []string for q := range qcfg { qnames = append(qnames, q) diff --git a/server.go b/server.go index 9237116..9a10008 100644 --- a/server.go +++ b/server.go @@ -36,7 +36,7 @@ type Server struct { logger Logger - broker broker + broker base.Broker // wait group to wait for all goroutines to finish. wg sync.WaitGroup diff --git a/subscriber.go b/subscriber.go index f21bae5..c72ac89 100644 --- a/subscriber.go +++ b/subscriber.go @@ -14,7 +14,7 @@ import ( type subscriber struct { logger Logger - broker broker + broker base.Broker // channel to communicate back to the long running "subscriber" goroutine. done chan struct{} @@ -26,7 +26,7 @@ type subscriber struct { retryTimeout time.Duration } -func newSubscriber(l Logger, b broker, cancelations *base.Cancelations) *subscriber { +func newSubscriber(l Logger, b base.Broker, cancelations *base.Cancelations) *subscriber { return &subscriber{ logger: l, broker: b,