From f8a94fb8391246e71bb38e92a9a604e32469a496 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 17 Apr 2020 06:56:44 -0700 Subject: [PATCH] Define broker interface --- asynq.go | 24 ++++++++++++++++++++++++ heartbeat.go | 11 +++++------ processor.go | 26 +++++++++++++------------- processor_test.go | 8 ++++---- scheduler.go | 10 ++++------ server.go | 8 ++++---- subscriber.go | 9 ++++----- 7 files changed, 58 insertions(+), 38 deletions(-) diff --git a/asynq.go b/asynq.go index 7d1b211..90c6335 100644 --- a/asynq.go +++ b/asynq.go @@ -7,8 +7,10 @@ package asynq import ( "crypto/tls" "fmt" + "time" "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq/internal/base" ) // Task represents a unit of work to be performed. @@ -30,6 +32,28 @@ func NewTask(typename string, payload map[string]interface{}) *Task { } } +// broker is a message broker that supports operations to manage task queues. +// +// See rdb.RDB as a reference implementation. +type broker interface { + Enqueue(msg *base.TaskMessage) error + EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error + Dequeue(qnames ...string) (*base.TaskMessage, error) + Done(msg *base.TaskMessage) error + Requeue(msg *base.TaskMessage) error + Schedule(msg *base.TaskMessage, processAt time.Time) error + ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error + Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error + Kill(msg *base.TaskMessage, errMsg string) error + RequeueAll() (int64, error) + CheckAndEnqueue(qnames ...string) error + WriteServerState(ss *base.ServerState, ttl time.Duration) error + ClearServerState(ss *base.ServerState) error + CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers + PublishCancelation(id string) error + Close() error +} + // RedisConnOpt is a discriminated union of types that represent Redis connection configuration option. // // RedisConnOpt represents a sum of following types: diff --git a/heartbeat.go b/heartbeat.go index 3164436..06e0c02 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -9,14 +9,13 @@ import ( "time" "github.com/hibiken/asynq/internal/base" - "github.com/hibiken/asynq/internal/rdb" ) // heartbeater is responsible for writing process info to redis periodically to // indicate that the background worker process is up. type heartbeater struct { logger Logger - rdb *rdb.RDB + broker broker ss *base.ServerState @@ -27,10 +26,10 @@ type heartbeater struct { interval time.Duration } -func newHeartbeater(l Logger, rdb *rdb.RDB, ss *base.ServerState, interval time.Duration) *heartbeater { +func newHeartbeater(l Logger, b broker, ss *base.ServerState, interval time.Duration) *heartbeater { return &heartbeater{ logger: l, - rdb: rdb, + broker: b, ss: ss, done: make(chan struct{}), interval: interval, @@ -53,7 +52,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) { for { select { case <-h.done: - h.rdb.ClearServerState(h.ss) + h.broker.ClearServerState(h.ss) h.logger.Info("Heartbeater done") return case <-time.After(h.interval): @@ -66,7 +65,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) { func (h *heartbeater) beat() { // Note: Set TTL to be long enough so that it won't expire before we write again // and short enough to expire quickly once the process is shut down or killed. - err := h.rdb.WriteServerState(h.ss, h.interval*2) + err := h.broker.WriteServerState(h.ss, h.interval*2) if err != nil { h.logger.Error("could not write heartbeat data: %v", err) } diff --git a/processor.go b/processor.go index 0de9301..baac355 100644 --- a/processor.go +++ b/processor.go @@ -19,7 +19,7 @@ import ( type processor struct { logger Logger - rdb *rdb.RDB + broker broker ss *base.ServerState @@ -65,7 +65,7 @@ type retryDelayFunc func(n int, err error, task *Task) time.Duration type newProcessorParams struct { logger Logger - rdb *rdb.RDB + broker broker ss *base.ServerState retryDelayFunc retryDelayFunc syncCh chan<- *syncRequest @@ -84,7 +84,7 @@ func newProcessor(params newProcessorParams) *processor { } return &processor{ logger: params.logger, - rdb: params.rdb, + broker: params.broker, ss: params.ss, queueConfig: qcfg, orderedQueues: orderedQueues, @@ -157,8 +157,8 @@ func (p *processor) start(wg *sync.WaitGroup) { // process the task. func (p *processor) exec() { qnames := p.queues() - msg, err := p.rdb.Dequeue(qnames...) - if err == rdb.ErrNoProcessableTask { + msg, err := p.broker.Dequeue(qnames...) + if err == rdb.ErrNoProcessableTask { // TODO: Need to decouple this error from rdb to support other brokers // queues are empty, this is a normal behavior. if len(p.queueConfig) > 1 { // sleep to avoid slamming redis and let scheduler move tasks into queues. @@ -227,7 +227,7 @@ func (p *processor) exec() { // restore moves all tasks from "in-progress" back to queue // to restore all unfinished tasks. func (p *processor) restore() { - n, err := p.rdb.RequeueAll() + n, err := p.broker.RequeueAll() if err != nil { p.logger.Error("Could not restore unfinished tasks: %v", err) } @@ -237,20 +237,20 @@ func (p *processor) restore() { } func (p *processor) requeue(msg *base.TaskMessage) { - err := p.rdb.Requeue(msg) + err := p.broker.Requeue(msg) if err != nil { p.logger.Error("Could not push task id=%s back to queue: %v", msg.ID, err) } } func (p *processor) markAsDone(msg *base.TaskMessage) { - err := p.rdb.Done(msg) + err := p.broker.Done(msg) if err != nil { errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue) p.logger.Warn("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { - return p.rdb.Done(msg) + return p.broker.Done(msg) }, errMsg: errMsg, } @@ -260,13 +260,13 @@ func (p *processor) markAsDone(msg *base.TaskMessage) { func (p *processor) retry(msg *base.TaskMessage, e error) { d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload)) retryAt := time.Now().Add(d) - err := p.rdb.Retry(msg, retryAt, e.Error()) + err := p.broker.Retry(msg, retryAt, e.Error()) if err != nil { errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue) p.logger.Warn("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { - return p.rdb.Retry(msg, retryAt, e.Error()) + return p.broker.Retry(msg, retryAt, e.Error()) }, errMsg: errMsg, } @@ -275,13 +275,13 @@ func (p *processor) retry(msg *base.TaskMessage, e error) { func (p *processor) kill(msg *base.TaskMessage, e error) { p.logger.Warn("Retry exhausted for task id=%s", msg.ID) - err := p.rdb.Kill(msg, e.Error()) + err := p.broker.Kill(msg, e.Error()) if err != nil { errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue) p.logger.Warn("%s; Will retry syncing", errMsg) p.syncRequestCh <- &syncRequest{ fn: func() error { - return p.rdb.Kill(msg, e.Error()) + return p.broker.Kill(msg, e.Error()) }, errMsg: errMsg, } diff --git a/processor_test.go b/processor_test.go index af015e4..c379e8b 100644 --- a/processor_test.go +++ b/processor_test.go @@ -71,7 +71,7 @@ func TestProcessorSuccess(t *testing.T) { cancelations := base.NewCancelations() p := newProcessor(newProcessorParams{ logger: testLogger, - rdb: rdbClient, + broker: rdbClient, ss: ss, retryDelayFunc: defaultDelayFunc, syncCh: nil, @@ -178,7 +178,7 @@ func TestProcessorRetry(t *testing.T) { cancelations := base.NewCancelations() p := newProcessor(newProcessorParams{ logger: testLogger, - rdb: rdbClient, + broker: rdbClient, ss: ss, retryDelayFunc: delayFunc, syncCh: nil, @@ -253,7 +253,7 @@ func TestProcessorQueues(t *testing.T) { ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false) p := newProcessor(newProcessorParams{ logger: testLogger, - rdb: nil, + broker: nil, ss: ss, retryDelayFunc: defaultDelayFunc, syncCh: nil, @@ -330,7 +330,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/) p := newProcessor(newProcessorParams{ logger: testLogger, - rdb: rdbClient, + broker: rdbClient, ss: ss, retryDelayFunc: defaultDelayFunc, syncCh: nil, diff --git a/scheduler.go b/scheduler.go index 06a314e..6092d4a 100644 --- a/scheduler.go +++ b/scheduler.go @@ -7,13 +7,11 @@ package asynq import ( "sync" "time" - - "github.com/hibiken/asynq/internal/rdb" ) type scheduler struct { logger Logger - rdb *rdb.RDB + broker broker // channel to communicate back to the long running "scheduler" goroutine. done chan struct{} @@ -25,14 +23,14 @@ type scheduler struct { qnames []string } -func newScheduler(l Logger, r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler { +func newScheduler(l Logger, b broker, avgInterval time.Duration, qcfg map[string]int) *scheduler { var qnames []string for q := range qcfg { qnames = append(qnames, q) } return &scheduler{ logger: l, - rdb: r, + broker: b, done: make(chan struct{}), avgInterval: avgInterval, qnames: qnames, @@ -63,7 +61,7 @@ func (s *scheduler) start(wg *sync.WaitGroup) { } func (s *scheduler) exec() { - if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil { + if err := s.broker.CheckAndEnqueue(s.qnames...); err != nil { s.logger.Error("Could not enqueue scheduled tasks: %v", err) } } diff --git a/server.go b/server.go index bdd9cc2..9237116 100644 --- a/server.go +++ b/server.go @@ -36,7 +36,7 @@ type Server struct { logger Logger - rdb *rdb.RDB + broker broker // wait group to wait for all goroutines to finish. wg sync.WaitGroup @@ -208,7 +208,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { subscriber := newSubscriber(logger, rdb, cancels) processor := newProcessor(newProcessorParams{ logger: logger, - rdb: rdb, + broker: rdb, ss: ss, retryDelayFunc: delayFunc, syncCh: syncCh, @@ -219,7 +219,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { return &Server{ ss: ss, logger: logger, - rdb: rdb, + broker: rdb, scheduler: scheduler, processor: processor, syncer: syncer, @@ -330,7 +330,7 @@ func (srv *Server) Stop() { srv.wg.Wait() - srv.rdb.Close() + srv.broker.Close() srv.ss.SetStatus(base.StatusStopped) srv.logger.Info("Bye!") diff --git a/subscriber.go b/subscriber.go index 4b47993..f6fa7f6 100644 --- a/subscriber.go +++ b/subscriber.go @@ -10,12 +10,11 @@ import ( "github.com/go-redis/redis/v7" "github.com/hibiken/asynq/internal/base" - "github.com/hibiken/asynq/internal/rdb" ) type subscriber struct { logger Logger - rdb *rdb.RDB + broker broker // channel to communicate back to the long running "subscriber" goroutine. done chan struct{} @@ -24,10 +23,10 @@ type subscriber struct { cancelations *base.Cancelations } -func newSubscriber(l Logger, rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber { +func newSubscriber(l Logger, b broker, cancelations *base.Cancelations) *subscriber { return &subscriber{ logger: l, - rdb: rdb, + broker: b, done: make(chan struct{}), cancelations: cancelations, } @@ -49,7 +48,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) { ) // Try until successfully connect to Redis. for { - pubsub, err = s.rdb.CancelationPubSub() + pubsub, err = s.broker.CancelationPubSub() if err != nil { s.logger.Error("cannot subscribe to cancelation channel: %v", err) select {